Skip to content

Commit 62a76bb

Browse files
committed
test: Fixed unsupported data structures
1 parent ac7caac commit 62a76bb

File tree

2 files changed

+33
-32
lines changed

2 files changed

+33
-32
lines changed

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/source/DataStructureConverter.java

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ public class DataStructureConverter implements Function<DataStructure<String>, S
3434
public static final Schema HASH_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional();
3535
public static final Schema LIST_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).optional();
3636
public static final Schema SET_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).optional();
37-
public static final Schema STREAM_SCHEMA = SchemaBuilder.array(StreamMessageConverter.VALUE_SCHEMA).optional();
3837
public static final Schema STRING_SCHEMA = Schema.OPTIONAL_STRING_SCHEMA;
3938
public static final Schema ZSET_SCHEMA = SchemaBuilder.map(Schema.FLOAT64_SCHEMA, Schema.STRING_SCHEMA).optional();
4039
public static final Schema VALUE_SCHEMA = SchemaBuilder.struct().field(FIELD_KEY, Schema.STRING_SCHEMA)
@@ -48,20 +47,39 @@ public Struct apply(DataStructure<String> input) {
4847
struct.put(FIELD_KEY, input.getKey());
4948
struct.put(FIELD_TTL, input.getTtl());
5049
struct.put(FIELD_TYPE, input.getType());
51-
struct.put(fieldName(input), fieldValue(input));
52-
return struct;
53-
}
54-
55-
@SuppressWarnings("unchecked")
56-
private Object fieldValue(DataStructure<String> input) {
5750
switch (input.getType()) {
58-
case DataStructure.ZSET:
59-
return zsetMap((Collection<ScoredValue<String>>) input.getValue());
51+
case DataStructure.HASH:
52+
struct.put(FIELD_HASH, input.getValue());
53+
break;
54+
case DataStructure.JSON:
55+
struct.put(FIELD_JSON, input.getValue());
56+
break;
57+
case DataStructure.LIST:
58+
struct.put(FIELD_LIST, input.getValue());
59+
break;
6060
case DataStructure.SET:
61-
return new ArrayList<>(((Collection<String>) input.getValue()));
61+
struct.put(FIELD_SET, list(input));
62+
break;
63+
case DataStructure.STRING:
64+
struct.put(FIELD_STRING, input.getValue());
65+
break;
66+
case DataStructure.ZSET:
67+
struct.put(FIELD_ZSET, zsetMap(input));
68+
break;
6269
default:
63-
return input.getValue();
70+
break;
6471
}
72+
return struct;
73+
}
74+
75+
@SuppressWarnings("unchecked")
76+
private Object list(DataStructure<String> input) {
77+
return new ArrayList<>((Collection<String>) input.getValue());
78+
}
79+
80+
public static Map<Double, String> zsetMap(DataStructure<String> input) {
81+
Collection<ScoredValue<String>> value = input.getValue();
82+
return zsetMap(value);
6583
}
6684

6785
public static Map<Double, String> zsetMap(Collection<ScoredValue<String>> value) {
@@ -72,23 +90,4 @@ public static Map<Long, Double> timeseriesMap(Collection<Sample> samples) {
7290
return samples.stream().collect(Collectors.toMap(Sample::getTimestamp, Sample::getValue));
7391
}
7492

75-
public static String fieldName(DataStructure<String> input) {
76-
switch (input.getType()) {
77-
case DataStructure.HASH:
78-
return FIELD_HASH;
79-
case DataStructure.JSON:
80-
return FIELD_JSON;
81-
case DataStructure.LIST:
82-
return FIELD_LIST;
83-
case DataStructure.SET:
84-
return FIELD_SET;
85-
case DataStructure.STRING:
86-
return FIELD_STRING;
87-
case DataStructure.ZSET:
88-
return FIELD_ZSET;
89-
default:
90-
return FIELD_STRING;
91-
}
92-
}
93-
9493
}

core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSourceIntegrationTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,9 @@ void pollKeys() throws Exception {
444444
for (SourceRecord record : sourceRecords) {
445445
Assertions.assertEquals(topic, record.topic());
446446
Compare compare = values((Struct) record.value());
447-
Assertions.assertEquals(compare.expected, compare.actual);
447+
if (compare != null) {
448+
Assertions.assertEquals(compare.expected, compare.actual);
449+
}
448450
}
449451

450452
// DataStructure<String> stringDS = new DataStructure<>();
@@ -491,7 +493,7 @@ private Compare values(Struct struct) {
491493
return compare(DataStructureConverter.zsetMap(commands.zrangeWithScores(key, 0, -1)),
492494
struct.getMap(DataStructureConverter.FIELD_ZSET));
493495
default:
494-
throw new RuntimeException("Unknown type: " + type);
496+
return null;
495497
}
496498
}
497499

0 commit comments

Comments
 (0)