Skip to content

Commit 5a5042a

Browse files
author
jruaux
committed
test: Fixed RedisJSON errors
1 parent 70f3ef2 commit 5a5042a

17 files changed

+1591
-1530
lines changed

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/RedisKeysSourceConnector.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,20 @@
1515
import org.apache.kafka.connect.connector.Task;
1616

1717
import com.redis.kafka.connect.source.AbstractRedisSourceConnector;
18+
import com.redis.kafka.connect.source.RedisKeysSourceConfig;
1819
import com.redis.kafka.connect.source.RedisKeysSourceConfigDef;
1920
import com.redis.kafka.connect.source.RedisKeysSourceTask;
2021

2122
public class RedisKeysSourceConnector extends AbstractRedisSourceConnector {
2223

23-
@Override
24-
public Class<? extends Task> taskClass() {
25-
return RedisKeysSourceTask.class;
26-
}
24+
@Override
25+
public Class<? extends Task> taskClass() {
26+
return RedisKeysSourceTask.class;
27+
}
28+
29+
@Override
30+
public RedisKeysSourceConfigDef config() {
31+
return RedisKeysSourceConfig.CONFIG;
32+
}
2733

28-
@Override
29-
public RedisKeysSourceConfigDef config() {
30-
return new RedisKeysSourceConfigDef();
31-
}
3234
}

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/RedisSinkConnector.java

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,49 +15,51 @@
1515
*/
1616
package com.redis.kafka.connect;
1717

18-
import java.util.Collections;
1918
import java.util.List;
2019
import java.util.Map;
20+
import java.util.stream.Collectors;
21+
import java.util.stream.IntStream;
2122

2223
import org.apache.kafka.common.config.ConfigDef;
2324
import org.apache.kafka.connect.connector.Task;
2425
import org.apache.kafka.connect.sink.SinkConnector;
2526

2627
import com.redis.kafka.connect.common.ManifestVersionProvider;
27-
import com.redis.kafka.connect.sink.RedisSinkConfigDef;
28+
import com.redis.kafka.connect.sink.RedisSinkConfig;
2829
import com.redis.kafka.connect.sink.RedisSinkTask;
2930

3031
public class RedisSinkConnector extends SinkConnector {
3132

32-
private Map<String, String> props;
33-
34-
@Override
35-
public void start(Map<String, String> props) {
36-
this.props = props;
37-
}
38-
39-
@Override
40-
public Class<? extends Task> taskClass() {
41-
return RedisSinkTask.class;
42-
}
43-
44-
@Override
45-
public List<Map<String, String>> taskConfigs(int maxTasks) {
46-
return Collections.singletonList(props);
47-
}
48-
49-
@Override
50-
public void stop() {
51-
// Do nothing
52-
}
53-
54-
@Override
55-
public ConfigDef config() {
56-
return new RedisSinkConfigDef();
57-
}
58-
59-
@Override
60-
public String version() {
61-
return ManifestVersionProvider.getVersion();
62-
}
33+
private Map<String, String> props;
34+
35+
@Override
36+
public void start(Map<String, String> props) {
37+
this.props = props;
38+
}
39+
40+
@Override
41+
public Class<? extends Task> taskClass() {
42+
return RedisSinkTask.class;
43+
}
44+
45+
@Override
46+
public List<Map<String, String>> taskConfigs(int maxTasks) {
47+
return IntStream.range(0, maxTasks).mapToObj(i -> props).collect(Collectors.toList());
48+
}
49+
50+
@Override
51+
public void stop() {
52+
// Do nothing
53+
}
54+
55+
@Override
56+
public ConfigDef config() {
57+
return RedisSinkConfig.CONFIG;
58+
}
59+
60+
@Override
61+
public String version() {
62+
return ManifestVersionProvider.getVersion();
63+
}
64+
6365
}

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/RedisStreamSourceConnector.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,20 @@
1515
import org.apache.kafka.connect.connector.Task;
1616

1717
import com.redis.kafka.connect.source.AbstractRedisSourceConnector;
18+
import com.redis.kafka.connect.source.RedisStreamSourceConfig;
1819
import com.redis.kafka.connect.source.RedisStreamSourceConfigDef;
1920
import com.redis.kafka.connect.source.RedisStreamSourceTask;
2021

2122
public class RedisStreamSourceConnector extends AbstractRedisSourceConnector {
2223

23-
@Override
24-
public Class<? extends Task> taskClass() {
25-
return RedisStreamSourceTask.class;
26-
}
24+
@Override
25+
public Class<? extends Task> taskClass() {
26+
return RedisStreamSourceTask.class;
27+
}
28+
29+
@Override
30+
public RedisStreamSourceConfigDef config() {
31+
return RedisStreamSourceConfig.CONFIG;
32+
}
2733

28-
@Override
29-
public RedisStreamSourceConfigDef config() {
30-
return new RedisStreamSourceConfigDef();
31-
}
3234
}

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java

Lines changed: 81 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -24,79 +24,86 @@
2424

2525
public class RedisSinkConfig extends RedisConfig {
2626

27-
public enum RedisCommand {
28-
HSET, JSONSET, TSADD, SET, XADD, LPUSH, RPUSH, SADD, ZADD, DEL
29-
}
30-
31-
private final Charset charset;
32-
private final RedisCommand command;
33-
private final String keyspace;
34-
private final String separator;
35-
private final boolean multiexec;
36-
private final int waitReplicas;
37-
private final Duration waitTimeout;
38-
39-
public RedisSinkConfig(Map<?, ?> originals) {
40-
super(new RedisSinkConfigDef(), originals);
41-
String charsetName = getString(RedisSinkConfigDef.CHARSET_CONFIG).trim();
42-
charset = Charset.forName(charsetName);
43-
command = RedisCommand.valueOf(getString(RedisSinkConfigDef.COMMAND_CONFIG));
44-
keyspace = getString(RedisSinkConfigDef.KEY_CONFIG).trim();
45-
separator = getString(RedisSinkConfigDef.SEPARATOR_CONFIG).trim();
46-
multiexec = Boolean.TRUE.equals(getBoolean(RedisSinkConfigDef.MULTIEXEC_CONFIG));
47-
waitReplicas = getInt(RedisSinkConfigDef.WAIT_REPLICAS_CONFIG);
48-
waitTimeout = Duration.ofMillis(getLong(RedisSinkConfigDef.WAIT_TIMEOUT_CONFIG));
49-
}
50-
51-
public Charset getCharset() {
52-
return charset;
53-
}
54-
55-
public RedisCommand getCommand() {
56-
return command;
57-
}
58-
59-
public String getKeyspace() {
60-
return keyspace;
61-
}
62-
63-
public String getSeparator() {
64-
return separator;
65-
}
66-
67-
public boolean isMultiexec() {
68-
return multiexec;
69-
}
70-
71-
public int getWaitReplicas() {
72-
return waitReplicas;
73-
}
74-
75-
public Duration getWaitTimeout() {
76-
return waitTimeout;
77-
}
78-
79-
@Override
80-
public int hashCode() {
81-
final int prime = 31;
82-
int result = super.hashCode();
83-
result = prime * result
84-
+ Objects.hash(charset, keyspace, separator, multiexec, command, waitReplicas, waitTimeout);
85-
return result;
86-
}
87-
88-
@Override
89-
public boolean equals(Object obj) {
90-
if (this == obj)
91-
return true;
92-
if (!super.equals(obj))
93-
return false;
94-
if (getClass() != obj.getClass())
95-
return false;
96-
RedisSinkConfig other = (RedisSinkConfig) obj;
97-
return Objects.equals(charset, other.charset) && Objects.equals(keyspace, other.keyspace)
98-
&& Objects.equals(separator, other.separator) && multiexec == other.multiexec
99-
&& command == other.command && waitReplicas == other.waitReplicas && waitTimeout == other.waitTimeout;
100-
}
27+
public enum RedisCommand {
28+
HSET, JSONSET, TSADD, SET, XADD, LPUSH, RPUSH, SADD, ZADD, DEL
29+
}
30+
31+
public static final RedisSinkConfigDef CONFIG = new RedisSinkConfigDef();
32+
33+
private final Charset charset;
34+
35+
private final RedisCommand command;
36+
37+
private final String keyspace;
38+
39+
private final String separator;
40+
41+
private final boolean multiexec;
42+
43+
private final int waitReplicas;
44+
45+
private final Duration waitTimeout;
46+
47+
public RedisSinkConfig(Map<?, ?> originals) {
48+
super(new RedisSinkConfigDef(), originals);
49+
String charsetName = getString(RedisSinkConfigDef.CHARSET_CONFIG).trim();
50+
charset = Charset.forName(charsetName);
51+
command = RedisCommand.valueOf(getString(RedisSinkConfigDef.COMMAND_CONFIG));
52+
keyspace = getString(RedisSinkConfigDef.KEY_CONFIG).trim();
53+
separator = getString(RedisSinkConfigDef.SEPARATOR_CONFIG).trim();
54+
multiexec = Boolean.TRUE.equals(getBoolean(RedisSinkConfigDef.MULTIEXEC_CONFIG));
55+
waitReplicas = getInt(RedisSinkConfigDef.WAIT_REPLICAS_CONFIG);
56+
waitTimeout = Duration.ofMillis(getLong(RedisSinkConfigDef.WAIT_TIMEOUT_CONFIG));
57+
}
58+
59+
public Charset getCharset() {
60+
return charset;
61+
}
62+
63+
public RedisCommand getCommand() {
64+
return command;
65+
}
66+
67+
public String getKeyspace() {
68+
return keyspace;
69+
}
70+
71+
public String getSeparator() {
72+
return separator;
73+
}
74+
75+
public boolean isMultiexec() {
76+
return multiexec;
77+
}
78+
79+
public int getWaitReplicas() {
80+
return waitReplicas;
81+
}
82+
83+
public Duration getWaitTimeout() {
84+
return waitTimeout;
85+
}
86+
87+
@Override
88+
public int hashCode() {
89+
final int prime = 31;
90+
int result = super.hashCode();
91+
result = prime * result + Objects.hash(charset, keyspace, separator, multiexec, command, waitReplicas, waitTimeout);
92+
return result;
93+
}
94+
95+
@Override
96+
public boolean equals(Object obj) {
97+
if (this == obj)
98+
return true;
99+
if (!super.equals(obj))
100+
return false;
101+
if (getClass() != obj.getClass())
102+
return false;
103+
RedisSinkConfig other = (RedisSinkConfig) obj;
104+
return Objects.equals(charset, other.charset) && Objects.equals(keyspace, other.keyspace)
105+
&& Objects.equals(separator, other.separator) && multiexec == other.multiexec && command == other.command
106+
&& waitReplicas == other.waitReplicas && waitTimeout == other.waitTimeout;
107+
}
101108

102109
}

0 commit comments

Comments
 (0)