Skip to content

Commit 289f15b

Browse files
committed
feat(source): Added mode config for LIVE or LIVEONLY key reader
1 parent ad8e07d commit 289f15b

File tree

4 files changed

+488
-450
lines changed

4 files changed

+488
-450
lines changed

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/source/RedisKeysSourceConfig.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.redis.kafka.connect.source;
1717

18+
import com.redis.spring.batch.item.redis.RedisItemReader;
19+
1820
import java.time.Duration;
1921
import java.util.Map;
2022
import java.util.Objects;
@@ -27,13 +29,16 @@ public class RedisKeysSourceConfig extends RedisSourceConfig {
2729

2830
private final String topicName;
2931

30-
private Duration idleTimeout;
32+
private final Duration idleTimeout;
33+
34+
private final RedisItemReader.ReaderMode mode;
3135

3236
public RedisKeysSourceConfig(Map<?, ?> originals) {
3337
super(new RedisKeysSourceConfigDef(), originals);
3438
this.topicName = getString(RedisKeysSourceConfigDef.TOPIC_CONFIG);
3539
this.keyPattern = getString(RedisKeysSourceConfigDef.KEY_PATTERN_CONFIG);
3640
this.idleTimeout = Duration.ofMillis(getLong(RedisKeysSourceConfigDef.IDLE_TIMEOUT_CONFIG));
41+
this.mode = RedisItemReader.ReaderMode.valueOf(getString(RedisKeysSourceConfigDef.MODE_CONFIG).toUpperCase());
3742
}
3843

3944
public String getKeyPattern() {
@@ -48,6 +53,10 @@ public Duration getIdleTimeout() {
4853
return idleTimeout;
4954
}
5055

56+
public RedisItemReader.ReaderMode getMode() {
57+
return mode;
58+
}
59+
5160
@Override
5261
public int hashCode() {
5362
final int prime = 31;

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/source/RedisKeysSourceConfigDef.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.redis.kafka.connect.source;
22

3+
import com.redis.spring.batch.item.redis.RedisItemReader;
4+
35
public class RedisKeysSourceConfigDef extends RedisSourceConfigDef {
46

57
public static final String TOPIC_CONFIG = "topic";
@@ -14,6 +16,10 @@ public class RedisKeysSourceConfigDef extends RedisSourceConfigDef {
1416
public static final String IDLE_TIMEOUT_DEFAULT = "0";
1517
public static final String IDLE_TIMEOUT_DOC = "Idle timeout in millis. Use 0 to disable.";
1618

19+
public static final String MODE_CONFIG = "mode";
20+
public static final RedisItemReader.ReaderMode MODE_DEFAULT = RedisItemReader.ReaderMode.LIVE;
21+
public static final String MODE_DOC = "Key reading mode. Use LIVE for snapshot + updates, LIVEONLY for just updates.";
22+
1723
public RedisKeysSourceConfigDef() {
1824
define();
1925
}
@@ -27,6 +33,7 @@ private void define() {
2733
define(TOPIC_CONFIG, Type.STRING, TOPIC_DEFAULT, Importance.MEDIUM, TOPIC_DOC);
2834
define(KEY_PATTERN_CONFIG, Type.STRING, KEY_PATTERN_DEFAULT, Importance.MEDIUM, KEY_PATTERN_DOC);
2935
define(IDLE_TIMEOUT_CONFIG, Type.LONG, IDLE_TIMEOUT_DEFAULT, Importance.LOW, IDLE_TIMEOUT_DOC);
36+
define(MODE_CONFIG, Type.STRING, MODE_DEFAULT.name(), Importance.MEDIUM, MODE_DOC);
3037
}
3138

32-
}
39+
}

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/source/RedisKeysSourceTask.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,10 @@
1515
*/
1616
package com.redis.kafka.connect.source;
1717

18-
import java.time.Clock;
19-
import java.util.ArrayList;
20-
import java.util.HashMap;
21-
import java.util.List;
22-
import java.util.Map;
23-
import java.util.UUID;
24-
18+
import com.redis.kafka.connect.common.ManifestVersionProvider;
19+
import com.redis.spring.batch.item.redis.RedisItemReader;
20+
import com.redis.spring.batch.item.redis.common.KeyValue;
21+
import io.lettuce.core.AbstractRedisClient;
2522
import org.apache.kafka.connect.data.Schema;
2623
import org.apache.kafka.connect.errors.ConnectException;
2724
import org.apache.kafka.connect.errors.RetriableException;
@@ -30,12 +27,8 @@
3027
import org.springframework.batch.item.ExecutionContext;
3128
import org.springframework.batch.item.ItemStreamException;
3229

33-
import com.redis.kafka.connect.common.ManifestVersionProvider;
34-
import com.redis.spring.batch.item.redis.RedisItemReader;
35-
import com.redis.spring.batch.item.redis.RedisItemReader.ReaderMode;
36-
import com.redis.spring.batch.item.redis.common.KeyValue;
37-
38-
import io.lettuce.core.AbstractRedisClient;
30+
import java.time.Clock;
31+
import java.util.*;
3932

4033
public class RedisKeysSourceTask extends SourceTask {
4134

@@ -76,7 +69,7 @@ public void start(Map<String, String> props) {
7669
// Use a random job name to not interfere with other key source tasks
7770
reader.setName(UUID.randomUUID().toString());
7871
reader.setClient(client);
79-
reader.setMode(ReaderMode.LIVE);
72+
reader.setMode(config.getMode());
8073
reader.setPoolSize(config.getPoolSize());
8174
reader.setDatabase(config.uri().getDatabase());
8275
reader.setKeyPattern(config.getKeyPattern());

0 commit comments

Comments
 (0)