From 3c5044ad46d4d36bc79ee0a04423754bf367afad Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 18 May 2022 22:09:29 +0600 Subject: [PATCH 1/2] First commit Mostly brought from https://github.com/apache/flink/pull/15487 --- pom.xml | 138 ++++++++ .../redis/AbstractRedisStreamConsumer.java | 147 ++++++++ .../connectors/redis/DataConverter.java | 27 ++ .../redis/FlinkRedisProducerBase.java | 85 +++++ .../redis/FlinkRedisStreamProducer.java | 43 +++ .../connectors/redis/MapConverter.java | 27 ++ .../connectors/redis/RedisConsumerBase.java | 106 ++++++ .../connectors/redis/RedisStreamConsumer.java | 72 ++++ .../redis/RedisStreamGroupConsumer.java | 102 ++++++ .../redis/config/RedisConfigConstants.java | 32 ++ .../connectors/redis/config/StartupMode.java | 40 +++ .../internal/FieldBoundDataRowToMap.java | 63 ++++ .../internal/FieldBoundRowDataToMap.java | 64 ++++ .../internal/SchemalessDataRowToMap.java | 60 ++++ .../redis/table/RedisStreamDynamicSink.java | 113 ++++++ .../redis/table/RedisStreamDynamicSource.java | 330 ++++++++++++++++++ .../redis/util/ConsumerGroupUtils.java | 67 ++++ .../connectors/redis/util/JedisUtils.java | 43 +++ .../connectors/redis/RedisITCaseBase.java | 78 +++++ .../redis/RedisStreamConsumerITCase.java | 125 +++++++ .../redis/RedisStreamGroupConsumerITCase.java | 151 ++++++++ .../redis/RedisStreamProducerITCase.java | 107 ++++++ 22 files changed, 2020 insertions(+) create mode 100644 pom.xml create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java create mode 100644 src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java create mode 100644 src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java create mode 100644 src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java create mode 100644 src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java create mode 100644 src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ec3f466 --- /dev/null +++ b/pom.xml @@ -0,0 +1,138 @@ + + + + 4.0.0 + + org.apache.flink + flink-connector-redis + 1.16-SNAPSHOT + jar + Flink : Connectors : Redis + + + + The Apache Software License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + UTF-8 + UTF-8 + + 1.16-SNAPSHOT + 15.0 + 1.8 + 1.7.32 + ${target.java.version} + ${target.java.version} + + 2.12.7 + 2.12 + 3.8.0 + 1.7.32 + 1.16.2 + 4.13.2 + 1.3 + + + + + + + + redis.clients + jedis + ${jedis.version} + compile + + + org.slf4j + slf4j.api + + + + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + + junit + junit + ${junit.version} + jar + test + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + %regex[.*ITCase.*] + + + + + + + \ No newline at end of file diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java new file mode 100644 index 0000000..caf4776 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.config.StartupMode; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamEntryID; + +import java.util.AbstractMap.SimpleEntry; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +/** @param */ +public abstract class AbstractRedisStreamConsumer extends RedisConsumerBase { + + protected final Entry[] streamEntryIds; + private final Map keyIndex = new HashMap<>(); + + public AbstractRedisStreamConsumer( + StartupMode startupMode, String[] streamKeys, Properties configProps) { + super(Arrays.asList(streamKeys), configProps); + final StreamEntryID streamEntryID; + switch (startupMode) { + case EARLIEST: + streamEntryID = new StreamEntryID(); + break; + case LATEST: + streamEntryID = StreamEntryID.LAST_ENTRY; + break; + case GROUP_OFFSETS: + streamEntryID = StreamEntryID.UNRECEIVED_ENTRY; + break; + case SPECIFIC_OFFSETS: + throw new RuntimeException( + "Use the constructor with 'StreamEntryID[] streamIds' as param"); + case TIMESTAMP: + throw new RuntimeException("Use the constructor with 'Long[] timestamps' param"); + default: + throw new IllegalStateException(); + } + this.streamEntryIds = prepareStreamEntryIds(streamKeys, streamEntryID); + initializeKeyIndex(); + } + + public AbstractRedisStreamConsumer( + String[] streamKeys, Long[] timestamps, Properties configProps) { + this(streamKeys, streamEntryIds(timestamps), configProps); + } + + public AbstractRedisStreamConsumer( + String[] streamKeys, StreamEntryID[] streamIds, Properties configProps) { + this(prepareStreamEntryIds(streamKeys, streamIds), configProps); + } + + private AbstractRedisStreamConsumer( + Entry[] streamIds, Properties configProps) { + super(null, configProps); + this.streamEntryIds = streamIds; + initializeKeyIndex(); + } + + @Override + protected final boolean readAndCollect( + Jedis jedis, List streamKeys, SourceContext sourceContext) { + boolean anyEntry = false; + List>> response = read(jedis); + if (response != null) { + for (Entry> streamEntries : response) { + String streamKey = streamEntries.getKey(); + for (StreamEntry entry : streamEntries.getValue()) { + anyEntry = true; + collect(sourceContext, streamKey, entry); + updateIdForKey(streamKey, entry.getID()); + } + } + } + return anyEntry; + } + + protected abstract List>> read(Jedis jedis); + + protected abstract void collect( + SourceContext sourceContext, String streamKey, StreamEntry streamEntry); + + protected void updateIdForKey(String streamKey, StreamEntryID streamEntryID) { + int index = keyIndex.get(streamKey); + if (this.streamEntryIds[index].getValue().toString().equals(">")) { + // skip + } else { + this.streamEntryIds[index].setValue(streamEntryID); + } + } + + private void initializeKeyIndex() { + int index = 0; + for (Entry streamEntryId : streamEntryIds) { + keyIndex.put(streamEntryId.getKey(), index++); + } + } + + private static Entry[] prepareStreamEntryIds( + String[] streamKeys, StreamEntryID streamId) { + Entry[] streams = new Entry[streamKeys.length]; + for (int i = 0; i < streamKeys.length; i++) { + streams[i] = new SimpleEntry<>(streamKeys[i], streamId); + } + return (Entry[]) streams; + } + + private static Entry[] prepareStreamEntryIds( + String[] streamKeys, StreamEntryID[] streamIds) { + Entry[] streams = new Entry[streamKeys.length]; + for (int i = 0; i < streamKeys.length; i++) { + streams[i] = new SimpleEntry<>(streamKeys[i], streamIds[i]); + } + return (Entry[]) streams; + } + + private static StreamEntryID[] streamEntryIds(Long[] timestamps) { + StreamEntryID[] entryIds = new StreamEntryID[timestamps.length]; + for (int i = 0; i < timestamps.length; i++) { + entryIds[i] = new StreamEntryID(timestamps[i], 0L); + } + return entryIds; + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java b/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java new file mode 100644 index 0000000..106f1a3 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import java.io.Serializable; +import java.util.Map; + +/** @param */ +public interface DataConverter extends Serializable { + + OUT toData(Map input); +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java new file mode 100644 index 0000000..0fa3ff2 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.util.JedisUtils; + +import redis.clients.jedis.Jedis; + +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** @param */ +public abstract class FlinkRedisProducerBase extends RichSinkFunction + implements CheckpointedFunction { + + private final Properties configProps; + + private transient Jedis jedis; + + private final String key; + + public FlinkRedisProducerBase(String key, Properties configProps) { + checkNotNull(key, "key can not be null"); + this.key = key; + + checkNotNull(configProps, "configProps can not be null"); + this.configProps = configProps; + } + + /** Initializes the connection to Redis. */ + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + // code + this.jedis = JedisUtils.createResource(this.configProps); + } + + @Override + public void close() throws Exception { + super.close(); + // code + if (jedis != null) { + jedis.close(); + } + } + + @Override + public void invoke(OUT value, Context context) throws Exception { + invoke(this.jedis, this.key, value, context); + } + + protected abstract void invoke(Jedis resource, String key, OUT value, Context context) + throws Exception; + + @Override + public void snapshotState(FunctionSnapshotContext context) { + // in synchronous mode, nothing to do + } + + @Override + public void initializeState(FunctionInitializationContext context) { + // nothing to do + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java new file mode 100644 index 0000000..72c654c --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; + +import java.util.Map; +import java.util.Properties; + +/** @param */ +public class FlinkRedisStreamProducer extends FlinkRedisProducerBase { + + private final MapConverter mapConverter; + + public FlinkRedisStreamProducer( + MapConverter mapConverter, String streamKey, Properties configProps) { + super(streamKey, configProps); + this.mapConverter = mapConverter; + } + + @Override + public void invoke(Jedis jedis, String streamKey, OUT value, Context context) throws Exception { + // code + Map map = mapConverter.toMap(value); + jedis.xadd(streamKey, StreamEntryID.NEW_ENTRY, map); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java b/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java new file mode 100644 index 0000000..7b141a5 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import java.io.Serializable; +import java.util.Map; + +/** @param */ +public interface MapConverter extends Serializable { + + Map toMap(IN input); +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java new file mode 100644 index 0000000..ab87a7b --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.connectors.redis.util.JedisUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** @param */ +public abstract class RedisConsumerBase extends RichParallelSourceFunction { + + protected static final Logger LOG = LoggerFactory.getLogger(RedisConsumerBase.class); + + private final Properties configProps; + + private transient Jedis jedis; + + private final List keys; + + /** Flag indicating whether the consumer is still running. */ + private volatile boolean running = true; + + public RedisConsumerBase(List keys, Properties configProps) { + checkNotNull(keys, "keys can not be null"); + checkArgument(keys.size() != 0, "must be consuming at least 1 key"); + this.keys = Collections.unmodifiableList(keys); + + this.configProps = checkNotNull(configProps, "configProps can not be null"); + + if (LOG.isInfoEnabled()) { + StringBuilder sb = new StringBuilder(); + for (String key : keys) { + sb.append(key).append(", "); + } + LOG.info( + "Flink Redis Stream Consumer is going to read the following streams: {}", + sb.toString()); + } + } + + @Override + public void open(Configuration configuration) throws Exception { + // code + super.open(configuration); + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + if (!running) { + return; + } + this.jedis = JedisUtils.createResource(this.configProps); + + while (running) { + running = readAndCollect(this.jedis, this.keys, sourceContext); + } + } + + protected abstract boolean readAndCollect( + Jedis jedis, List keys, SourceContext sourceContext); + + @Override + public void cancel() { + // set ourselves as not running; + // this would let the main discovery loop escape as soon as possible + running = false; + + // abort the fetcher, if there is one + if (jedis != null) { + jedis.close(); + } + } + + @Override + public void close() throws Exception { + cancel(); + + super.close(); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java new file mode 100644 index 0000000..5e6a7ea --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.config.StartupMode; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamEntryID; + +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; + +/** @param */ +public class RedisStreamConsumer extends AbstractRedisStreamConsumer { + + private final DataConverter dataConverter; + + public RedisStreamConsumer( + Properties configProps, + StartupMode startupMode, + DataConverter dataConverter, + String... streamKeys) { + super(startupMode, streamKeys, configProps); + this.dataConverter = dataConverter; + } + + public RedisStreamConsumer( + DataConverter dataConverter, + String[] streamKeys, + Long[] timestamps, + Properties configProps) { + super(streamKeys, timestamps, configProps); + this.dataConverter = dataConverter; + } + + public RedisStreamConsumer( + DataConverter dataConverter, + String[] streamKeys, + StreamEntryID[] streamIds, + Properties configProps) { + super(streamKeys, streamIds, configProps); + this.dataConverter = dataConverter; + } + + @Override + protected List>> read(Jedis jedis) { + return jedis.xread(1, 0L, streamEntryIds); + } + + @Override + protected void collect( + SourceContext sourceContext, String streamKey, StreamEntry streamEntry) { + sourceContext.collect(dataConverter.toData(streamEntry.getFields())); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java new file mode 100644 index 0000000..bfaa1a3 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.config.StartupMode; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamEntryID; + +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; + +/** @param */ +public class RedisStreamGroupConsumer extends AbstractRedisStreamConsumer { + + private final String group; + private final String consumer; + + private final DataConverter dataConverter; + + public RedisStreamGroupConsumer( + String groupName, + String consumerName, + DataConverter dataConverter, + String streamKey, + Properties config) { + this( + groupName, + consumerName, + StartupMode.GROUP_OFFSETS, + dataConverter, + new String[] {streamKey}, + config); + } + + public RedisStreamGroupConsumer( + String groupName, + String consumerName, + StartupMode startupMode, + DataConverter dataConverter, + String[] streamKeys, + Properties config) { + super(startupMode, streamKeys, config); + this.group = groupName; + this.consumer = consumerName; + this.dataConverter = dataConverter; + } + + public RedisStreamGroupConsumer( + String groupName, + String consumerName, + DataConverter dataConverter, + String[] streamKeys, + Long[] timestamps, + Properties config) { + super(streamKeys, timestamps, config); + this.group = groupName; + this.consumer = consumerName; + this.dataConverter = dataConverter; + } + + public RedisStreamGroupConsumer( + String groupName, + String consumerName, + DataConverter dataConverter, + String[] streamKeys, + StreamEntryID[] streamIds, + Properties config) { + super(streamKeys, streamIds, config); + this.group = groupName; + this.consumer = consumerName; + this.dataConverter = dataConverter; + } + + @Override + protected List>> read(Jedis jedis) { + return jedis.xreadGroup(group, consumer, 1, 0L, true, streamEntryIds); + } + + @Override + protected void collect( + SourceContext sourceContext, String streamKey, StreamEntry streamEntry) { + sourceContext.collect(dataConverter.toData(streamEntry.getFields())); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java b/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java new file mode 100644 index 0000000..3b615e6 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.config; + +import redis.clients.jedis.Protocol; + +/** */ +public class RedisConfigConstants { + + public static final String REDIS_HOST = "redis.host"; + + public static final String REDIS_PORT = "redis.port"; + + public static final String DEFAULT_REDIS_HOST = Protocol.DEFAULT_HOST; + + public static final int DEFAULT_REDIS_PORT = Protocol.DEFAULT_PORT; +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java b/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java new file mode 100644 index 0000000..785a3bf --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.config; + +/** + * Startup modes for the Redis Consumer. Based on + * flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java + */ +public enum StartupMode { + + /** Start from the earliest offset possible. */ + EARLIEST, + + /** Start from user-supplied timestamp for each partition. */ + TIMESTAMP, + + /** Start from user-supplied specific offsets for each partition. */ + SPECIFIC_OFFSETS, + + /** Start from the latest offset. */ + LATEST, + + /** Start from latest group offset. */ + GROUP_OFFSETS +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java new file mode 100644 index 0000000..e646fa4 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.internal; + +import org.apache.flink.streaming.connectors.redis.DataConverter; +import org.apache.flink.streaming.connectors.redis.MapConverter; +import org.apache.flink.types.Row; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** */ +public class FieldBoundDataRowToMap implements MapConverter, DataConverter { + + private final String[] fieldNames; + + public FieldBoundDataRowToMap(String[] fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public Map toMap(Row row) { + int arity = row.getArity(); + if (arity != fieldNames.length) { + throw new RuntimeException(); + } + Map map = new LinkedHashMap<>(); + for (int i = 0; i < arity; i++) { + String key = fieldNames[i]; + String value = (String) row.getField(i); + map.put(key, value); + } + return map; + } + + @Override + public Row toData(Map map) { + int size = map.size(); + if (size != fieldNames.length) { + throw new RuntimeException(); + } + Object[] array = new Object[size]; + for (int i = 0; i < size; i++) { + array[i] = map.get(fieldNames[i]); + } + return Row.of(array); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java new file mode 100644 index 0000000..5c23eb6 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.internal; + +import org.apache.flink.streaming.connectors.redis.DataConverter; +import org.apache.flink.streaming.connectors.redis.MapConverter; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** */ +public class FieldBoundRowDataToMap implements MapConverter, DataConverter { + + private final String[] fieldNames; + + public FieldBoundRowDataToMap(String[] fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public Map toMap(RowData row) { + int arity = row.getArity(); + if (arity != fieldNames.length) { + throw new RuntimeException(); + } + Map map = new LinkedHashMap<>(); + for (int i = 0; i < arity; i++) { + String key = fieldNames[i]; + String value = row.getString(i).toString(); + map.put(key, value); + } + return map; + } + + @Override + public RowData toData(Map map) { + int size = map.size(); + if (size != fieldNames.length) { + throw new RuntimeException(); + } + Object[] array = new Object[size]; + for (int i = 0; i < size; i++) { + array[i] = map.get(fieldNames[i]); + } + return GenericRowData.of(array); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java new file mode 100644 index 0000000..164c4e4 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.internal; + +import org.apache.flink.streaming.connectors.redis.DataConverter; +import org.apache.flink.streaming.connectors.redis.MapConverter; +import org.apache.flink.types.Row; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** */ +public class SchemalessDataRowToMap implements MapConverter, DataConverter { + + public SchemalessDataRowToMap() {} + + @Override + public Map toMap(Row input) { + int arity = input.getArity(); + if ((arity & 1) != 0) { + throw new RuntimeException(); + } + Map map = new LinkedHashMap<>(); + for (int i = 0; i < arity; i += 2) { + String key = (String) input.getField(i); + String value = (String) input.getField(i + 1); + map.put(key, value); + } + return map; + } + + @Override + public Row toData(Map map) { + int size = map.size(); + Object[] array = new Object[size << 1]; + int index = 0; + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + array[index++] = key; + array[index++] = value; + } + return Row.of(array); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java new file mode 100644 index 0000000..3123ba3 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.redis.FlinkRedisStreamProducer; +import org.apache.flink.streaming.connectors.redis.MapConverter; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +/** Redis-Stream-backed {@link DynamicTableSink}. */ +@Internal +public class RedisStreamDynamicSink implements DynamicTableSink, SupportsPartitioning { + + private static final ChangelogMode CHANGELOG_MODE = ChangelogMode.insertOnly(); + + /** The Redis Stream to write to. */ + private final String streamKey; + + /** Properties for the Redis (Stream) producer. */ + private final Properties producerProperties; + + private final MapConverter converter; + + public RedisStreamDynamicSink( + String streamKey, Properties producerProperties, MapConverter converter) { + this.streamKey = + Preconditions.checkNotNull(streamKey, "Redis Stream key name must not be null"); + this.producerProperties = + Preconditions.checkNotNull( + producerProperties, + "Properties for the Flink Redis producer must not be null"); + this.converter = Preconditions.checkNotNull(converter, "Converter must not be null"); + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return CHANGELOG_MODE; + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + FlinkRedisStreamProducer redisStreamProducer = + new FlinkRedisStreamProducer<>(converter, streamKey, producerProperties); + + return SinkFunctionProvider.of(redisStreamProducer); + } + + @Override + public DynamicTableSink copy() { + return new RedisStreamDynamicSink(streamKey, producerProperties, converter); + } + + @Override + public String asSummaryString() { + return "RedisStream"; + } + + // -------------------------------------------------------------------------------------------- + // SupportsPartitioning + // -------------------------------------------------------------------------------------------- + + @Override + public void applyStaticPartition(Map partition) { + // nothing to do + } + + // -------------------------------------------------------------------------------------------- + // Value semantics for equals and hashCode + // -------------------------------------------------------------------------------------------- + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RedisStreamDynamicSink that = (RedisStreamDynamicSink) o; + return Objects.equals(streamKey, that.streamKey) + && Objects.equals(producerProperties, that.producerProperties) + && Objects.equals(converter, that.converter); + } + + @Override + public int hashCode() { + return Objects.hash(streamKey, producerProperties, converter); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java new file mode 100644 index 0000000..6676d82 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.table; + +import org.apache.flink.streaming.connectors.redis.AbstractRedisStreamConsumer; +import org.apache.flink.streaming.connectors.redis.DataConverter; +import org.apache.flink.streaming.connectors.redis.RedisStreamConsumer; +import org.apache.flink.streaming.connectors.redis.RedisStreamGroupConsumer; +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import redis.clients.jedis.StreamEntryID; + +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +/** */ +public class RedisStreamDynamicSource implements ScanTableSource { + + private static final ChangelogMode CHANGELOG_MODE = ChangelogMode.insertOnly(); + + private final Properties config; + + private final Optional groupName; + + private final Optional consumerName; + + /** The Redis key to consume. */ + private final String streamKey; + + /** + * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). + */ + private final StartupMode startupMode; + + /** + * Specific startup offset; only relevant when startup mode is {@link + * StartupMode#SPECIFIC_OFFSETS}. + */ + private StreamEntryID streamEntryId; + + /** + * Specific startup timestamp; only relevant when startup mode is {@link StartupMode#TIMESTAMP}. + */ + private Long timestamp; + + private final DataConverter converter; + + private RedisStreamDynamicSource( + Properties config, + Optional groupName, + Optional consumerName, + String streamKey, + StartupMode startupMode, + DataConverter rowMapConverter) { + this.config = config; + this.streamKey = Preconditions.checkNotNull(streamKey, "Key must not be null."); + this.groupName = groupName; + this.consumerName = consumerName; + this.converter = + Preconditions.checkNotNull(rowMapConverter, "RowMapConverter must not be null."); + this.startupMode = Preconditions.checkNotNull(startupMode, "StartupMode must not be null."); + } + + public RedisStreamDynamicSource( + Properties config, + String streamKey, + StreamEntryID streamEntryId, + DataConverter rowMapConverter) { + this( + config, + Optional.empty(), + Optional.empty(), + streamKey, + StartupMode.SPECIFIC_OFFSETS, + rowMapConverter); + this.streamEntryId = streamEntryId; + } + + public RedisStreamDynamicSource( + Properties config, + String streamKey, + Long timestamp, + DataConverter rowMapConverter) { + this( + config, + Optional.empty(), + Optional.empty(), + streamKey, + StartupMode.TIMESTAMP, + rowMapConverter); + this.timestamp = timestamp; + } + + public RedisStreamDynamicSource( + Properties config, + String streamKey, + StartupMode startupMode, + DataConverter rowMapConverter) { + this(config, Optional.empty(), Optional.empty(), streamKey, startupMode, rowMapConverter); + } + + public RedisStreamDynamicSource( + Properties config, + String groupName, + String consumerName, + String streamKey, + StreamEntryID streamEntryId, + DataConverter rowMapConverter) { + this( + config, + Optional.of(groupName), + Optional.of(consumerName), + streamKey, + StartupMode.SPECIFIC_OFFSETS, + rowMapConverter); + this.streamEntryId = streamEntryId; + } + + public RedisStreamDynamicSource( + Properties config, + String groupName, + String consumerName, + String streamKey, + Long timestamp, + DataConverter rowMapConverter) { + this( + config, + Optional.of(groupName), + Optional.of(consumerName), + streamKey, + StartupMode.TIMESTAMP, + rowMapConverter); + this.timestamp = timestamp; + } + + public RedisStreamDynamicSource( + Properties config, + String groupName, + String consumerName, + String streamKey, + StartupMode startupMode, + DataConverter rowMapConverter) { + this( + config, + Optional.of(groupName), + Optional.of(consumerName), + streamKey, + startupMode, + rowMapConverter); + } + + public RedisStreamDynamicSource( + Properties config, + String groupName, + String consumerName, + String streamKey, + DataConverter rowMapConverter) { + this( + config, + Optional.of(groupName), + Optional.of(consumerName), + streamKey, + StartupMode.GROUP_OFFSETS, + rowMapConverter); + } + + @Override + public DynamicTableSource copy() { + if (groupName.isPresent()) { + switch (startupMode) { + case GROUP_OFFSETS: + return new RedisStreamDynamicSource( + config, groupName.get(), consumerName.get(), streamKey, converter); + case TIMESTAMP: + return new RedisStreamDynamicSource( + config, + groupName.get(), + consumerName.get(), + streamKey, + timestamp, + converter); + case SPECIFIC_OFFSETS: + return new RedisStreamDynamicSource( + config, + groupName.get(), + consumerName.get(), + streamKey, + streamEntryId, + converter); + default: + return new RedisStreamDynamicSource( + config, + groupName.get(), + consumerName.get(), + streamKey, + startupMode, + converter); + } + } else { + switch (startupMode) { + case TIMESTAMP: + return new RedisStreamDynamicSource(config, streamKey, timestamp, converter); + case SPECIFIC_OFFSETS: + return new RedisStreamDynamicSource( + config, streamKey, streamEntryId, converter); + default: + return new RedisStreamDynamicSource(config, streamKey, startupMode, converter); + } + } + } + + @Override + public String asSummaryString() { + return "RedisStream"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final RedisStreamDynamicSource that = (RedisStreamDynamicSource) o; + return Objects.equals(config, that.config) + && Objects.equals(groupName, that.groupName) + && Objects.equals(consumerName, that.consumerName) + && Objects.equals(streamKey, that.streamKey) + && startupMode == that.startupMode + && Objects.equals(streamEntryId, that.streamEntryId) + && Objects.equals(timestamp, that.timestamp) + && Objects.equals(converter, that.converter); + } + + @Override + public int hashCode() { + return Objects.hash( + config, + groupName, + consumerName, + streamKey, + startupMode, + streamEntryId, + timestamp, + converter); + } + + @Override + public ChangelogMode getChangelogMode() { + return CHANGELOG_MODE; + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + AbstractRedisStreamConsumer redisStreamConsumer = createRedisConsumer(); + + return SourceFunctionProvider.of(redisStreamConsumer, false); + } + + protected AbstractRedisStreamConsumer createRedisConsumer() { + if (groupName.isPresent()) { + switch (startupMode) { + case GROUP_OFFSETS: + return new RedisStreamGroupConsumer<>( + groupName.get(), consumerName.get(), converter, streamKey, config); + case TIMESTAMP: + return new RedisStreamGroupConsumer<>( + groupName.get(), + consumerName.get(), + converter, + new String[] {streamKey}, + new Long[] {timestamp}, + config); + case SPECIFIC_OFFSETS: + return new RedisStreamGroupConsumer<>( + groupName.get(), + consumerName.get(), + converter, + new String[] {streamKey}, + new StreamEntryID[] {streamEntryId}, + config); + default: + return new RedisStreamGroupConsumer<>( + groupName.get(), + consumerName.get(), + startupMode, + converter, + new String[] {streamKey}, + config); + } + } else { + switch (startupMode) { + case TIMESTAMP: + return new RedisStreamConsumer<>( + converter, new String[] {streamKey}, new Long[] {timestamp}, config); + case SPECIFIC_OFFSETS: + return new RedisStreamConsumer<>( + converter, + new String[] {streamKey}, + new StreamEntryID[] {streamEntryId}, + config); + default: + return new RedisStreamConsumer<>(config, startupMode, converter, streamKey); + } + } + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java b/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java new file mode 100644 index 0000000..95966c8 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.util; + +import org.apache.flink.streaming.connectors.redis.config.StartupMode; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; + +/** */ +public final class ConsumerGroupUtils { + + public static void createConsumerGroup( + Jedis jedis, + String streamKey, + String groupName, + StreamEntryID streamEntryID, + boolean createStreamIfAbsent) { + jedis.xgroupCreate(streamKey, groupName, streamEntryID, createStreamIfAbsent); + } + // + // public static void createConsumerGroup(Jedis jedis, String streamKey, String groupName, + // long timestamp, boolean createStreamIfAbsent) { + // createConsumerGroup(jedis, streamKey, groupName, new StreamEntryID(timestamp, 0L), + // createStreamIfAbsent); + // } + + public static void createConsumerGroup( + Jedis jedis, + String streamKey, + String groupName, + StartupMode startupMode, + boolean createStreamIfAbsent) { + final StreamEntryID streamEntryID; + switch (startupMode) { + case EARLIEST: + streamEntryID = new StreamEntryID(); + break; + case LATEST: + streamEntryID = StreamEntryID.LAST_ENTRY; + break; + case SPECIFIC_OFFSETS: + throw new RuntimeException("Use the method with StreamEntryID as param"); + case TIMESTAMP: + throw new RuntimeException("Use the method with 'long timestamp' param"); + default: + throw new RuntimeException( + "Consumer Group cannot be initialized from " + startupMode); + } + createConsumerGroup(jedis, streamKey, groupName, streamEntryID, createStreamIfAbsent); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java b/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java new file mode 100644 index 0000000..74c2dee --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.util; + +import org.apache.flink.streaming.connectors.redis.config.RedisConfigConstants; + +import redis.clients.jedis.Jedis; + +import java.util.Properties; + +/** */ +public class JedisUtils { + + public static Jedis createResource(Properties configProps) { + + String host = + configProps.getProperty( + RedisConfigConstants.REDIS_HOST, RedisConfigConstants.DEFAULT_REDIS_HOST); + + int port = + Integer.parseInt( + configProps.getProperty( + RedisConfigConstants.REDIS_PORT, + Integer.toString(RedisConfigConstants.DEFAULT_REDIS_PORT))); + + return new Jedis(host, port); + } +} diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java new file mode 100644 index 0000000..730eda9 --- /dev/null +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.config.RedisConfigConstants; +import org.apache.flink.streaming.connectors.redis.util.JedisUtils; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.After; +import org.junit.Before; +import org.testcontainers.containers.GenericContainer; +import redis.clients.jedis.Jedis; + +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +/** */ +public abstract class RedisITCaseBase extends AbstractTestBase { + + public static final String REDIS_IMAGE = "redis"; + private static final int REDIS_PORT = 6379; + + private static final AtomicBoolean running = new AtomicBoolean(false); + private static final GenericContainer container = + new GenericContainer<>(REDIS_IMAGE).withExposedPorts(REDIS_PORT); + + protected Jedis jedis; + + protected static synchronized void start() { + if (!running.get()) { + container.start(); + running.set(true); + } + } + + protected static void stop() { + container.stop(); + running.set(false); + } + + @Before + public void setUp() { + jedis = JedisUtils.createResource(getConfigProperties()); + jedis.flushAll(); + } + + @After + public void tearDown() { + if (jedis != null) { + jedis.close(); + } + } + + protected Properties getConfigProperties() { + start(); + + Properties configProps = new Properties(); + configProps.setProperty(RedisConfigConstants.REDIS_HOST, container.getContainerIpAddress()); + configProps.setProperty( + RedisConfigConstants.REDIS_PORT, Integer.toString(container.getFirstMappedPort())); + return configProps; + } +} diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java new file mode 100644 index 0000000..c9ac95c --- /dev/null +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap; +import org.apache.flink.types.Row; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** */ +public class RedisStreamConsumerITCase extends RedisITCaseBase { + + private static final int NUM_ELEMENTS = 4; + private static final int NUM_FIELDS = 3; + private static final String REDIS_KEY = "TEST_KEY"; + + private StreamExecutionEnvironment env; + + private static final AtomicInteger count = new AtomicInteger(); + + @BeforeClass + public static void prepare() { + start(); + } + + @AfterClass + public static void cleanUp() { + stop(); + } + + @Before + @Override + public void setUp() { + super.setUp(); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + count.set(0); + } + + @After + @Override + public void tearDown() { + super.tearDown(); + } + + @Test + public void redisConsumer() throws Exception { + populate(jedis); + + DataStreamSource source = + env.addSource( + new RedisStreamConsumer<>( + getConfigProperties(), + StartupMode.EARLIEST, + new SchemalessDataRowToMap(), + REDIS_KEY)); + source.returns( + Types.ROW( + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING)); + source.setParallelism(1); + + source.addSink(new TestRowSinkFunction()); + + env.execute("Test Redis Row Consumer"); + + assertEquals(NUM_ELEMENTS, count.get()); + } + + private static void populate(Jedis iJedis) { + for (int i = 0; i < NUM_ELEMENTS; i++) { + HashMap map = new HashMap<>(); + for (int j = 0; j < NUM_FIELDS; j++) { + map.put("f" + j, "" + j); + } + iJedis.xadd(REDIS_KEY, StreamEntryID.NEW_ENTRY, map); + } + } + + private static class TestRowSinkFunction implements SinkFunction { + + private static final long serialVersionUID = 1L; + + TestRowSinkFunction() {} + + @Override + public void invoke(Row value, Context context) throws Exception { + count.incrementAndGet(); + } + } +} diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java new file mode 100644 index 0000000..665eebd --- /dev/null +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap; +import org.apache.flink.streaming.connectors.redis.util.ConsumerGroupUtils; +import org.apache.flink.types.Row; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** */ +public class RedisStreamGroupConsumerITCase extends RedisITCaseBase { + + private static final Logger LOGGER = + LoggerFactory.getLogger(RedisStreamGroupConsumerITCase.class); + + private static final int NUM_THREADS = 3; + private static final int NUM_ELEMENTS = 20; + private static final int NUM_FIELDS = 3; + + private static final String REDIS_KEY = "TEST_KEY"; + private static final String GROUP_NAME = "FlinkGroup"; + + StreamExecutionEnvironment env; + private static final AtomicInteger[] count = new AtomicInteger[NUM_THREADS]; + + @BeforeClass + public static void prepare() { + start(); + } + + @AfterClass + public static void cleanUp() { + stop(); + } + + @Before + @Override + public void setUp() { + super.setUp(); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + for (int t = 0; t < NUM_THREADS; t++) { + count[t] = new AtomicInteger(); + } + } + + @After + @Override + public void tearDown() { + super.tearDown(); + } + + @Test + public void redisGroupConsumer() throws Exception { + ConsumerGroupUtils.createConsumerGroup( + jedis, REDIS_KEY, GROUP_NAME, StartupMode.EARLIEST, true); + populate(jedis); + + for (int t = 0; t < NUM_THREADS; t++) { + DataStreamSource source = + env.addSource( + new RedisStreamGroupConsumer<>( + GROUP_NAME, + "consumer" + t, + new SchemalessDataRowToMap(), + REDIS_KEY, + getConfigProperties())); + source.returns( + Types.ROW( + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING)); + source.setParallelism(1); + + source.addSink(new TestRowSinkFunction(t)); + } + + env.execute("Test Redis Consumer Group"); + + int total = 0; + for (int t = 0; t < NUM_THREADS; t++) { + if (count[t].get() == 0) { + LOGGER.warn("consumer{} could not read any entry.", t); + } + total += count[t].get(); + } + assertEquals(NUM_ELEMENTS, total); + } + + private static void populate(Jedis iJedis) { + for (int i = 0; i < NUM_ELEMENTS; i++) { + HashMap map = new HashMap<>(); + for (int j = 0; j < NUM_FIELDS; j++) { + map.put("f" + j, "" + j); + } + iJedis.xadd(REDIS_KEY, StreamEntryID.NEW_ENTRY, map); + } + } + + private static class TestRowSinkFunction implements SinkFunction { + + private static final long serialVersionUID = 1L; + + private final int countIndex; + + TestRowSinkFunction(final int countIndex) { + this.countIndex = countIndex; + } + + @Override + public void invoke(Row value, Context context) throws Exception { + count[countIndex].incrementAndGet(); + } + } +} diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java new file mode 100644 index 0000000..9e2801a --- /dev/null +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap; +import org.apache.flink.types.Row; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; + +/** */ +public class RedisStreamProducerITCase extends RedisITCaseBase { + + private static final int NUM_ELEMENTS = 4; + private static final int NUM_FIELDS = 3; + private static final String REDIS_KEY = "TEST_KEY"; + + private StreamExecutionEnvironment env; + + @BeforeClass + public static void prepare() { + start(); + } + + @AfterClass + public static void cleanUp() { + stop(); + } + + @Before + @Override + public void setUp() { + super.setUp(); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @After + @Override + public void tearDown() { + super.tearDown(); + } + + @Test + public void redisProducer() throws Exception { + DataStreamSource source = env.addSource(new TestRowSourceFunction()); + + SinkFunction producer = + new FlinkRedisStreamProducer<>( + new SchemalessDataRowToMap(), REDIS_KEY, getConfigProperties()); + source.addSink(producer); + + env.execute("Test Redis Stream Producer"); + + assertEquals(NUM_ELEMENTS, jedis.xlen(REDIS_KEY).intValue()); + } + + private static class TestRowSourceFunction implements SourceFunction { + + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext ctx) throws Exception { + + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ArrayList list = new ArrayList<>(NUM_FIELDS); + for (int j = 0; j < NUM_FIELDS; j++) { + list.add("f" + j); + list.add("" + j); + } + ctx.collect(Row.of(list.toArray())); + } + } + + @Override + public void cancel() { + running = false; + } + } +} From 59dafb46ccf24f704d38e7c3e6894b435c9f6409 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Thu, 19 May 2022 16:53:22 +0600 Subject: [PATCH 2/2] Change xread/xreadGroup calls --- .../redis/AbstractRedisStreamConsumer.java | 74 +++++++------------ .../connectors/redis/RedisStreamConsumer.java | 27 ++++--- .../redis/RedisStreamGroupConsumer.java | 35 ++++----- .../redis/table/RedisStreamDynamicSource.java | 23 +++--- 4 files changed, 71 insertions(+), 88 deletions(-) diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java index caf4776..3b5240e 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java @@ -18,28 +18,27 @@ package org.apache.flink.streaming.connectors.redis; import org.apache.flink.streaming.connectors.redis.config.StartupMode; - import redis.clients.jedis.Jedis; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; -import java.util.AbstractMap.SimpleEntry; -import java.util.Arrays; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.stream.Collectors; -/** @param */ +/** + * @param + */ public abstract class AbstractRedisStreamConsumer extends RedisConsumerBase { - protected final Entry[] streamEntryIds; - private final Map keyIndex = new HashMap<>(); + protected final Map streamEntryIds; public AbstractRedisStreamConsumer( - StartupMode startupMode, String[] streamKeys, Properties configProps) { - super(Arrays.asList(streamKeys), configProps); + StartupMode startupMode, List streamKeys, Properties configProps) { + super(streamKeys, configProps); final StreamEntryID streamEntryID; switch (startupMode) { case EARLIEST: @@ -60,24 +59,17 @@ public AbstractRedisStreamConsumer( throw new IllegalStateException(); } this.streamEntryIds = prepareStreamEntryIds(streamKeys, streamEntryID); - initializeKeyIndex(); - } - - public AbstractRedisStreamConsumer( - String[] streamKeys, Long[] timestamps, Properties configProps) { - this(streamKeys, streamEntryIds(timestamps), configProps); } public AbstractRedisStreamConsumer( - String[] streamKeys, StreamEntryID[] streamIds, Properties configProps) { + List streamKeys, List streamIds, Properties configProps) { this(prepareStreamEntryIds(streamKeys, streamIds), configProps); } private AbstractRedisStreamConsumer( - Entry[] streamIds, Properties configProps) { + Map streamIds, Properties configProps) { super(null, configProps); this.streamEntryIds = streamIds; - initializeKeyIndex(); } @Override @@ -104,44 +96,32 @@ protected abstract void collect( SourceContext sourceContext, String streamKey, StreamEntry streamEntry); protected void updateIdForKey(String streamKey, StreamEntryID streamEntryID) { - int index = keyIndex.get(streamKey); - if (this.streamEntryIds[index].getValue().toString().equals(">")) { + if (this.streamEntryIds.get(streamKey).toString().equals(">")) { // skip } else { - this.streamEntryIds[index].setValue(streamEntryID); + this.streamEntryIds.put(streamKey, streamEntryID); } } - private void initializeKeyIndex() { - int index = 0; - for (Entry streamEntryId : streamEntryIds) { - keyIndex.put(streamEntryId.getKey(), index++); - } + private static Map prepareStreamEntryIds( + List streamKeys, StreamEntryID streamId) { + Map streams = new LinkedHashMap<>(streamKeys.size()); + streamKeys.forEach(streamKey -> streams.put(streamKey, streamId)); + return streams; } - private static Entry[] prepareStreamEntryIds( - String[] streamKeys, StreamEntryID streamId) { - Entry[] streams = new Entry[streamKeys.length]; - for (int i = 0; i < streamKeys.length; i++) { - streams[i] = new SimpleEntry<>(streamKeys[i], streamId); + private static Map prepareStreamEntryIds( + List streamKeys, List streamIds) { + Map streams = new LinkedHashMap<>(streamKeys.size()); + for (int i = 0; i < streamKeys.size(); i++) { + streams.put(streamKeys.get(i), streamIds.get(i)); } - return (Entry[]) streams; + return streams; } - private static Entry[] prepareStreamEntryIds( - String[] streamKeys, StreamEntryID[] streamIds) { - Entry[] streams = new Entry[streamKeys.length]; - for (int i = 0; i < streamKeys.length; i++) { - streams[i] = new SimpleEntry<>(streamKeys[i], streamIds[i]); - } - return (Entry[]) streams; - } - - private static StreamEntryID[] streamEntryIds(Long[] timestamps) { - StreamEntryID[] entryIds = new StreamEntryID[timestamps.length]; - for (int i = 0; i < timestamps.length; i++) { - entryIds[i] = new StreamEntryID(timestamps[i], 0L); - } - return entryIds; + public static List convertToStreamEntryIDs(List timestamps) { + return timestamps.stream() + .map(ts -> new StreamEntryID(ts, 0L)) + .collect(Collectors.toList()); } } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java index 5e6a7ea..f1ae6ab 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java @@ -18,16 +18,19 @@ package org.apache.flink.streaming.connectors.redis; import org.apache.flink.streaming.connectors.redis.config.StartupMode; - import redis.clients.jedis.Jedis; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.params.XReadParams; +import java.util.Arrays; import java.util.List; import java.util.Map.Entry; import java.util.Properties; -/** @param */ +/** + * @param + */ public class RedisStreamConsumer extends AbstractRedisStreamConsumer { private final DataConverter dataConverter; @@ -36,24 +39,23 @@ public RedisStreamConsumer( Properties configProps, StartupMode startupMode, DataConverter dataConverter, - String... streamKeys) { - super(startupMode, streamKeys, configProps); - this.dataConverter = dataConverter; + String streamKey) { + this(configProps, startupMode, dataConverter, Arrays.asList(streamKey)); } public RedisStreamConsumer( + Properties configProps, + StartupMode startupMode, DataConverter dataConverter, - String[] streamKeys, - Long[] timestamps, - Properties configProps) { - super(streamKeys, timestamps, configProps); + List streamKeys) { + super(startupMode, streamKeys, configProps); this.dataConverter = dataConverter; } public RedisStreamConsumer( DataConverter dataConverter, - String[] streamKeys, - StreamEntryID[] streamIds, + List streamKeys, + List streamIds, Properties configProps) { super(streamKeys, streamIds, configProps); this.dataConverter = dataConverter; @@ -61,7 +63,8 @@ public RedisStreamConsumer( @Override protected List>> read(Jedis jedis) { - return jedis.xread(1, 0L, streamEntryIds); + // return jedis.xread(XReadParams.xReadParams().count(1).block(0), streamEntryIds); + return jedis.xread(XReadParams.xReadParams().count(1), streamEntryIds); } @Override diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java index bfaa1a3..8f35b77 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java @@ -18,16 +18,19 @@ package org.apache.flink.streaming.connectors.redis; import org.apache.flink.streaming.connectors.redis.config.StartupMode; - import redis.clients.jedis.Jedis; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.params.XReadGroupParams; +import java.util.Arrays; import java.util.List; import java.util.Map.Entry; import java.util.Properties; -/** @param */ +/** + * @param + */ public class RedisStreamGroupConsumer extends AbstractRedisStreamConsumer { private final String group; @@ -46,7 +49,7 @@ public RedisStreamGroupConsumer( consumerName, StartupMode.GROUP_OFFSETS, dataConverter, - new String[] {streamKey}, + Arrays.asList(streamKey), config); } @@ -55,7 +58,7 @@ public RedisStreamGroupConsumer( String consumerName, StartupMode startupMode, DataConverter dataConverter, - String[] streamKeys, + List streamKeys, Properties config) { super(startupMode, streamKeys, config); this.group = groupName; @@ -67,21 +70,8 @@ public RedisStreamGroupConsumer( String groupName, String consumerName, DataConverter dataConverter, - String[] streamKeys, - Long[] timestamps, - Properties config) { - super(streamKeys, timestamps, config); - this.group = groupName; - this.consumer = consumerName; - this.dataConverter = dataConverter; - } - - public RedisStreamGroupConsumer( - String groupName, - String consumerName, - DataConverter dataConverter, - String[] streamKeys, - StreamEntryID[] streamIds, + List streamKeys, + List streamIds, Properties config) { super(streamKeys, streamIds, config); this.group = groupName; @@ -91,7 +81,12 @@ public RedisStreamGroupConsumer( @Override protected List>> read(Jedis jedis) { - return jedis.xreadGroup(group, consumer, 1, 0L, true, streamEntryIds); + return jedis.xreadGroup( + group, + consumer, + // XReadGroupParams.xReadGroupParams().count(1).block(0).noAck(), + XReadGroupParams.xReadGroupParams().count(1).noAck(), + streamEntryIds); } @Override diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java index 6676d82..03f9237 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java @@ -28,9 +28,9 @@ import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; - import redis.clients.jedis.StreamEntryID; +import java.util.Arrays; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -291,16 +291,17 @@ protected AbstractRedisStreamConsumer createRedisConsumer() { groupName.get(), consumerName.get(), converter, - new String[] {streamKey}, - new Long[] {timestamp}, + Arrays.asList(streamKey), + RedisStreamGroupConsumer.convertToStreamEntryIDs( + Arrays.asList(timestamp)), config); case SPECIFIC_OFFSETS: return new RedisStreamGroupConsumer<>( groupName.get(), consumerName.get(), converter, - new String[] {streamKey}, - new StreamEntryID[] {streamEntryId}, + Arrays.asList(streamKey), + Arrays.asList(streamEntryId), config); default: return new RedisStreamGroupConsumer<>( @@ -308,19 +309,23 @@ protected AbstractRedisStreamConsumer createRedisConsumer() { consumerName.get(), startupMode, converter, - new String[] {streamKey}, + Arrays.asList(streamKey), config); } } else { switch (startupMode) { case TIMESTAMP: return new RedisStreamConsumer<>( - converter, new String[] {streamKey}, new Long[] {timestamp}, config); + converter, + Arrays.asList(streamKey), + AbstractRedisStreamConsumer.convertToStreamEntryIDs( + Arrays.asList(timestamp)), + config); case SPECIFIC_OFFSETS: return new RedisStreamConsumer<>( converter, - new String[] {streamKey}, - new StreamEntryID[] {streamEntryId}, + Arrays.asList(streamKey), + Arrays.asList(streamEntryId), config); default: return new RedisStreamConsumer<>(config, startupMode, converter, streamKey);