diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..ec3f466
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-redis</artifactId>
+ <version>1.16-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>Flink : Connectors : Redis</name>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <flink.version>1.16-SNAPSHOT</flink.version>
+ <flink.shaded.version>15.0</flink.shaded.version>
+ <target.java.version>1.8</target.java.version>
+ <slf4j.version>1.7.32</slf4j.version>
+ <maven.compiler.source>${target.java.version}</maven.compiler.source>
+ <maven.compiler.target>${target.java.version}</maven.compiler.target>
+
+ <scala.version>2.12.7</scala.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <jedis.version>3.8.0</jedis.version>
+ <slf4j.version>1.7.32</slf4j.version>
+ <testcontainers.version>1.16.2</testcontainers.version>
+ <junit.version>4.13.2</junit.version>
+ <hamcrest.version>1.3</hamcrest.version>
+ </properties>
+
+ <dependencies>
+
+ <!-- Redis client -->
+
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>${jedis.version}</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j.api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Logging -->
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <!-- Flink core -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Table ecosystem -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <!-- Tests -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>%regex[.*ITCase.*]</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java
new file mode 100644
index 0000000..3b5240e
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.config.StartupMode;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.StreamEntry;
+import redis.clients.jedis.StreamEntryID;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for Flink sources that consume Redis Streams.
+ *
+ * @param <OUT> the type of the produced elements
+ */
+public abstract class AbstractRedisStreamConsumer<OUT> extends RedisConsumerBase<OUT> {
+
+ protected final Map<String, StreamEntryID> streamEntryIds;
+
+ public AbstractRedisStreamConsumer(
+ StartupMode startupMode, List<String> streamKeys, Properties configProps) {
+ super(streamKeys, configProps);
+ final StreamEntryID streamEntryID;
+ switch (startupMode) {
+ case EARLIEST:
+ streamEntryID = new StreamEntryID();
+ break;
+ case LATEST:
+ streamEntryID = StreamEntryID.LAST_ENTRY;
+ break;
+ case GROUP_OFFSETS:
+ streamEntryID = StreamEntryID.UNRECEIVED_ENTRY;
+ break;
+ case SPECIFIC_OFFSETS:
+ throw new RuntimeException(
+ "Use the constructor with 'StreamEntryID[] streamIds' as param");
+ case TIMESTAMP:
+ throw new RuntimeException("Use the constructor with 'Long[] timestamps' param");
+ default:
+ throw new IllegalStateException();
+ }
+ this.streamEntryIds = prepareStreamEntryIds(streamKeys, streamEntryID);
+ }
+
+ public AbstractRedisStreamConsumer(
+ List<String> streamKeys, List<StreamEntryID> streamIds, Properties configProps) {
+ this(prepareStreamEntryIds(streamKeys, streamIds), configProps);
+ }
+
+ private AbstractRedisStreamConsumer(
+ Map<String, StreamEntryID> streamIds, Properties configProps) {
+ super(null, configProps);
+ this.streamEntryIds = streamIds;
+ }
+
+ @Override
+ protected final boolean readAndCollect(
+ Jedis jedis, List<String> streamKeys, SourceContext<OUT> sourceContext) {
+ boolean anyEntry = false;
+ List<Entry<String, List<StreamEntry>>> response = read(jedis);
+ if (response != null) {
+ for (Entry<String, List<StreamEntry>> streamEntries : response) {
+ String streamKey = streamEntries.getKey();
+ for (StreamEntry entry : streamEntries.getValue()) {
+ anyEntry = true;
+ collect(sourceContext, streamKey, entry);
+ updateIdForKey(streamKey, entry.getID());
+ }
+ }
+ }
+ return anyEntry;
+ }
+
+ protected abstract List<Entry<String, List<StreamEntry>>> read(Jedis jedis);
+
+ protected abstract void collect(
+ SourceContext<OUT> sourceContext, String streamKey, StreamEntry streamEntry);
+
+ protected void updateIdForKey(String streamKey, StreamEntryID streamEntryID) {
+ // never advance past the consumer-group marker '>' (unreceived entries)
+ if (!this.streamEntryIds.get(streamKey).toString().equals(">")) {
+ this.streamEntryIds.put(streamKey, streamEntryID);
+ }
+ }
+
+ private static Map<String, StreamEntryID> prepareStreamEntryIds(
+ List<String> streamKeys, StreamEntryID streamId) {
+ Map<String, StreamEntryID> streams = new LinkedHashMap<>(streamKeys.size());
+ streamKeys.forEach(streamKey -> streams.put(streamKey, streamId));
+ return streams;
+ }
+
+ private static Map<String, StreamEntryID> prepareStreamEntryIds(
+ List<String> streamKeys, List<StreamEntryID> streamIds) {
+ Map<String, StreamEntryID> streams = new LinkedHashMap<>(streamKeys.size());
+ for (int i = 0; i < streamKeys.size(); i++) {
+ streams.put(streamKeys.get(i), streamIds.get(i));
+ }
+ return streams;
+ }
+
+ public static List<StreamEntryID> convertToStreamEntryIDs(List<Long> timestamps) {
+ return timestamps.stream()
+ .map(ts -> new StreamEntryID(ts, 0L))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java b/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java
new file mode 100644
index 0000000..106f1a3
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Converts the field/value map of a Redis Stream entry into a record.
+ *
+ * @param <OUT> the type of the produced record
+ */
+public interface DataConverter<OUT> extends Serializable {
+
+ OUT toData(Map<String, String> input);
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java
new file mode 100644
index 0000000..0fa3ff2
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.util.JedisUtils;
+
+import redis.clients.jedis.Jedis;
+
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for Flink sinks that write to Redis.
+ *
+ * @param <OUT> the type of the records handled by the sink
+ */
+public abstract class FlinkRedisProducerBase<OUT> extends RichSinkFunction<OUT>
+ implements CheckpointedFunction {
+
+ private final Properties configProps;
+
+ private transient Jedis jedis;
+
+ private final String key;
+
+ public FlinkRedisProducerBase(String key, Properties configProps) {
+ checkNotNull(key, "key can not be null");
+ this.key = key;
+
+ checkNotNull(configProps, "configProps can not be null");
+ this.configProps = configProps;
+ }
+
+ /** Initializes the connection to Redis. */
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ super.open(configuration);
+ // create the Jedis connection used by this sink instance
+ this.jedis = JedisUtils.createResource(this.configProps);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ // release the Jedis connection
+ if (jedis != null) {
+ jedis.close();
+ }
+ }
+
+ @Override
+ public void invoke(OUT value, Context context) throws Exception {
+ invoke(this.jedis, this.key, value, context);
+ }
+
+ protected abstract void invoke(Jedis resource, String key, OUT value, Context context)
+ throws Exception;
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) {
+ // in synchronous mode, nothing to do
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) {
+ // nothing to do
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java
new file mode 100644
index 0000000..72c654c
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.StreamEntryID;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Flink sink that appends records to a Redis Stream via XADD.
+ *
+ * @param <OUT> the type of the incoming records
+ */
+public class FlinkRedisStreamProducer<OUT> extends FlinkRedisProducerBase<OUT> {
+
+ private final MapConverter<OUT> mapConverter;
+
+ public FlinkRedisStreamProducer(
+ MapConverter<OUT> mapConverter, String streamKey, Properties configProps) {
+ super(streamKey, configProps);
+ this.mapConverter = mapConverter;
+ }
+
+ @Override
+ public void invoke(Jedis jedis, String streamKey, OUT value, Context context) throws Exception {
+ // convert the record into a field/value map and append it to the stream (XADD)
+ Map<String, String> map = mapConverter.toMap(value);
+ jedis.xadd(streamKey, StreamEntryID.NEW_ENTRY, map);
+ }
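+
+ // A minimal usage sketch (illustrative only; the stream key "my-stream" and the
+ // Properties instance are placeholders, and SchemalessDataRowToMap is just one
+ // possible MapConverter implementation):
+ //
+ // DataStream<Row> stream = ...;
+ // stream.addSink(new FlinkRedisStreamProducer<>(
+ // new SchemalessDataRowToMap(), "my-stream", configProps));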
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java b/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java
new file mode 100644
index 0000000..7b141a5
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Converts a record into the field/value map that is stored in a Redis Stream entry.
+ *
+ * @param <IN> the type of the input record
+ */
+public interface MapConverter<IN> extends Serializable {
+
+ Map<String, String> toMap(IN input);
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java
new file mode 100644
index 0000000..ab87a7b
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.redis.util.JedisUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for Flink sources that read from Redis.
+ *
+ * @param <OUT> the type of the produced elements
+ */
+public abstract class RedisConsumerBase<OUT> extends RichParallelSourceFunction<OUT> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(RedisConsumerBase.class);
+
+ private final Properties configProps;
+
+ private transient Jedis jedis;
+
+ private final List<String> keys;
+
+ /** Flag indicating whether the consumer is still running. */
+ private volatile boolean running = true;
+
+ public RedisConsumerBase(List<String> keys, Properties configProps) {
+ checkNotNull(keys, "keys can not be null");
+ checkArgument(keys.size() != 0, "must be consuming at least 1 key");
+ this.keys = Collections.unmodifiableList(keys);
+
+ this.configProps = checkNotNull(configProps, "configProps can not be null");
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Flink Redis Stream Consumer is going to read the following streams: {}",
+ String.join(", ", keys));
+ }
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ // code
+ super.open(configuration);
+ }
+
+ @Override
+ public void run(SourceContext<OUT> sourceContext) throws Exception {
+ if (!running) {
+ return;
+ }
+ this.jedis = JedisUtils.createResource(this.configProps);
+
+ while (running) {
+ running = readAndCollect(this.jedis, this.keys, sourceContext);
+ }
+ }
+
+ protected abstract boolean readAndCollect(
+ Jedis jedis, List<String> keys, SourceContext<OUT> sourceContext);
+
+ @Override
+ public void cancel() {
+ // set ourselves as not running;
+ // this would let the main discovery loop escape as soon as possible
+ running = false;
+
+ // abort the fetcher, if there is one
+ if (jedis != null) {
+ jedis.close();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ cancel();
+
+ super.close();
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java
new file mode 100644
index 0000000..f1ae6ab
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.config.StartupMode;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.StreamEntry;
+import redis.clients.jedis.StreamEntryID;
+import redis.clients.jedis.params.XReadParams;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * A Flink source that reads entries from one or more Redis Streams via XREAD.
+ *
+ * @param <OUT> the type of the produced elements
+ */
+public class RedisStreamConsumer<OUT> extends AbstractRedisStreamConsumer<OUT> {
+
+ private final DataConverter<OUT> dataConverter;
+
+ public RedisStreamConsumer(
+ Properties configProps,
+ StartupMode startupMode,
+ DataConverter<OUT> dataConverter,
+ String streamKey) {
+ this(configProps, startupMode, dataConverter, Arrays.asList(streamKey));
+ }
+
+ public RedisStreamConsumer(
+ Properties configProps,
+ StartupMode startupMode,
+ DataConverter<OUT> dataConverter,
+ List<String> streamKeys) {
+ super(startupMode, streamKeys, configProps);
+ this.dataConverter = dataConverter;
+ }
+
+ public RedisStreamConsumer(
+ DataConverter<OUT> dataConverter,
+ List<String> streamKeys,
+ List<StreamEntryID> streamIds,
+ Properties configProps) {
+ super(streamKeys, streamIds, configProps);
+ this.dataConverter = dataConverter;
+ }
+
+ @Override
+ protected List<Entry<String, List<StreamEntry>>> read(Jedis jedis) {
+ // return jedis.xread(XReadParams.xReadParams().count(1).block(0), streamEntryIds);
+ return jedis.xread(XReadParams.xReadParams().count(1), streamEntryIds);
+ }
+
+ @Override
+ protected void collect(
+ SourceContext<OUT> sourceContext, String streamKey, StreamEntry streamEntry) {
+ sourceContext.collect(dataConverter.toData(streamEntry.getFields()));
+ }
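+
+ // A minimal usage sketch (illustrative only; "my-stream" and configProps are
+ // placeholders), mirroring the wiring used in RedisStreamConsumerITCase:
+ //
+ // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // env.addSource(new RedisStreamConsumer<>(
+ // configProps, StartupMode.EARLIEST, new SchemalessDataRowToMap(), "my-stream"))
+ // .print();
+ // env.execute("Redis Stream source example");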
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java
new file mode 100644
index 0000000..8f35b77
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.config.StartupMode;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.StreamEntry;
+import redis.clients.jedis.StreamEntryID;
+import redis.clients.jedis.params.XReadGroupParams;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * A Flink source that reads entries from Redis Streams as part of a consumer group (XREADGROUP).
+ *
+ * @param <OUT> the type of the produced elements
+ */
+public class RedisStreamGroupConsumer<OUT> extends AbstractRedisStreamConsumer<OUT> {
+
+ private final String group;
+ private final String consumer;
+
+ private final DataConverter<OUT> dataConverter;
+
+ public RedisStreamGroupConsumer(
+ String groupName,
+ String consumerName,
+ DataConverter<OUT> dataConverter,
+ String streamKey,
+ Properties config) {
+ this(
+ groupName,
+ consumerName,
+ StartupMode.GROUP_OFFSETS,
+ dataConverter,
+ Arrays.asList(streamKey),
+ config);
+ }
+
+ public RedisStreamGroupConsumer(
+ String groupName,
+ String consumerName,
+ StartupMode startupMode,
+ DataConverter<OUT> dataConverter,
+ List<String> streamKeys,
+ Properties config) {
+ super(startupMode, streamKeys, config);
+ this.group = groupName;
+ this.consumer = consumerName;
+ this.dataConverter = dataConverter;
+ }
+
+ public RedisStreamGroupConsumer(
+ String groupName,
+ String consumerName,
+ DataConverter<OUT> dataConverter,
+ List<String> streamKeys,
+ List<StreamEntryID> streamIds,
+ Properties config) {
+ super(streamKeys, streamIds, config);
+ this.group = groupName;
+ this.consumer = consumerName;
+ this.dataConverter = dataConverter;
+ }
+
+ @Override
+ protected List<Entry<String, List<StreamEntry>>> read(Jedis jedis) {
+ return jedis.xreadGroup(
+ group,
+ consumer,
+ // XReadGroupParams.xReadGroupParams().count(1).block(0).noAck(),
+ XReadGroupParams.xReadGroupParams().count(1).noAck(),
+ streamEntryIds);
+ }
+
+ @Override
+ protected void collect(
+ SourceContext<OUT> sourceContext, String streamKey, StreamEntry streamEntry) {
+ sourceContext.collect(dataConverter.toData(streamEntry.getFields()));
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java b/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java
new file mode 100644
index 0000000..3b615e6
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.config;
+
+import redis.clients.jedis.Protocol;
+
+/** Configuration keys and defaults for the Redis connection. */
+public class RedisConfigConstants {
+
+ public static final String REDIS_HOST = "redis.host";
+
+ public static final String REDIS_PORT = "redis.port";
+
+ public static final String DEFAULT_REDIS_HOST = Protocol.DEFAULT_HOST;
+
+ public static final int DEFAULT_REDIS_PORT = Protocol.DEFAULT_PORT;
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java b/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java
new file mode 100644
index 0000000..785a3bf
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.config;
+
+/**
+ * Startup modes for the Redis Consumer. Based on
+ * flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+ */
+public enum StartupMode {
+
+ /** Start from the earliest offset possible. */
+ EARLIEST,
+
+ /** Start from user-supplied timestamp for each partition. */
+ TIMESTAMP,
+
+ /** Start from user-supplied specific offsets for each partition. */
+ SPECIFIC_OFFSETS,
+
+ /** Start from the latest offset. */
+ LATEST,
+
+ /** Start from latest group offset. */
+ GROUP_OFFSETS
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java
new file mode 100644
index 0000000..e646fa4
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.internal;
+
+import org.apache.flink.streaming.connectors.redis.DataConverter;
+import org.apache.flink.streaming.connectors.redis.MapConverter;
+import org.apache.flink.types.Row;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Maps {@link Row} records to/from Redis Stream field/value maps using a fixed list of field names. */
+public class FieldBoundDataRowToMap implements MapConverter<Row>, DataConverter<Row> {
+
+ private final String[] fieldNames;
+
+ public FieldBoundDataRowToMap(String[] fieldNames) {
+ this.fieldNames = fieldNames;
+ }
+
+ @Override
+ public Map<String, String> toMap(Row row) {
+ int arity = row.getArity();
+ if (arity != fieldNames.length) {
+ throw new RuntimeException(
+ "Row arity " + arity + " does not match the expected field count " + fieldNames.length);
+ }
+ Map<String, String> map = new LinkedHashMap<>();
+ for (int i = 0; i < arity; i++) {
+ String key = fieldNames[i];
+ String value = (String) row.getField(i);
+ map.put(key, value);
+ }
+ return map;
+ }
+
+ @Override
+ public Row toData(Map<String, String> map) {
+ int size = map.size();
+ if (size != fieldNames.length) {
+ throw new RuntimeException(
+ "Map size " + size + " does not match the expected field count " + fieldNames.length);
+ }
+ Object[] array = new Object[size];
+ for (int i = 0; i < size; i++) {
+ array[i] = map.get(fieldNames[i]);
+ }
+ return Row.of(array);
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java
new file mode 100644
index 0000000..5c23eb6
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.internal;
+
+import org.apache.flink.streaming.connectors.redis.DataConverter;
+import org.apache.flink.streaming.connectors.redis.MapConverter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Maps {@link RowData} records to/from Redis Stream field/value maps using a fixed list of field names. */
+public class FieldBoundRowDataToMap implements MapConverter<RowData>, DataConverter<RowData> {
+
+ private final String[] fieldNames;
+
+ public FieldBoundRowDataToMap(String[] fieldNames) {
+ this.fieldNames = fieldNames;
+ }
+
+ @Override
+ public Map<String, String> toMap(RowData row) {
+ int arity = row.getArity();
+ if (arity != fieldNames.length) {
+ throw new RuntimeException(
+ "Row arity " + arity + " does not match the expected field count " + fieldNames.length);
+ }
+ Map<String, String> map = new LinkedHashMap<>();
+ for (int i = 0; i < arity; i++) {
+ String key = fieldNames[i];
+ String value = row.getString(i).toString();
+ map.put(key, value);
+ }
+ return map;
+ }
+
+ @Override
+ public RowData toData(Map<String, String> map) {
+ int size = map.size();
+ if (size != fieldNames.length) {
+ throw new RuntimeException(
+ "Map size " + size + " does not match the expected field count " + fieldNames.length);
+ }
+ Object[] array = new Object[size];
+ for (int i = 0; i < size; i++) {
+ // RowData string fields must be StringData rather than java.lang.String
+ array[i] = StringData.fromString(map.get(fieldNames[i]));
+ }
+ return GenericRowData.of(array);
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java
new file mode 100644
index 0000000..164c4e4
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.internal;
+
+import org.apache.flink.streaming.connectors.redis.DataConverter;
+import org.apache.flink.streaming.connectors.redis.MapConverter;
+import org.apache.flink.types.Row;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Maps {@link Row} records to/from Redis Stream field/value maps without a fixed schema; the row
+ * is interpreted as alternating key/value fields.
+ */
+public class SchemalessDataRowToMap implements MapConverter<Row>, DataConverter<Row> {
+
+ public SchemalessDataRowToMap() {}
+
+ @Override
+ public Map<String, String> toMap(Row input) {
+ int arity = input.getArity();
+ if ((arity & 1) != 0) {
+ throw new RuntimeException(
+ "Expected an even row arity (alternating key/value fields) but got " + arity);
+ }
+ Map<String, String> map = new LinkedHashMap<>();
+ for (int i = 0; i < arity; i += 2) {
+ String key = (String) input.getField(i);
+ String value = (String) input.getField(i + 1);
+ map.put(key, value);
+ }
+ return map;
+ }
+
+ @Override
+ public Row toData(Map<String, String> map) {
+ int size = map.size();
+ Object[] array = new Object[size << 1];
+ int index = 0;
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ array[index++] = key;
+ array[index++] = value;
+ }
+ return Row.of(array);
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java
new file mode 100644
index 0000000..3123ba3
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.redis.FlinkRedisStreamProducer;
+import org.apache.flink.streaming.connectors.redis.MapConverter;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+/** Redis-Stream-backed {@link DynamicTableSink}. */
+@Internal
+public class RedisStreamDynamicSink implements DynamicTableSink, SupportsPartitioning {
+
+ private static final ChangelogMode CHANGELOG_MODE = ChangelogMode.insertOnly();
+
+ /** The Redis Stream to write to. */
+ private final String streamKey;
+
+ /** Properties for the Redis (Stream) producer. */
+ private final Properties producerProperties;
+
+ private final MapConverter<RowData> converter;
+
+ public RedisStreamDynamicSink(
+ String streamKey, Properties producerProperties, MapConverter<RowData> converter) {
+ this.streamKey =
+ Preconditions.checkNotNull(streamKey, "Redis Stream key name must not be null");
+ this.producerProperties =
+ Preconditions.checkNotNull(
+ producerProperties,
+ "Properties for the Flink Redis producer must not be null");
+ this.converter = Preconditions.checkNotNull(converter, "Converter must not be null");
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return CHANGELOG_MODE;
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ FlinkRedisStreamProducer<RowData> redisStreamProducer =
+ new FlinkRedisStreamProducer<>(converter, streamKey, producerProperties);
+
+ return SinkFunctionProvider.of(redisStreamProducer);
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new RedisStreamDynamicSink(streamKey, producerProperties, converter);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "RedisStream";
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // SupportsPartitioning
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void applyStaticPartition(Map<String, String> partition) {
+ // nothing to do
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Value semantics for equals and hashCode
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RedisStreamDynamicSink that = (RedisStreamDynamicSink) o;
+ return Objects.equals(streamKey, that.streamKey)
+ && Objects.equals(producerProperties, that.producerProperties)
+ && Objects.equals(converter, that.converter);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(streamKey, producerProperties, converter);
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java
new file mode 100644
index 0000000..03f9237
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.table;
+
+import org.apache.flink.streaming.connectors.redis.AbstractRedisStreamConsumer;
+import org.apache.flink.streaming.connectors.redis.DataConverter;
+import org.apache.flink.streaming.connectors.redis.RedisStreamConsumer;
+import org.apache.flink.streaming.connectors.redis.RedisStreamGroupConsumer;
+import org.apache.flink.streaming.connectors.redis.config.StartupMode;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.StreamEntryID;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+/** Redis-Stream-backed {@link ScanTableSource}. */
+public class RedisStreamDynamicSource implements ScanTableSource {
+
+ private static final ChangelogMode CHANGELOG_MODE = ChangelogMode.insertOnly();
+
+ private final Properties config;
+
+ private final Optional<String> groupName;
+
+ private final Optional<String> consumerName;
+
+ /** The Redis key to consume. */
+ private final String streamKey;
+
+ /**
+ * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
+ */
+ private final StartupMode startupMode;
+
+ /**
+ * Specific startup offset; only relevant when startup mode is {@link
+ * StartupMode#SPECIFIC_OFFSETS}.
+ */
+ private StreamEntryID streamEntryId;
+
+ /**
+ * Specific startup timestamp; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.
+ */
+ private Long timestamp;
+
+ private final DataConverter<RowData> converter;
+
+ private RedisStreamDynamicSource(
+ Properties config,
+ Optional<String> groupName,
+ Optional<String> consumerName,
+ String streamKey,
+ StartupMode startupMode,
+ DataConverter<RowData> rowMapConverter) {
+ this.config = config;
+ this.streamKey = Preconditions.checkNotNull(streamKey, "Key must not be null.");
+ this.groupName = groupName;
+ this.consumerName = consumerName;
+ this.converter =
+ Preconditions.checkNotNull(rowMapConverter, "RowMapConverter must not be null.");
+ this.startupMode = Preconditions.checkNotNull(startupMode, "StartupMode must not be null.");
+ }
+
+ public RedisStreamDynamicSource(
+ Properties config,
+ String streamKey,
+ StreamEntryID streamEntryId,
+ DataConverter<RowData> rowMapConverter) {
+ this(
+ config,
+ Optional.empty(),
+ Optional.empty(),
+ streamKey,
+ StartupMode.SPECIFIC_OFFSETS,
+ rowMapConverter);
+ this.streamEntryId = streamEntryId;
+ }
+
+ public RedisStreamDynamicSource(
+ Properties config,
+ String streamKey,
+ Long timestamp,
+ DataConverter<RowData> rowMapConverter) {
+ this(
+ config,
+ Optional.empty(),
+ Optional.empty(),
+ streamKey,
+ StartupMode.TIMESTAMP,
+ rowMapConverter);
+ this.timestamp = timestamp;
+ }
+
+ public RedisStreamDynamicSource(
+ Properties config,
+ String streamKey,
+ StartupMode startupMode,
+ DataConverter<RowData> rowMapConverter) {
+ this(config, Optional.empty(), Optional.empty(), streamKey, startupMode, rowMapConverter);
+ }
+
+ public RedisStreamDynamicSource(
+ Properties config,
+ String groupName,
+ String consumerName,
+ String streamKey,
+ StreamEntryID streamEntryId,
+ DataConverter<RowData> rowMapConverter) {
+ this(
+ config,
+ Optional.of(groupName),
+ Optional.of(consumerName),
+ streamKey,
+ StartupMode.SPECIFIC_OFFSETS,
+ rowMapConverter);
+ this.streamEntryId = streamEntryId;
+ }
+
+ public RedisStreamDynamicSource(
+ Properties config,
+ String groupName,
+ String consumerName,
+ String streamKey,
+ Long timestamp,
+ DataConverter<RowData> rowMapConverter) {
+ this(
+ config,
+ Optional.of(groupName),
+ Optional.of(consumerName),
+ streamKey,
+ StartupMode.TIMESTAMP,
+ rowMapConverter);
+ this.timestamp = timestamp;
+ }
+
+ public RedisStreamDynamicSource(
+ Properties config,
+ String groupName,
+ String consumerName,
+ String streamKey,
+ StartupMode startupMode,
+ DataConverter<RowData> rowMapConverter) {
+ this(
+ config,
+ Optional.of(groupName),
+ Optional.of(consumerName),
+ streamKey,
+ startupMode,
+ rowMapConverter);
+ }
+
+ public RedisStreamDynamicSource(
+ Properties config,
+ String groupName,
+ String consumerName,
+ String streamKey,
+ DataConverter<RowData> rowMapConverter) {
+ this(
+ config,
+ Optional.of(groupName),
+ Optional.of(consumerName),
+ streamKey,
+ StartupMode.GROUP_OFFSETS,
+ rowMapConverter);
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ if (groupName.isPresent()) {
+ switch (startupMode) {
+ case GROUP_OFFSETS:
+ return new RedisStreamDynamicSource(
+ config, groupName.get(), consumerName.get(), streamKey, converter);
+ case TIMESTAMP:
+ return new RedisStreamDynamicSource(
+ config,
+ groupName.get(),
+ consumerName.get(),
+ streamKey,
+ timestamp,
+ converter);
+ case SPECIFIC_OFFSETS:
+ return new RedisStreamDynamicSource(
+ config,
+ groupName.get(),
+ consumerName.get(),
+ streamKey,
+ streamEntryId,
+ converter);
+ default:
+ return new RedisStreamDynamicSource(
+ config,
+ groupName.get(),
+ consumerName.get(),
+ streamKey,
+ startupMode,
+ converter);
+ }
+ } else {
+ switch (startupMode) {
+ case TIMESTAMP:
+ return new RedisStreamDynamicSource(config, streamKey, timestamp, converter);
+ case SPECIFIC_OFFSETS:
+ return new RedisStreamDynamicSource(
+ config, streamKey, streamEntryId, converter);
+ default:
+ return new RedisStreamDynamicSource(config, streamKey, startupMode, converter);
+ }
+ }
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "RedisStream";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final RedisStreamDynamicSource that = (RedisStreamDynamicSource) o;
+ return Objects.equals(config, that.config)
+ && Objects.equals(groupName, that.groupName)
+ && Objects.equals(consumerName, that.consumerName)
+ && Objects.equals(streamKey, that.streamKey)
+ && startupMode == that.startupMode
+ && Objects.equals(streamEntryId, that.streamEntryId)
+ && Objects.equals(timestamp, that.timestamp)
+ && Objects.equals(converter, that.converter);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ config,
+ groupName,
+ consumerName,
+ streamKey,
+ startupMode,
+ streamEntryId,
+ timestamp,
+ converter);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return CHANGELOG_MODE;
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ AbstractRedisStreamConsumer<RowData> redisStreamConsumer = createRedisConsumer();
+
+ return SourceFunctionProvider.of(redisStreamConsumer, false);
+ }
+
+ protected AbstractRedisStreamConsumer<RowData> createRedisConsumer() {
+ if (groupName.isPresent()) {
+ switch (startupMode) {
+ case GROUP_OFFSETS:
+ return new RedisStreamGroupConsumer<>(
+ groupName.get(), consumerName.get(), converter, streamKey, config);
+ case TIMESTAMP:
+ return new RedisStreamGroupConsumer<>(
+ groupName.get(),
+ consumerName.get(),
+ converter,
+ Arrays.asList(streamKey),
+ RedisStreamGroupConsumer.convertToStreamEntryIDs(
+ Arrays.asList(timestamp)),
+ config);
+ case SPECIFIC_OFFSETS:
+ return new RedisStreamGroupConsumer<>(
+ groupName.get(),
+ consumerName.get(),
+ converter,
+ Arrays.asList(streamKey),
+ Arrays.asList(streamEntryId),
+ config);
+ default:
+ return new RedisStreamGroupConsumer<>(
+ groupName.get(),
+ consumerName.get(),
+ startupMode,
+ converter,
+ Arrays.asList(streamKey),
+ config);
+ }
+ } else {
+ switch (startupMode) {
+ case TIMESTAMP:
+ return new RedisStreamConsumer<>(
+ converter,
+ Arrays.asList(streamKey),
+ AbstractRedisStreamConsumer.convertToStreamEntryIDs(
+ Arrays.asList(timestamp)),
+ config);
+ case SPECIFIC_OFFSETS:
+ return new RedisStreamConsumer<>(
+ converter,
+ Arrays.asList(streamKey),
+ Arrays.asList(streamEntryId),
+ config);
+ default:
+ return new RedisStreamConsumer<>(config, startupMode, converter, streamKey);
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java b/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java
new file mode 100644
index 0000000..95966c8
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.util;
+
+import org.apache.flink.streaming.connectors.redis.config.StartupMode;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.StreamEntryID;
+
+/** Utility for creating Redis Stream consumer groups. */
+public final class ConsumerGroupUtils {
+
+ public static void createConsumerGroup(
+ Jedis jedis,
+ String streamKey,
+ String groupName,
+ StreamEntryID streamEntryID,
+ boolean createStreamIfAbsent) {
+ jedis.xgroupCreate(streamKey, groupName, streamEntryID, createStreamIfAbsent);
+ }
+ //
+ // public static void createConsumerGroup(Jedis jedis, String streamKey, String groupName,
+ // long timestamp, boolean createStreamIfAbsent) {
+ // createConsumerGroup(jedis, streamKey, groupName, new StreamEntryID(timestamp, 0L),
+ // createStreamIfAbsent);
+ // }
+
+ public static void createConsumerGroup(
+ Jedis jedis,
+ String streamKey,
+ String groupName,
+ StartupMode startupMode,
+ boolean createStreamIfAbsent) {
+ final StreamEntryID streamEntryID;
+ switch (startupMode) {
+ case EARLIEST:
+ streamEntryID = new StreamEntryID();
+ break;
+ case LATEST:
+ streamEntryID = StreamEntryID.LAST_ENTRY;
+ break;
+ case SPECIFIC_OFFSETS:
+ throw new RuntimeException("Use the method with StreamEntryID as param");
+ case TIMESTAMP:
+ throw new RuntimeException("Use the method with 'long timestamp' param");
+ default:
+ throw new RuntimeException(
+ "Consumer Group cannot be initialized from " + startupMode);
+ }
+ createConsumerGroup(jedis, streamKey, groupName, streamEntryID, createStreamIfAbsent);
+ }
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java b/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java
new file mode 100644
index 0000000..74c2dee
--- /dev/null
+++ b/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.util;
+
+import org.apache.flink.streaming.connectors.redis.config.RedisConfigConstants;
+
+import redis.clients.jedis.Jedis;
+
+import java.util.Properties;
+
+/** Utility for creating {@link Jedis} connections from connector configuration properties. */
+public class JedisUtils {
+
+ public static Jedis createResource(Properties configProps) {
+
+ String host =
+ configProps.getProperty(
+ RedisConfigConstants.REDIS_HOST, RedisConfigConstants.DEFAULT_REDIS_HOST);
+
+ int port =
+ Integer.parseInt(
+ configProps.getProperty(
+ RedisConfigConstants.REDIS_PORT,
+ Integer.toString(RedisConfigConstants.DEFAULT_REDIS_PORT)));
+
+ return new Jedis(host, port);
+ }
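+
+ // A minimal sketch of the expected configuration (the host and port values are
+ // placeholders; when a key is omitted, the Jedis Protocol defaults are used):
+ //
+ // Properties configProps = new Properties();
+ // configProps.setProperty(RedisConfigConstants.REDIS_HOST, "localhost");
+ // configProps.setProperty(RedisConfigConstants.REDIS_PORT, "6379");
+ // Jedis jedis = JedisUtils.createResource(configProps);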
+}
diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
new file mode 100644
index 0000000..730eda9
--- /dev/null
+++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.config.RedisConfigConstants;
+import org.apache.flink.streaming.connectors.redis.util.JedisUtils;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.testcontainers.containers.GenericContainer;
+import redis.clients.jedis.Jedis;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** Base class for integration tests that run against a Redis container (Testcontainers). */
+public abstract class RedisITCaseBase extends AbstractTestBase {
+
+ public static final String REDIS_IMAGE = "redis";
+ private static final int REDIS_PORT = 6379;
+
+ private static final AtomicBoolean running = new AtomicBoolean(false);
+ private static final GenericContainer<?> container =
+ new GenericContainer<>(REDIS_IMAGE).withExposedPorts(REDIS_PORT);
+
+ protected Jedis jedis;
+
+ protected static synchronized void start() {
+ if (!running.get()) {
+ container.start();
+ running.set(true);
+ }
+ }
+
+ protected static void stop() {
+ container.stop();
+ running.set(false);
+ }
+
+ @Before
+ public void setUp() {
+ jedis = JedisUtils.createResource(getConfigProperties());
+ jedis.flushAll();
+ }
+
+ @After
+ public void tearDown() {
+ if (jedis != null) {
+ jedis.close();
+ }
+ }
+
+ protected Properties getConfigProperties() {
+ start();
+
+ Properties configProps = new Properties();
+ configProps.setProperty(RedisConfigConstants.REDIS_HOST, container.getContainerIpAddress());
+ configProps.setProperty(
+ RedisConfigConstants.REDIS_PORT, Integer.toString(container.getFirstMappedPort()));
+ return configProps;
+ }
+}
diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java
new file mode 100644
index 0000000..c9ac95c
--- /dev/null
+++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.redis.config.StartupMode;
+import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.StreamEntryID;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+/** Integration test for {@link RedisStreamConsumer}. */
+public class RedisStreamConsumerITCase extends RedisITCaseBase {
+
+ private static final int NUM_ELEMENTS = 4;
+ private static final int NUM_FIELDS = 3;
+ private static final String REDIS_KEY = "TEST_KEY";
+
+ private StreamExecutionEnvironment env;
+
+ private static final AtomicInteger count = new AtomicInteger();
+
+ @BeforeClass
+ public static void prepare() {
+ start();
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ stop();
+ }
+
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ count.set(0);
+ }
+
+ @After
+ @Override
+ public void tearDown() {
+ super.tearDown();
+ }
+
+ @Test
+ public void redisConsumer() throws Exception {
+ populate(jedis);
+
+ DataStreamSource<Row> source =
+ env.addSource(
+ new RedisStreamConsumer<>(
+ getConfigProperties(),
+ StartupMode.EARLIEST,
+ new SchemalessDataRowToMap(),
+ REDIS_KEY));
+ source.returns(
+ Types.ROW(
+ Types.STRING,
+ Types.STRING,
+ Types.STRING,
+ Types.STRING,
+ Types.STRING,
+ Types.STRING));
+ source.setParallelism(1);
+
+ source.addSink(new TestRowSinkFunction());
+
+ env.execute("Test Redis Row Consumer");
+
+ assertEquals(NUM_ELEMENTS, count.get());
+ }
+
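+ /** Writes NUM_ELEMENTS entries, each with NUM_FIELDS fields, to the test stream via XADD. */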
+ private static void populate(Jedis iJedis) {
+ for (int i = 0; i < NUM_ELEMENTS; i++) {
+ HashMap<String, String> map = new HashMap<>();
+ for (int j = 0; j < NUM_FIELDS; j++) {
+ map.put("f" + j, "" + j);
+ }
+ iJedis.xadd(REDIS_KEY, StreamEntryID.NEW_ENTRY, map);
+ }
+ }
+
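+ /** Sink that only counts the rows it receives. */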
+ private static class TestRowSinkFunction implements SinkFunction<Row> {
+
+ private static final long serialVersionUID = 1L;
+
+ TestRowSinkFunction() {}
+
+ @Override
+ public void invoke(Row value, Context context) throws Exception {
+ count.incrementAndGet();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java
new file mode 100644
index 0000000..665eebd
--- /dev/null
+++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.redis.config.StartupMode;
+import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap;
+import org.apache.flink.streaming.connectors.redis.util.ConsumerGroupUtils;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.StreamEntryID;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+/** Integration test for {@link RedisStreamGroupConsumer}: several consumers share one consumer group and split the stream entries between them. */
+public class RedisStreamGroupConsumerITCase extends RedisITCaseBase {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(RedisStreamGroupConsumerITCase.class);
+
+ private static final int NUM_THREADS = 3;
+ private static final int NUM_ELEMENTS = 20;
+ private static final int NUM_FIELDS = 3;
+
+ private static final String REDIS_KEY = "TEST_KEY";
+ private static final String GROUP_NAME = "FlinkGroup";
+
+ private StreamExecutionEnvironment env;
+ private static final AtomicInteger[] count = new AtomicInteger[NUM_THREADS];
+
+ @BeforeClass
+ public static void prepare() {
+ start();
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ stop();
+ }
+
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ for (int t = 0; t < NUM_THREADS; t++) {
+ count[t] = new AtomicInteger();
+ }
+ }
+
+ @After
+ @Override
+ public void tearDown() {
+ super.tearDown();
+ }
+
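+ /** Starts NUM_THREADS consumers in one consumer group and verifies that together they read all NUM_ELEMENTS entries. */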
+ @Test
+ public void redisGroupConsumer() throws Exception {
+ ConsumerGroupUtils.createConsumerGroup(
+ jedis, REDIS_KEY, GROUP_NAME, StartupMode.EARLIEST, true);
+ populate(jedis);
+
+ for (int t = 0; t < NUM_THREADS; t++) {
+ DataStreamSource<Row> source =
+ env.addSource(
+ new RedisStreamGroupConsumer<>(
+ GROUP_NAME,
+ "consumer" + t,
+ new SchemalessDataRowToMap(),
+ REDIS_KEY,
+ getConfigProperties()));
+ source.returns(
+ Types.ROW(
+ Types.STRING,
+ Types.STRING,
+ Types.STRING,
+ Types.STRING,
+ Types.STRING,
+ Types.STRING));
+ source.setParallelism(1);
+
+ source.addSink(new TestRowSinkFunction(t));
+ }
+
+ env.execute("Test Redis Consumer Group");
+
+ int total = 0;
+ for (int t = 0; t < NUM_THREADS; t++) {
+ if (count[t].get() == 0) {
+ LOGGER.warn("consumer{} could not read any entry.", t);
+ }
+ total += count[t].get();
+ }
+ assertEquals(NUM_ELEMENTS, total);
+ }
+
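+ /** Writes NUM_ELEMENTS entries, each with NUM_FIELDS fields, to the test stream via XADD. */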
+ private static void populate(Jedis iJedis) {
+ for (int i = 0; i < NUM_ELEMENTS; i++) {
+ HashMap<String, String> map = new HashMap<>();
+ for (int j = 0; j < NUM_FIELDS; j++) {
+ map.put("f" + j, "" + j);
+ }
+ iJedis.xadd(REDIS_KEY, StreamEntryID.NEW_ENTRY, map);
+ }
+ }
+
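+ /** Sink that counts received rows per consumer index. */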
+ private static class TestRowSinkFunction implements SinkFunction<Row> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int countIndex;
+
+ TestRowSinkFunction(final int countIndex) {
+ this.countIndex = countIndex;
+ }
+
+ @Override
+ public void invoke(Row value, Context context) throws Exception {
+ count[countIndex].incrementAndGet();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java
new file mode 100644
index 0000000..9e2801a
--- /dev/null
+++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+/** Integration test for {@link FlinkRedisStreamProducer}: rows emitted by a bounded source end up as entries of a Redis stream. */
+public class RedisStreamProducerITCase extends RedisITCaseBase {
+
+ private static final int NUM_ELEMENTS = 4;
+ private static final int NUM_FIELDS = 3;
+ private static final String REDIS_KEY = "TEST_KEY";
+
+ private StreamExecutionEnvironment env;
+
+ @BeforeClass
+ public static void prepare() {
+ start();
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ stop();
+ }
+
+ @Before
+ @Override
+ public void setUp() {
+ super.setUp();
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ @After
+ @Override
+ public void tearDown() {
+ super.tearDown();
+ }
+
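+ /** Sends NUM_ELEMENTS rows through the producer and checks the resulting stream length with XLEN. */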
+ @Test
+ public void redisProducer() throws Exception {
+ DataStreamSource<Row> source = env.addSource(new TestRowSourceFunction());
+
+ SinkFunction<Row> producer =
+ new FlinkRedisStreamProducer<>(
+ new SchemalessDataRowToMap(), REDIS_KEY, getConfigProperties());
+ source.addSink(producer);
+
+ env.execute("Test Redis Stream Producer");
+
+ assertEquals(NUM_ELEMENTS, jedis.xlen(REDIS_KEY).intValue());
+ }
+
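+ /** Bounded source that emits NUM_ELEMENTS rows and then returns. */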
+ private static class TestRowSourceFunction implements SourceFunction<Row> {
+
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean running = true;
+
+ @Override
+ public void run(SourceContext<Row> ctx) throws Exception {
+
+ for (int i = 0; i < NUM_ELEMENTS && running; i++) {
+ ArrayList