Skip to content

Commit 82d10dc

Browse files
author
Julien Ruaux
committed
deps: Upgraded Spring Batch Redis
1 parent cb89bf5 commit 82d10dc

17 files changed

+520
-457
lines changed

src/main/java/com/redis/kafka/connect/RedisSinkConnector.java

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,49 +15,49 @@
1515
*/
1616
package com.redis.kafka.connect;
1717

18-
import com.redis.kafka.connect.sink.RedisSinkConfig;
19-
import com.redis.kafka.connect.sink.RedisSinkTask;
20-
import org.apache.kafka.common.config.ConfigDef;
21-
import org.apache.kafka.common.utils.AppInfoParser;
22-
import org.apache.kafka.connect.connector.Task;
23-
import org.apache.kafka.connect.sink.SinkConnector;
24-
2518
import java.util.Collections;
2619
import java.util.List;
2720
import java.util.Map;
2821

29-
public class RedisSinkConnector extends SinkConnector {
30-
31-
private Map<String, String> props;
32-
33-
@Override
34-
public void start(Map<String, String> props) {
35-
this.props = props;
36-
}
37-
38-
@Override
39-
public Class<? extends Task> taskClass() {
40-
return RedisSinkTask.class;
41-
}
42-
43-
@Override
44-
public List<Map<String, String>> taskConfigs(int maxTasks) {
45-
return Collections.singletonList(props);
46-
}
47-
48-
@Override
49-
public void stop() {
50-
// Do nothing
51-
}
22+
import org.apache.kafka.common.config.ConfigDef;
23+
import org.apache.kafka.connect.connector.Task;
24+
import org.apache.kafka.connect.sink.SinkConnector;
5225

53-
@Override
54-
public ConfigDef config() {
55-
return new RedisSinkConfig.RedisSinkConfigDef();
56-
}
26+
import com.redis.kafka.connect.common.VersionProvider;
27+
import com.redis.kafka.connect.sink.RedisSinkConfig;
28+
import com.redis.kafka.connect.sink.RedisSinkTask;
5729

30+
public class RedisSinkConnector extends SinkConnector {
5831

59-
@Override
60-
public String version() {
61-
return AppInfoParser.getVersion();
62-
}
32+
private Map<String, String> props;
33+
34+
@Override
35+
public void start(Map<String, String> props) {
36+
this.props = props;
37+
}
38+
39+
@Override
40+
public Class<? extends Task> taskClass() {
41+
return RedisSinkTask.class;
42+
}
43+
44+
@Override
45+
public List<Map<String, String>> taskConfigs(int maxTasks) {
46+
return Collections.singletonList(props);
47+
}
48+
49+
@Override
50+
public void stop() {
51+
// Do nothing
52+
}
53+
54+
@Override
55+
public ConfigDef config() {
56+
return new RedisSinkConfig.RedisSinkConfigDef();
57+
}
58+
59+
@Override
60+
public String version() {
61+
return VersionProvider.getVersion();
62+
}
6363
}

src/main/java/com/redis/kafka/connect/RedisSourceConnector.java

Lines changed: 58 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -12,79 +12,80 @@
1212
*/
1313
package com.redis.kafka.connect;
1414

15-
import com.redis.kafka.connect.source.RedisSourceConfig;
16-
import com.redis.kafka.connect.source.RedisSourceTask;
15+
import java.util.ArrayList;
16+
import java.util.HashMap;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.stream.Collectors;
20+
1721
import org.apache.kafka.common.config.ConfigDef;
1822
import org.apache.kafka.common.config.ConfigException;
19-
import org.apache.kafka.common.utils.AppInfoParser;
2023
import org.apache.kafka.connect.connector.Task;
2124
import org.apache.kafka.connect.errors.ConnectException;
2225
import org.apache.kafka.connect.source.SourceConnector;
2326
import org.apache.kafka.connect.util.ConnectorUtils;
2427

25-
import java.util.ArrayList;
26-
import java.util.HashMap;
27-
import java.util.List;
28-
import java.util.Map;
29-
import java.util.stream.Collectors;
28+
import com.redis.kafka.connect.common.VersionProvider;
29+
import com.redis.kafka.connect.source.RedisSourceConfig;
30+
import com.redis.kafka.connect.source.RedisSourceTask;
3031

3132
public class RedisSourceConnector extends SourceConnector {
3233

33-
private Map<String, String> props;
34-
private RedisSourceConfig config;
34+
private Map<String, String> props;
35+
private RedisSourceConfig config;
3536

36-
@Override
37-
public void start(Map<String, String> props) {
38-
this.props = props;
39-
try {
40-
this.config = new RedisSourceConfig(props);
41-
} catch (ConfigException configException) {
42-
throw new ConnectException(configException);
43-
}
44-
}
37+
@Override
38+
public void start(Map<String, String> props) {
39+
this.props = props;
40+
try {
41+
this.config = new RedisSourceConfig(props);
42+
} catch (ConfigException configException) {
43+
throw new ConnectException(configException);
44+
}
45+
}
4546

46-
@Override
47-
public Class<? extends Task> taskClass() {
48-
return RedisSourceTask.class;
49-
}
47+
@Override
48+
public Class<? extends Task> taskClass() {
49+
return RedisSourceTask.class;
50+
}
5051

51-
@Override
52-
public List<Map<String, String>> taskConfigs(int maxTasks) {
53-
if (this.config.getReaderType() == RedisSourceConfig.ReaderType.KEYS) {
54-
// Partition the configs based on channels
55-
final List<List<String>> partitionedPatterns = ConnectorUtils
56-
.groupPartitions(this.config.getKeyPatterns(), Math.min(this.config.getKeyPatterns().size(), maxTasks));
52+
@Override
53+
public List<Map<String, String>> taskConfigs(int maxTasks) {
54+
if (this.config.getReaderType() == RedisSourceConfig.ReaderType.KEYS) {
55+
// Partition the configs based on channels
56+
final List<List<String>> partitionedPatterns = ConnectorUtils.groupPartitions(this.config.getKeyPatterns(),
57+
Math.min(this.config.getKeyPatterns().size(), maxTasks));
5758

58-
// Create task configs based on the partitions
59-
return partitionedPatterns.stream().map(this::taskConfig).collect(Collectors.toList());
60-
}
61-
List<Map<String, String>> taskConfigs = new ArrayList<>();
62-
for (int i = 0; i < maxTasks; i++) {
63-
Map<String, String> taskConfig = new HashMap<>(this.props);
64-
taskConfig.put(RedisSourceTask.TASK_ID, Integer.toString(i));
65-
taskConfigs.add(taskConfig);
66-
}
67-
return taskConfigs;
68-
}
59+
// Create task configs based on the partitions
60+
return partitionedPatterns.stream().map(this::taskConfig).collect(Collectors.toList());
61+
}
62+
List<Map<String, String>> taskConfigs = new ArrayList<>();
63+
for (int i = 0; i < maxTasks; i++) {
64+
Map<String, String> taskConfig = new HashMap<>(this.props);
65+
taskConfig.put(RedisSourceTask.TASK_ID, Integer.toString(i));
66+
taskConfigs.add(taskConfig);
67+
}
68+
return taskConfigs;
69+
}
6970

70-
private Map<String, String> taskConfig(List<String> patterns) {
71-
final Map<String, String> taskConfig = new HashMap<>(this.config.originalsStrings());
72-
taskConfig.put(RedisSourceConfig.KEY_PATTERNS_CONFIG, String.join(",", patterns));
73-
return taskConfig;
74-
}
71+
private Map<String, String> taskConfig(List<String> patterns) {
72+
final Map<String, String> taskConfig = new HashMap<>(this.config.originalsStrings());
73+
taskConfig.put(RedisSourceConfig.KEY_PATTERNS_CONFIG, String.join(",", patterns));
74+
return taskConfig;
75+
}
7576

76-
@Override
77-
public void stop() {
78-
// Do nothing
79-
}
77+
@Override
78+
public void stop() {
79+
// Do nothing
80+
}
8081

81-
@Override
82-
public ConfigDef config() {
83-
return new RedisSourceConfig.RedisSourceConfigDef();
84-
}
82+
@Override
83+
public ConfigDef config() {
84+
return new RedisSourceConfig.RedisSourceConfigDef();
85+
}
8586

86-
@Override
87-
public String version() {
88-
return AppInfoParser.getVersion();
89-
}
87+
@Override
88+
public String version() {
89+
return VersionProvider.getVersion();
90+
}
9091
}

src/main/java/com/redis/kafka/connect/common/RedisConfig.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.redis.lettucemod.util.ClientBuilder;
3232
import com.redis.lettucemod.util.RedisURIBuilder;
3333
import com.redis.spring.batch.common.ConnectionPoolBuilder;
34+
import com.redis.spring.batch.common.PoolOptions;
3435

3536
import io.lettuce.core.AbstractRedisClient;
3637
import io.lettuce.core.RedisURI;
@@ -66,7 +67,7 @@ public class RedisConfig extends AbstractConfig {
6667
private static final String TIMEOUT_DOC = "Redis command timeout in seconds";
6768

6869
public static final String POOL_MAX_CONFIG = "redis.pool";
69-
private static final int POOL_MAX_DEFAULT = ConnectionPoolBuilder.DEFAULT_MAX_TOTAL;
70+
private static final int POOL_MAX_DEFAULT = PoolOptions.DEFAULT_MAX_TOTAL;
7071
private static final String POOL_MAX_DOC = "Max pool connections";
7172

7273
public static final String TLS_CONFIG = "redis.tls";
@@ -147,7 +148,7 @@ public RedisURI uri() {
147148
builder.host(hostAndPort.getHost());
148149
builder.port(hostAndPort.getPort());
149150
} else {
150-
builder.uriString(uri);
151+
builder.uri(uri);
151152
}
152153
if (Boolean.TRUE.equals(getBoolean(INSECURE_CONFIG))) {
153154
builder.sslVerifyMode(SslVerifyMode.NONE);
@@ -196,8 +197,11 @@ public GenericObjectPool<StatefulConnection<String, String>> pool(AbstractRedisC
196197
}
197198

198199
public <K, V> GenericObjectPool<StatefulConnection<K, V>> pool(AbstractRedisClient client, RedisCodec<K, V> codec) {
199-
return ConnectionPoolBuilder.create(client).maxTotal(getInt(POOL_MAX_CONFIG)).build(codec);
200+
return ConnectionPoolBuilder.client(client).options(poolOptions()).codec(codec);
201+
}
200202

203+
protected PoolOptions poolOptions() {
204+
return PoolOptions.builder().maxTotal(getInt(POOL_MAX_CONFIG)).build();
201205
}
202206

203207
}

src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ public boolean equals(Object obj) {
224224
}
225225

226226
public WriterOptions writerOptions() {
227-
return WriterOptions.builder().multiExec(isMultiexec()).waitForReplication(waitForReplication()).build();
227+
return WriterOptions.builder().poolOptions(poolOptions()).multiExec(isMultiexec()).waitForReplication(waitForReplication()).build();
228228
}
229229

230230
private Optional<WaitForReplication> waitForReplication() {

src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import com.google.common.base.Preconditions;
4848
import com.google.common.base.Strings;
4949
import com.redis.kafka.connect.RedisSinkConnector;
50+
import com.redis.lettucemod.RedisModulesClient;
51+
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
5052
import com.redis.lettucemod.util.RedisModulesUtils;
5153
import com.redis.spring.batch.RedisItemWriter;
5254
import com.redis.spring.batch.convert.SampleConverter;
@@ -93,7 +95,7 @@ public void start(final Map<String, String> props) {
9395
pool = config.pool(client, ByteArrayCodec.INSTANCE);
9496
jsonConverter = new JsonConverter();
9597
jsonConverter.configure(Collections.singletonMap("schemas.enable", "false"), false);
96-
writer = RedisItemWriter.operation(pool, operation()).options(config.writerOptions()).build();
98+
writer = writer(client).options(config.writerOptions()).operation(operation());
9799
writer.open(new ExecutionContext());
98100
final java.util.Set<TopicPartition> assignment = this.context.assignment();
99101
if (!assignment.isEmpty()) {
@@ -109,6 +111,13 @@ public void start(final Map<String, String> props) {
109111
}
110112
}
111113

114+
private RedisItemWriter.Builder<byte[], byte[]> writer(AbstractRedisClient client) {
115+
if (client instanceof RedisModulesClusterClient) {
116+
return RedisItemWriter.client((RedisModulesClusterClient) client, ByteArrayCodec.INSTANCE);
117+
}
118+
return RedisItemWriter.client((RedisModulesClient)client, ByteArrayCodec.INSTANCE);
119+
}
120+
112121
private Collection<SinkOffsetState> offsetStates(java.util.Set<TopicPartition> assignment) {
113122
Collection<SinkOffsetState> offsetStates = new ArrayList<>();
114123
String[] partitionKeys = assignment.stream().map(a -> offsetKey(a.topic(), a.partition()))

src/main/java/com/redis/kafka/connect/source/KeySourceRecordReader.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
import org.springframework.batch.item.ExecutionContext;
1515
import org.springframework.util.Assert;
1616

17+
import com.redis.lettucemod.RedisModulesClient;
18+
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
1719
import com.redis.lettucemod.util.RedisModulesUtils;
1820
import com.redis.spring.batch.RedisItemReader;
1921
import com.redis.spring.batch.common.DataStructure;
2022
import com.redis.spring.batch.common.DataStructure.Type;
23+
import com.redis.spring.batch.common.FlushingOptions;
2124
import com.redis.spring.batch.common.JobRunner;
22-
import com.redis.spring.batch.reader.LiveReaderOptions;
2325
import com.redis.spring.batch.reader.LiveRedisItemReader;
26+
import com.redis.spring.batch.reader.ReaderOptions;
2427

2528
import io.lettuce.core.AbstractRedisClient;
2629
import io.lettuce.core.RedisURI;
@@ -39,19 +42,21 @@ public class KeySourceRecordReader implements SourceRecordReader {
3942

4043
private final Clock clock = Clock.systemDefaultZone();
4144
protected final RedisSourceConfig config;
42-
private final LiveReaderOptions options;
45+
private final ReaderOptions options;
46+
private final FlushingOptions flushingOptions;
4347
private final int batchSize;
4448
private final String topic;
4549
private LiveRedisItemReader<String, DataStructure<String>> reader;
4650
private AbstractRedisClient client;
4751
private GenericObjectPool<StatefulConnection<String, String>> pool;
4852
private StatefulRedisPubSubConnection<String, String> pubSubConnection;
4953

50-
public KeySourceRecordReader(RedisSourceConfig config, LiveReaderOptions options) {
54+
public KeySourceRecordReader(RedisSourceConfig config, ReaderOptions options, FlushingOptions flushingOptions) {
5155
Assert.notNull(config, "Source connector config must not be null");
5256
Assert.notNull(options, "Options must not be null");
5357
this.config = config;
5458
this.options = options;
59+
this.flushingOptions = flushingOptions;
5560
this.topic = config.getTopicName();
5661
this.batchSize = Math.toIntExact(config.getBatchSize());
5762
}
@@ -63,11 +68,19 @@ public void open(Map<String, Object> offset) {
6368
this.pool = config.pool(client);
6469
this.pubSubConnection = RedisModulesUtils.pubSubConnection(client);
6570
checkJobRunner();
66-
reader = RedisItemReader.liveDataStructure(pool, jobRunner, pubSubConnection, uri.getDatabase(),
67-
config.getKeyPatterns().toArray(new String[0])).options(options).build();
71+
reader = reader(client).live().database(uri.getDatabase())
72+
.keyPatterns(config.getKeyPatterns().toArray(new String[0])).jobRunner(jobRunner).readerOptions(options)
73+
.flushingOptions(flushingOptions).dataStructure();
6874
reader.open(new ExecutionContext());
6975
}
7076

77+
private RedisItemReader.Builder<String, String> reader(AbstractRedisClient client) {
78+
if (client instanceof RedisModulesClusterClient) {
79+
return RedisItemReader.client((RedisModulesClusterClient) client);
80+
}
81+
return RedisItemReader.client((RedisModulesClient) client);
82+
}
83+
7184
private static void checkJobRunner() {
7285
if (jobRunner == null) {
7386
try {

0 commit comments

Comments
 (0)