diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ec3f466 --- /dev/null +++ b/pom.xml @@ -0,0 +1,138 @@ + + + + 4.0.0 + + org.apache.flink + flink-connector-redis + 1.16-SNAPSHOT + jar + Flink : Connectors : Redis + + + + The Apache Software License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + UTF-8 + UTF-8 + + 1.16-SNAPSHOT + 15.0 + 1.8 + 1.7.32 + ${target.java.version} + ${target.java.version} + + 2.12.7 + 2.12 + 3.8.0 + 1.7.32 + 1.16.2 + 4.13.2 + 1.3 + + + + + + + + redis.clients + jedis + ${jedis.version} + compile + + + org.slf4j + slf4j.api + + + + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + + junit + junit + ${junit.version} + jar + test + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + %regex[.*ITCase.*] + + + + + + + \ No newline at end of file diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java new file mode 100644 index 0000000..3b5240e --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamEntryID; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * @param + */ +public abstract class AbstractRedisStreamConsumer extends RedisConsumerBase { + + protected final Map streamEntryIds; + + public AbstractRedisStreamConsumer( + StartupMode startupMode, List streamKeys, Properties configProps) { + super(streamKeys, configProps); + final StreamEntryID streamEntryID; + switch (startupMode) { + case EARLIEST: + streamEntryID = new StreamEntryID(); + break; + case LATEST: + streamEntryID = StreamEntryID.LAST_ENTRY; + break; + case GROUP_OFFSETS: + streamEntryID = StreamEntryID.UNRECEIVED_ENTRY; + break; + case SPECIFIC_OFFSETS: + throw new RuntimeException( + "Use the constructor with 'StreamEntryID[] streamIds' as param"); + case TIMESTAMP: + throw new RuntimeException("Use the constructor with 'Long[] timestamps' param"); + default: + throw new IllegalStateException(); + } + this.streamEntryIds = prepareStreamEntryIds(streamKeys, streamEntryID); + } + + public AbstractRedisStreamConsumer( + List streamKeys, List streamIds, Properties configProps) { + this(prepareStreamEntryIds(streamKeys, streamIds), configProps); + } + + private AbstractRedisStreamConsumer( + Map streamIds, Properties configProps) { + super(null, configProps); + this.streamEntryIds = streamIds; + } + + @Override + protected final boolean readAndCollect( + Jedis jedis, List streamKeys, SourceContext sourceContext) { + boolean anyEntry = false; + List>> response = read(jedis); + if (response != null) { + for (Entry> streamEntries : response) { + String streamKey = streamEntries.getKey(); + for (StreamEntry entry : streamEntries.getValue()) { + anyEntry = true; + collect(sourceContext, streamKey, entry); + updateIdForKey(streamKey, entry.getID()); + } + } + } + return anyEntry; + } + + protected abstract List>> read(Jedis jedis); + + protected abstract void collect( + SourceContext sourceContext, String streamKey, StreamEntry streamEntry); + + protected void updateIdForKey(String streamKey, StreamEntryID streamEntryID) { + if (this.streamEntryIds.get(streamKey).toString().equals(">")) { + // skip + } else { + this.streamEntryIds.put(streamKey, streamEntryID); + } + } + + private static Map prepareStreamEntryIds( + List streamKeys, StreamEntryID streamId) { + Map streams = new LinkedHashMap<>(streamKeys.size()); + streamKeys.forEach(streamKey -> streams.put(streamKey, streamId)); + return streams; + } + + private static Map prepareStreamEntryIds( + List streamKeys, List streamIds) { + Map streams = new LinkedHashMap<>(streamKeys.size()); + for (int i = 0; i < streamKeys.size(); i++) { + streams.put(streamKeys.get(i), streamIds.get(i)); + } + return streams; + } + + public static List convertToStreamEntryIDs(List timestamps) { + return timestamps.stream() + .map(ts -> new StreamEntryID(ts, 0L)) + .collect(Collectors.toList()); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java b/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java new file mode 100644 index 0000000..106f1a3 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/DataConverter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import java.io.Serializable; +import java.util.Map; + +/** @param */ +public interface DataConverter extends Serializable { + + OUT toData(Map input); +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java new file mode 100644 index 0000000..0fa3ff2 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisProducerBase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.util.JedisUtils; + +import redis.clients.jedis.Jedis; + +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** @param */ +public abstract class FlinkRedisProducerBase extends RichSinkFunction + implements CheckpointedFunction { + + private final Properties configProps; + + private transient Jedis jedis; + + private final String key; + + public FlinkRedisProducerBase(String key, Properties configProps) { + checkNotNull(key, "key can not be null"); + this.key = key; + + checkNotNull(configProps, "configProps can not be null"); + this.configProps = configProps; + } + + /** Initializes the connection to Redis. */ + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + // code + this.jedis = JedisUtils.createResource(this.configProps); + } + + @Override + public void close() throws Exception { + super.close(); + // code + if (jedis != null) { + jedis.close(); + } + } + + @Override + public void invoke(OUT value, Context context) throws Exception { + invoke(this.jedis, this.key, value, context); + } + + protected abstract void invoke(Jedis resource, String key, OUT value, Context context) + throws Exception; + + @Override + public void snapshotState(FunctionSnapshotContext context) { + // in synchronous mode, nothing to do + } + + @Override + public void initializeState(FunctionInitializationContext context) { + // nothing to do + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java new file mode 100644 index 0000000..72c654c --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/FlinkRedisStreamProducer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; + +import java.util.Map; +import java.util.Properties; + +/** @param */ +public class FlinkRedisStreamProducer extends FlinkRedisProducerBase { + + private final MapConverter mapConverter; + + public FlinkRedisStreamProducer( + MapConverter mapConverter, String streamKey, Properties configProps) { + super(streamKey, configProps); + this.mapConverter = mapConverter; + } + + @Override + public void invoke(Jedis jedis, String streamKey, OUT value, Context context) throws Exception { + // code + Map map = mapConverter.toMap(value); + jedis.xadd(streamKey, StreamEntryID.NEW_ENTRY, map); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java b/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java new file mode 100644 index 0000000..7b141a5 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/MapConverter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import java.io.Serializable; +import java.util.Map; + +/** @param */ +public interface MapConverter extends Serializable { + + Map toMap(IN input); +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java new file mode 100644 index 0000000..ab87a7b --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisConsumerBase.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.connectors.redis.util.JedisUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** @param */ +public abstract class RedisConsumerBase extends RichParallelSourceFunction { + + protected static final Logger LOG = LoggerFactory.getLogger(RedisConsumerBase.class); + + private final Properties configProps; + + private transient Jedis jedis; + + private final List keys; + + /** Flag indicating whether the consumer is still running. */ + private volatile boolean running = true; + + public RedisConsumerBase(List keys, Properties configProps) { + checkNotNull(keys, "keys can not be null"); + checkArgument(keys.size() != 0, "must be consuming at least 1 key"); + this.keys = Collections.unmodifiableList(keys); + + this.configProps = checkNotNull(configProps, "configProps can not be null"); + + if (LOG.isInfoEnabled()) { + StringBuilder sb = new StringBuilder(); + for (String key : keys) { + sb.append(key).append(", "); + } + LOG.info( + "Flink Redis Stream Consumer is going to read the following streams: {}", + sb.toString()); + } + } + + @Override + public void open(Configuration configuration) throws Exception { + // code + super.open(configuration); + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + if (!running) { + return; + } + this.jedis = JedisUtils.createResource(this.configProps); + + while (running) { + running = readAndCollect(this.jedis, this.keys, sourceContext); + } + } + + protected abstract boolean readAndCollect( + Jedis jedis, List keys, SourceContext sourceContext); + + @Override + public void cancel() { + // set ourselves as not running; + // this would let the main discovery loop escape as soon as possible + running = false; + + // abort the fetcher, if there is one + if (jedis != null) { + jedis.close(); + } + } + + @Override + public void close() throws Exception { + cancel(); + + super.close(); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java new file mode 100644 index 0000000..f1ae6ab --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.params.XReadParams; + +import java.util.Arrays; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; + +/** + * @param + */ +public class RedisStreamConsumer extends AbstractRedisStreamConsumer { + + private final DataConverter dataConverter; + + public RedisStreamConsumer( + Properties configProps, + StartupMode startupMode, + DataConverter dataConverter, + String streamKey) { + this(configProps, startupMode, dataConverter, Arrays.asList(streamKey)); + } + + public RedisStreamConsumer( + Properties configProps, + StartupMode startupMode, + DataConverter dataConverter, + List streamKeys) { + super(startupMode, streamKeys, configProps); + this.dataConverter = dataConverter; + } + + public RedisStreamConsumer( + DataConverter dataConverter, + List streamKeys, + List streamIds, + Properties configProps) { + super(streamKeys, streamIds, configProps); + this.dataConverter = dataConverter; + } + + @Override + protected List>> read(Jedis jedis) { + // return jedis.xread(XReadParams.xReadParams().count(1).block(0), streamEntryIds); + return jedis.xread(XReadParams.xReadParams().count(1), streamEntryIds); + } + + @Override + protected void collect( + SourceContext sourceContext, String streamKey, StreamEntry streamEntry) { + sourceContext.collect(dataConverter.toData(streamEntry.getFields())); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java new file mode 100644 index 0000000..8f35b77 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntry; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.params.XReadGroupParams; + +import java.util.Arrays; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; + +/** + * @param + */ +public class RedisStreamGroupConsumer extends AbstractRedisStreamConsumer { + + private final String group; + private final String consumer; + + private final DataConverter dataConverter; + + public RedisStreamGroupConsumer( + String groupName, + String consumerName, + DataConverter dataConverter, + String streamKey, + Properties config) { + this( + groupName, + consumerName, + StartupMode.GROUP_OFFSETS, + dataConverter, + Arrays.asList(streamKey), + config); + } + + public RedisStreamGroupConsumer( + String groupName, + String consumerName, + StartupMode startupMode, + DataConverter dataConverter, + List streamKeys, + Properties config) { + super(startupMode, streamKeys, config); + this.group = groupName; + this.consumer = consumerName; + this.dataConverter = dataConverter; + } + + public RedisStreamGroupConsumer( + String groupName, + String consumerName, + DataConverter dataConverter, + List streamKeys, + List streamIds, + Properties config) { + super(streamKeys, streamIds, config); + this.group = groupName; + this.consumer = consumerName; + this.dataConverter = dataConverter; + } + + @Override + protected List>> read(Jedis jedis) { + return jedis.xreadGroup( + group, + consumer, + // XReadGroupParams.xReadGroupParams().count(1).block(0).noAck(), + XReadGroupParams.xReadGroupParams().count(1).noAck(), + streamEntryIds); + } + + @Override + protected void collect( + SourceContext sourceContext, String streamKey, StreamEntry streamEntry) { + sourceContext.collect(dataConverter.toData(streamEntry.getFields())); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java b/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java new file mode 100644 index 0000000..3b615e6 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/config/RedisConfigConstants.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.config; + +import redis.clients.jedis.Protocol; + +/** */ +public class RedisConfigConstants { + + public static final String REDIS_HOST = "redis.host"; + + public static final String REDIS_PORT = "redis.port"; + + public static final String DEFAULT_REDIS_HOST = Protocol.DEFAULT_HOST; + + public static final int DEFAULT_REDIS_PORT = Protocol.DEFAULT_PORT; +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java b/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java new file mode 100644 index 0000000..785a3bf --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/config/StartupMode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.config; + +/** + * Startup modes for the Redis Consumer. Based on + * flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java + */ +public enum StartupMode { + + /** Start from the earliest offset possible. */ + EARLIEST, + + /** Start from user-supplied timestamp for each partition. */ + TIMESTAMP, + + /** Start from user-supplied specific offsets for each partition. */ + SPECIFIC_OFFSETS, + + /** Start from the latest offset. */ + LATEST, + + /** Start from latest group offset. */ + GROUP_OFFSETS +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java new file mode 100644 index 0000000..e646fa4 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundDataRowToMap.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.internal; + +import org.apache.flink.streaming.connectors.redis.DataConverter; +import org.apache.flink.streaming.connectors.redis.MapConverter; +import org.apache.flink.types.Row; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** */ +public class FieldBoundDataRowToMap implements MapConverter, DataConverter { + + private final String[] fieldNames; + + public FieldBoundDataRowToMap(String[] fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public Map toMap(Row row) { + int arity = row.getArity(); + if (arity != fieldNames.length) { + throw new RuntimeException(); + } + Map map = new LinkedHashMap<>(); + for (int i = 0; i < arity; i++) { + String key = fieldNames[i]; + String value = (String) row.getField(i); + map.put(key, value); + } + return map; + } + + @Override + public Row toData(Map map) { + int size = map.size(); + if (size != fieldNames.length) { + throw new RuntimeException(); + } + Object[] array = new Object[size]; + for (int i = 0; i < size; i++) { + array[i] = map.get(fieldNames[i]); + } + return Row.of(array); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java new file mode 100644 index 0000000..5c23eb6 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/FieldBoundRowDataToMap.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.internal; + +import org.apache.flink.streaming.connectors.redis.DataConverter; +import org.apache.flink.streaming.connectors.redis.MapConverter; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** */ +public class FieldBoundRowDataToMap implements MapConverter, DataConverter { + + private final String[] fieldNames; + + public FieldBoundRowDataToMap(String[] fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public Map toMap(RowData row) { + int arity = row.getArity(); + if (arity != fieldNames.length) { + throw new RuntimeException(); + } + Map map = new LinkedHashMap<>(); + for (int i = 0; i < arity; i++) { + String key = fieldNames[i]; + String value = row.getString(i).toString(); + map.put(key, value); + } + return map; + } + + @Override + public RowData toData(Map map) { + int size = map.size(); + if (size != fieldNames.length) { + throw new RuntimeException(); + } + Object[] array = new Object[size]; + for (int i = 0; i < size; i++) { + array[i] = map.get(fieldNames[i]); + } + return GenericRowData.of(array); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java new file mode 100644 index 0000000..164c4e4 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/internal/SchemalessDataRowToMap.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.internal; + +import org.apache.flink.streaming.connectors.redis.DataConverter; +import org.apache.flink.streaming.connectors.redis.MapConverter; +import org.apache.flink.types.Row; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** */ +public class SchemalessDataRowToMap implements MapConverter, DataConverter { + + public SchemalessDataRowToMap() {} + + @Override + public Map toMap(Row input) { + int arity = input.getArity(); + if ((arity & 1) != 0) { + throw new RuntimeException(); + } + Map map = new LinkedHashMap<>(); + for (int i = 0; i < arity; i += 2) { + String key = (String) input.getField(i); + String value = (String) input.getField(i + 1); + map.put(key, value); + } + return map; + } + + @Override + public Row toData(Map map) { + int size = map.size(); + Object[] array = new Object[size << 1]; + int index = 0; + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + array[index++] = key; + array[index++] = value; + } + return Row.of(array); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java new file mode 100644 index 0000000..3123ba3 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSink.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.redis.FlinkRedisStreamProducer; +import org.apache.flink.streaming.connectors.redis.MapConverter; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +/** Redis-Stream-backed {@link DynamicTableSink}. */ +@Internal +public class RedisStreamDynamicSink implements DynamicTableSink, SupportsPartitioning { + + private static final ChangelogMode CHANGELOG_MODE = ChangelogMode.insertOnly(); + + /** The Redis Stream to write to. */ + private final String streamKey; + + /** Properties for the Redis (Stream) producer. */ + private final Properties producerProperties; + + private final MapConverter converter; + + public RedisStreamDynamicSink( + String streamKey, Properties producerProperties, MapConverter converter) { + this.streamKey = + Preconditions.checkNotNull(streamKey, "Redis Stream key name must not be null"); + this.producerProperties = + Preconditions.checkNotNull( + producerProperties, + "Properties for the Flink Redis producer must not be null"); + this.converter = Preconditions.checkNotNull(converter, "Converter must not be null"); + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return CHANGELOG_MODE; + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + FlinkRedisStreamProducer redisStreamProducer = + new FlinkRedisStreamProducer<>(converter, streamKey, producerProperties); + + return SinkFunctionProvider.of(redisStreamProducer); + } + + @Override + public DynamicTableSink copy() { + return new RedisStreamDynamicSink(streamKey, producerProperties, converter); + } + + @Override + public String asSummaryString() { + return "RedisStream"; + } + + // -------------------------------------------------------------------------------------------- + // SupportsPartitioning + // -------------------------------------------------------------------------------------------- + + @Override + public void applyStaticPartition(Map partition) { + // nothing to do + } + + // -------------------------------------------------------------------------------------------- + // Value semantics for equals and hashCode + // -------------------------------------------------------------------------------------------- + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RedisStreamDynamicSink that = (RedisStreamDynamicSink) o; + return Objects.equals(streamKey, that.streamKey) + && Objects.equals(producerProperties, that.producerProperties) + && Objects.equals(converter, that.converter); + } + + @Override + public int hashCode() { + return Objects.hash(streamKey, producerProperties, converter); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java new file mode 100644 index 0000000..03f9237 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.table; + +import org.apache.flink.streaming.connectors.redis.AbstractRedisStreamConsumer; +import org.apache.flink.streaming.connectors.redis.DataConverter; +import org.apache.flink.streaming.connectors.redis.RedisStreamConsumer; +import org.apache.flink.streaming.connectors.redis.RedisStreamGroupConsumer; +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.StreamEntryID; + +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +/** */ +public class RedisStreamDynamicSource implements ScanTableSource { + + private static final ChangelogMode CHANGELOG_MODE = ChangelogMode.insertOnly(); + + private final Properties config; + + private final Optional groupName; + + private final Optional consumerName; + + /** The Redis key to consume. */ + private final String streamKey; + + /** + * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). + */ + private final StartupMode startupMode; + + /** + * Specific startup offset; only relevant when startup mode is {@link + * StartupMode#SPECIFIC_OFFSETS}. + */ + private StreamEntryID streamEntryId; + + /** + * Specific startup timestamp; only relevant when startup mode is {@link StartupMode#TIMESTAMP}. + */ + private Long timestamp; + + private final DataConverter converter; + + private RedisStreamDynamicSource( + Properties config, + Optional groupName, + Optional consumerName, + String streamKey, + StartupMode startupMode, + DataConverter rowMapConverter) { + this.config = config; + this.streamKey = Preconditions.checkNotNull(streamKey, "Key must not be null."); + this.groupName = groupName; + this.consumerName = consumerName; + this.converter = + Preconditions.checkNotNull(rowMapConverter, "RowMapConverter must not be null."); + this.startupMode = Preconditions.checkNotNull(startupMode, "StartupMode must not be null."); + } + + public RedisStreamDynamicSource( + Properties config, + String streamKey, + StreamEntryID streamEntryId, + DataConverter rowMapConverter) { + this( + config, + Optional.empty(), + Optional.empty(), + streamKey, + StartupMode.SPECIFIC_OFFSETS, + rowMapConverter); + this.streamEntryId = streamEntryId; + } + + public RedisStreamDynamicSource( + Properties config, + String streamKey, + Long timestamp, + DataConverter rowMapConverter) { + this( + config, + Optional.empty(), + Optional.empty(), + streamKey, + StartupMode.TIMESTAMP, + rowMapConverter); + this.timestamp = timestamp; + } + + public RedisStreamDynamicSource( + Properties config, + String streamKey, + StartupMode startupMode, + DataConverter rowMapConverter) { + this(config, Optional.empty(), Optional.empty(), streamKey, startupMode, rowMapConverter); + } + + public RedisStreamDynamicSource( + Properties config, + String groupName, + String consumerName, + String streamKey, + StreamEntryID streamEntryId, + DataConverter rowMapConverter) { + this( + config, + Optional.of(groupName), + Optional.of(consumerName), + streamKey, + StartupMode.SPECIFIC_OFFSETS, + rowMapConverter); + this.streamEntryId = streamEntryId; + } + + public RedisStreamDynamicSource( + Properties config, + String groupName, + String consumerName, + String streamKey, + Long timestamp, + DataConverter rowMapConverter) { + this( + config, + Optional.of(groupName), + Optional.of(consumerName), + streamKey, + StartupMode.TIMESTAMP, + rowMapConverter); + this.timestamp = timestamp; + } + + public RedisStreamDynamicSource( + Properties config, + String groupName, + String consumerName, + String streamKey, + StartupMode startupMode, + DataConverter rowMapConverter) { + this( + config, + Optional.of(groupName), + Optional.of(consumerName), + streamKey, + startupMode, + rowMapConverter); + } + + public RedisStreamDynamicSource( + Properties config, + String groupName, + String consumerName, + String streamKey, + DataConverter rowMapConverter) { + this( + config, + Optional.of(groupName), + Optional.of(consumerName), + streamKey, + StartupMode.GROUP_OFFSETS, + rowMapConverter); + } + + @Override + public DynamicTableSource copy() { + if (groupName.isPresent()) { + switch (startupMode) { + case GROUP_OFFSETS: + return new RedisStreamDynamicSource( + config, groupName.get(), consumerName.get(), streamKey, converter); + case TIMESTAMP: + return new RedisStreamDynamicSource( + config, + groupName.get(), + consumerName.get(), + streamKey, + timestamp, + converter); + case SPECIFIC_OFFSETS: + return new RedisStreamDynamicSource( + config, + groupName.get(), + consumerName.get(), + streamKey, + streamEntryId, + converter); + default: + return new RedisStreamDynamicSource( + config, + groupName.get(), + consumerName.get(), + streamKey, + startupMode, + converter); + } + } else { + switch (startupMode) { + case TIMESTAMP: + return new RedisStreamDynamicSource(config, streamKey, timestamp, converter); + case SPECIFIC_OFFSETS: + return new RedisStreamDynamicSource( + config, streamKey, streamEntryId, converter); + default: + return new RedisStreamDynamicSource(config, streamKey, startupMode, converter); + } + } + } + + @Override + public String asSummaryString() { + return "RedisStream"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final RedisStreamDynamicSource that = (RedisStreamDynamicSource) o; + return Objects.equals(config, that.config) + && Objects.equals(groupName, that.groupName) + && Objects.equals(consumerName, that.consumerName) + && Objects.equals(streamKey, that.streamKey) + && startupMode == that.startupMode + && Objects.equals(streamEntryId, that.streamEntryId) + && Objects.equals(timestamp, that.timestamp) + && Objects.equals(converter, that.converter); + } + + @Override + public int hashCode() { + return Objects.hash( + config, + groupName, + consumerName, + streamKey, + startupMode, + streamEntryId, + timestamp, + converter); + } + + @Override + public ChangelogMode getChangelogMode() { + return CHANGELOG_MODE; + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + AbstractRedisStreamConsumer redisStreamConsumer = createRedisConsumer(); + + return SourceFunctionProvider.of(redisStreamConsumer, false); + } + + protected AbstractRedisStreamConsumer createRedisConsumer() { + if (groupName.isPresent()) { + switch (startupMode) { + case GROUP_OFFSETS: + return new RedisStreamGroupConsumer<>( + groupName.get(), consumerName.get(), converter, streamKey, config); + case TIMESTAMP: + return new RedisStreamGroupConsumer<>( + groupName.get(), + consumerName.get(), + converter, + Arrays.asList(streamKey), + RedisStreamGroupConsumer.convertToStreamEntryIDs( + Arrays.asList(timestamp)), + config); + case SPECIFIC_OFFSETS: + return new RedisStreamGroupConsumer<>( + groupName.get(), + consumerName.get(), + converter, + Arrays.asList(streamKey), + Arrays.asList(streamEntryId), + config); + default: + return new RedisStreamGroupConsumer<>( + groupName.get(), + consumerName.get(), + startupMode, + converter, + Arrays.asList(streamKey), + config); + } + } else { + switch (startupMode) { + case TIMESTAMP: + return new RedisStreamConsumer<>( + converter, + Arrays.asList(streamKey), + AbstractRedisStreamConsumer.convertToStreamEntryIDs( + Arrays.asList(timestamp)), + config); + case SPECIFIC_OFFSETS: + return new RedisStreamConsumer<>( + converter, + Arrays.asList(streamKey), + Arrays.asList(streamEntryId), + config); + default: + return new RedisStreamConsumer<>(config, startupMode, converter, streamKey); + } + } + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java b/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java new file mode 100644 index 0000000..95966c8 --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/util/ConsumerGroupUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.util; + +import org.apache.flink.streaming.connectors.redis.config.StartupMode; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; + +/** */ +public final class ConsumerGroupUtils { + + public static void createConsumerGroup( + Jedis jedis, + String streamKey, + String groupName, + StreamEntryID streamEntryID, + boolean createStreamIfAbsent) { + jedis.xgroupCreate(streamKey, groupName, streamEntryID, createStreamIfAbsent); + } + // + // public static void createConsumerGroup(Jedis jedis, String streamKey, String groupName, + // long timestamp, boolean createStreamIfAbsent) { + // createConsumerGroup(jedis, streamKey, groupName, new StreamEntryID(timestamp, 0L), + // createStreamIfAbsent); + // } + + public static void createConsumerGroup( + Jedis jedis, + String streamKey, + String groupName, + StartupMode startupMode, + boolean createStreamIfAbsent) { + final StreamEntryID streamEntryID; + switch (startupMode) { + case EARLIEST: + streamEntryID = new StreamEntryID(); + break; + case LATEST: + streamEntryID = StreamEntryID.LAST_ENTRY; + break; + case SPECIFIC_OFFSETS: + throw new RuntimeException("Use the method with StreamEntryID as param"); + case TIMESTAMP: + throw new RuntimeException("Use the method with 'long timestamp' param"); + default: + throw new RuntimeException( + "Consumer Group cannot be initialized from " + startupMode); + } + createConsumerGroup(jedis, streamKey, groupName, streamEntryID, createStreamIfAbsent); + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java b/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java new file mode 100644 index 0000000..74c2dee --- /dev/null +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/util/JedisUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.util; + +import org.apache.flink.streaming.connectors.redis.config.RedisConfigConstants; + +import redis.clients.jedis.Jedis; + +import java.util.Properties; + +/** */ +public class JedisUtils { + + public static Jedis createResource(Properties configProps) { + + String host = + configProps.getProperty( + RedisConfigConstants.REDIS_HOST, RedisConfigConstants.DEFAULT_REDIS_HOST); + + int port = + Integer.parseInt( + configProps.getProperty( + RedisConfigConstants.REDIS_PORT, + Integer.toString(RedisConfigConstants.DEFAULT_REDIS_PORT))); + + return new Jedis(host, port); + } +} diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java new file mode 100644 index 0000000..730eda9 --- /dev/null +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.config.RedisConfigConstants; +import org.apache.flink.streaming.connectors.redis.util.JedisUtils; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.After; +import org.junit.Before; +import org.testcontainers.containers.GenericContainer; +import redis.clients.jedis.Jedis; + +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +/** */ +public abstract class RedisITCaseBase extends AbstractTestBase { + + public static final String REDIS_IMAGE = "redis"; + private static final int REDIS_PORT = 6379; + + private static final AtomicBoolean running = new AtomicBoolean(false); + private static final GenericContainer container = + new GenericContainer<>(REDIS_IMAGE).withExposedPorts(REDIS_PORT); + + protected Jedis jedis; + + protected static synchronized void start() { + if (!running.get()) { + container.start(); + running.set(true); + } + } + + protected static void stop() { + container.stop(); + running.set(false); + } + + @Before + public void setUp() { + jedis = JedisUtils.createResource(getConfigProperties()); + jedis.flushAll(); + } + + @After + public void tearDown() { + if (jedis != null) { + jedis.close(); + } + } + + protected Properties getConfigProperties() { + start(); + + Properties configProps = new Properties(); + configProps.setProperty(RedisConfigConstants.REDIS_HOST, container.getContainerIpAddress()); + configProps.setProperty( + RedisConfigConstants.REDIS_PORT, Integer.toString(container.getFirstMappedPort())); + return configProps; + } +} diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java new file mode 100644 index 0000000..c9ac95c --- /dev/null +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumerITCase.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap; +import org.apache.flink.types.Row; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** */ +public class RedisStreamConsumerITCase extends RedisITCaseBase { + + private static final int NUM_ELEMENTS = 4; + private static final int NUM_FIELDS = 3; + private static final String REDIS_KEY = "TEST_KEY"; + + private StreamExecutionEnvironment env; + + private static final AtomicInteger count = new AtomicInteger(); + + @BeforeClass + public static void prepare() { + start(); + } + + @AfterClass + public static void cleanUp() { + stop(); + } + + @Before + @Override + public void setUp() { + super.setUp(); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + count.set(0); + } + + @After + @Override + public void tearDown() { + super.tearDown(); + } + + @Test + public void redisConsumer() throws Exception { + populate(jedis); + + DataStreamSource source = + env.addSource( + new RedisStreamConsumer<>( + getConfigProperties(), + StartupMode.EARLIEST, + new SchemalessDataRowToMap(), + REDIS_KEY)); + source.returns( + Types.ROW( + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING)); + source.setParallelism(1); + + source.addSink(new TestRowSinkFunction()); + + env.execute("Test Redis Row Consumer"); + + assertEquals(NUM_ELEMENTS, count.get()); + } + + private static void populate(Jedis iJedis) { + for (int i = 0; i < NUM_ELEMENTS; i++) { + HashMap map = new HashMap<>(); + for (int j = 0; j < NUM_FIELDS; j++) { + map.put("f" + j, "" + j); + } + iJedis.xadd(REDIS_KEY, StreamEntryID.NEW_ENTRY, map); + } + } + + private static class TestRowSinkFunction implements SinkFunction { + + private static final long serialVersionUID = 1L; + + TestRowSinkFunction() {} + + @Override + public void invoke(Row value, Context context) throws Exception { + count.incrementAndGet(); + } + } +} diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java new file mode 100644 index 0000000..665eebd --- /dev/null +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumerITCase.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.redis.config.StartupMode; +import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap; +import org.apache.flink.streaming.connectors.redis.util.ConsumerGroupUtils; +import org.apache.flink.types.Row; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +/** */ +public class RedisStreamGroupConsumerITCase extends RedisITCaseBase { + + private static final Logger LOGGER = + LoggerFactory.getLogger(RedisStreamGroupConsumerITCase.class); + + private static final int NUM_THREADS = 3; + private static final int NUM_ELEMENTS = 20; + private static final int NUM_FIELDS = 3; + + private static final String REDIS_KEY = "TEST_KEY"; + private static final String GROUP_NAME = "FlinkGroup"; + + StreamExecutionEnvironment env; + private static final AtomicInteger[] count = new AtomicInteger[NUM_THREADS]; + + @BeforeClass + public static void prepare() { + start(); + } + + @AfterClass + public static void cleanUp() { + stop(); + } + + @Before + @Override + public void setUp() { + super.setUp(); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + for (int t = 0; t < NUM_THREADS; t++) { + count[t] = new AtomicInteger(); + } + } + + @After + @Override + public void tearDown() { + super.tearDown(); + } + + @Test + public void redisGroupConsumer() throws Exception { + ConsumerGroupUtils.createConsumerGroup( + jedis, REDIS_KEY, GROUP_NAME, StartupMode.EARLIEST, true); + populate(jedis); + + for (int t = 0; t < NUM_THREADS; t++) { + DataStreamSource source = + env.addSource( + new RedisStreamGroupConsumer<>( + GROUP_NAME, + "consumer" + t, + new SchemalessDataRowToMap(), + REDIS_KEY, + getConfigProperties())); + source.returns( + Types.ROW( + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING, + Types.STRING)); + source.setParallelism(1); + + source.addSink(new TestRowSinkFunction(t)); + } + + env.execute("Test Redis Consumer Group"); + + int total = 0; + for (int t = 0; t < NUM_THREADS; t++) { + if (count[t].get() == 0) { + LOGGER.warn("consumer{} could not read any entry.", t); + } + total += count[t].get(); + } + assertEquals(NUM_ELEMENTS, total); + } + + private static void populate(Jedis iJedis) { + for (int i = 0; i < NUM_ELEMENTS; i++) { + HashMap map = new HashMap<>(); + for (int j = 0; j < NUM_FIELDS; j++) { + map.put("f" + j, "" + j); + } + iJedis.xadd(REDIS_KEY, StreamEntryID.NEW_ENTRY, map); + } + } + + private static class TestRowSinkFunction implements SinkFunction { + + private static final long serialVersionUID = 1L; + + private final int countIndex; + + TestRowSinkFunction(final int countIndex) { + this.countIndex = countIndex; + } + + @Override + public void invoke(Row value, Context context) throws Exception { + count[countIndex].incrementAndGet(); + } + } +} diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java new file mode 100644 index 0000000..9e2801a --- /dev/null +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/RedisStreamProducerITCase.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.internal.SchemalessDataRowToMap; +import org.apache.flink.types.Row; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; + +/** */ +public class RedisStreamProducerITCase extends RedisITCaseBase { + + private static final int NUM_ELEMENTS = 4; + private static final int NUM_FIELDS = 3; + private static final String REDIS_KEY = "TEST_KEY"; + + private StreamExecutionEnvironment env; + + @BeforeClass + public static void prepare() { + start(); + } + + @AfterClass + public static void cleanUp() { + stop(); + } + + @Before + @Override + public void setUp() { + super.setUp(); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @After + @Override + public void tearDown() { + super.tearDown(); + } + + @Test + public void redisProducer() throws Exception { + DataStreamSource source = env.addSource(new TestRowSourceFunction()); + + SinkFunction producer = + new FlinkRedisStreamProducer<>( + new SchemalessDataRowToMap(), REDIS_KEY, getConfigProperties()); + source.addSink(producer); + + env.execute("Test Redis Stream Producer"); + + assertEquals(NUM_ELEMENTS, jedis.xlen(REDIS_KEY).intValue()); + } + + private static class TestRowSourceFunction implements SourceFunction { + + private static final long serialVersionUID = 1L; + + private volatile boolean running = true; + + @Override + public void run(SourceContext ctx) throws Exception { + + for (int i = 0; i < NUM_ELEMENTS && running; i++) { + ArrayList list = new ArrayList<>(NUM_FIELDS); + for (int j = 0; j < NUM_FIELDS; j++) { + list.add("f" + j); + list.add("" + j); + } + ctx.collect(Row.of(list.toArray())); + } + } + + @Override + public void cancel() { + running = false; + } + } +}