Skip to content

Commit 000c287

Browse files
committed
[FLINK-15571] Add stream command
1 parent 7c53e0f commit 000c287

File tree

12 files changed

+152
-153
lines changed

12 files changed

+152
-153
lines changed

src/main/java/org/apache/flink/connector/redis/shared/JedisConnector.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.connector.redis.shared;
1919

20-
import org.apache.flink.connector.redis.sink2.RedisAction;
2120
import redis.clients.jedis.JedisCluster;
2221
import redis.clients.jedis.JedisPool;
2322
import redis.clients.jedis.JedisSentinelPool;
@@ -50,16 +49,6 @@ public JedisCommands getJedisCommands() {
5049

5150
}
5251

53-
public void execute(RedisAction action) {
54-
switch (action.command) {
55-
case SET:
56-
getJedisCommands().set(action.key, action.value);
57-
break;
58-
default:
59-
throw new IllegalArgumentException("Cannot process such data type: " + action.command);
60-
}
61-
}
62-
6352
@Override
6453
public void close() {
6554
if (jedisCluster != null) jedisCluster.close();
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.apache.flink.connector.redis.shared.command;
2+
3+
import org.apache.flink.connector.redis.shared.JedisConnector;
4+
5+
import java.io.Serializable;
6+
7+
public interface RedisCommand extends Serializable {
8+
void send(JedisConnector connector);
9+
long getMessageSize();
10+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.flink.connector.redis.shared.command;
18+
19+
import org.apache.flink.connector.redis.shared.JedisConnector;
20+
import redis.clients.jedis.StreamEntryID;
21+
22+
import java.util.Map;
23+
24+
public class StreamRedisCommand implements RedisCommand {
25+
public final String key;
26+
public final Map<String, String> value;
27+
28+
private StreamRedisCommand(String key, Map<String, String> value) {
29+
this.key = key;
30+
this.value = value;
31+
}
32+
33+
public static Builder builder() {
34+
return new Builder();
35+
}
36+
37+
@Override
38+
public void send(JedisConnector connector) {
39+
connector.getJedisCommands().xadd(key, StreamEntryID.NEW_ENTRY, value);
40+
}
41+
42+
@Override
43+
public long getMessageSize() {
44+
return value.entrySet().stream()
45+
.map(k -> k.getKey().length()+k.getValue().length())
46+
.reduce(Integer::sum)
47+
.orElse(0);
48+
}
49+
50+
public static class Builder {
51+
private String key;
52+
private Map<String, String> value;
53+
54+
55+
public Builder withKey(String key) {
56+
this.key = key;
57+
return this;
58+
}
59+
60+
public Builder withValue(Map<String, String> value) {
61+
this.value = value;
62+
return this;
63+
}
64+
65+
public StreamRedisCommand build() {
66+
return new StreamRedisCommand(key, value);
67+
}
68+
}
69+
}

src/main/java/org/apache/flink/connector/redis/sink2/RedisAction.java renamed to src/main/java/org/apache/flink/connector/redis/shared/command/StringRedisCommand.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,15 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.flink.connector.redis.sink2;
17+
package org.apache.flink.connector.redis.shared.command;
1818

19-
import java.io.Serializable;
19+
import org.apache.flink.connector.redis.shared.JedisConnector;
2020

21-
public class RedisAction implements Serializable {
22-
public final RedisCommand command;
21+
public class StringRedisCommand implements RedisCommand {
2322
public final String key;
2423
public final String value;
2524

26-
private RedisAction(RedisCommand command, String key, String value) {
27-
this.command = command;
25+
private StringRedisCommand(String key, String value) {
2826
this.key = key;
2927
this.value = value;
3028
}
@@ -33,15 +31,20 @@ public static Builder builder() {
3331
return new Builder();
3432
}
3533

34+
@Override
35+
public void send(JedisConnector connector) {
36+
connector.getJedisCommands().set(key, value);
37+
}
38+
39+
@Override
40+
public long getMessageSize() {
41+
return value.length();
42+
}
43+
3644
public static class Builder {
37-
private RedisCommand command;
3845
private String key;
3946
private String value;
4047

41-
public Builder withCommand(RedisCommand command) {
42-
this.command = command;
43-
return this;
44-
}
4548

4649
public Builder withKey(String key) {
4750
this.key = key;
@@ -53,10 +56,8 @@ public Builder withValue(String value) {
5356
return this;
5457
}
5558

56-
public RedisAction build() {
57-
return new RedisAction(command, key, value);
59+
public StringRedisCommand build() {
60+
return new StringRedisCommand(key, value);
5861
}
5962
}
60-
61-
6263
}

src/main/java/org/apache/flink/connector/redis/shared/config/JedisSentinelConfig.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
package org.apache.flink.connector.redis.shared.config;
1818

1919
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
20-
import org.slf4j.Logger;
21-
import org.slf4j.LoggerFactory;
2220
import redis.clients.jedis.Protocol;
2321

2422
import java.util.HashSet;

src/main/java/org/apache/flink/connector/redis/sink2/RedisCommand.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/main/java/org/apache/flink/connector/redis/sink2/RedisSerializer.java

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package org.apache.flink.connector.redis.sink2;
1818

1919
import org.apache.flink.api.common.functions.Function;
20+
import org.apache.flink.connector.redis.shared.command.RedisCommand;
2021

2122
import java.io.Serializable;
22-
import java.util.Optional;
2323

2424
/**
2525
* Function that creates the description how the input data should be mapped to redis type.
@@ -28,48 +28,5 @@
2828
*/
2929
public interface RedisSerializer<T> extends Function, Serializable {
3030

31-
/**
32-
* Returns a redis command.
33-
*
34-
* @return RedisCommand
35-
*/
36-
RedisCommand getCommand(T data);
37-
38-
/**
39-
* Extracts key from data.
40-
*
41-
* @param data source data
42-
* @return key
43-
*/
44-
String getKeyFromData(T data);
45-
46-
/**
47-
* Extracts value from data.
48-
*
49-
* @param data source data
50-
* @return value
51-
*/
52-
String getValueFromData(T data);
53-
54-
/**
55-
* Extracts the additional key from data as an {@link Optional<String>}.
56-
* The default implementation returns an empty Optional.
57-
*
58-
* @param data
59-
* @return Optional
60-
*/
61-
default Optional<String> getAdditionalKey(T data) {
62-
return Optional.empty();
63-
}
64-
65-
/**
66-
* Extracts the additional time to live (TTL) for data as an {@link Optional<Integer>}.
67-
* The default implementation returns an empty Optional.
68-
*
69-
* @param data
70-
* @return Optional
71-
*/
72-
default Optional<Integer> getAdditionalTTL(T data) {
73-
return Optional.empty();
74-
}
31+
RedisCommand getMessage(T input);
7532
}

src/main/java/org/apache/flink/connector/redis/sink2/async/RedisAsyncWriter.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,16 @@
1818
package org.apache.flink.connector.redis.sink2.async;
1919

2020
import org.apache.flink.api.connector.sink2.Sink;
21-
import org.apache.flink.api.connector.sink2.SinkWriter;
2221
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
23-
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
24-
import org.apache.flink.connector.base.sink.writer.ElementConverter;
2522
import org.apache.flink.connector.redis.shared.JedisConnector;
26-
import org.apache.flink.connector.redis.sink2.RedisAction;
23+
import org.apache.flink.connector.redis.shared.command.RedisCommand;
2724
import org.apache.flink.connector.redis.sink2.RedisSinkConfig;
28-
import redis.clients.jedis.Protocol.Command;
29-
import redis.clients.jedis.commands.JedisCommands;
3025

31-
import java.io.IOException;
32-
import java.util.*;
33-
import java.util.concurrent.CompletableFuture;
26+
import java.util.ArrayList;
27+
import java.util.List;
3428
import java.util.function.Consumer;
35-
import java.util.function.Supplier;
3629

37-
import static redis.clients.jedis.Protocol.Command.SET;
38-
39-
public class RedisAsyncWriter<T> extends AsyncSinkWriter<T, RedisAction> {
30+
public class RedisAsyncWriter<T> extends AsyncSinkWriter<T, RedisCommand> {
4031

4132
private final JedisConnector jedisConnector;
4233

@@ -51,11 +42,11 @@ public RedisAsyncWriter(JedisConnector jedisConnector,
5142
}
5243

5344
@Override
54-
protected void submitRequestEntries(List<RedisAction> actions, Consumer<List<RedisAction>> toRetry) {
55-
List<RedisAction> errors = new ArrayList<>();
56-
for(RedisAction action : actions) {
45+
protected void submitRequestEntries(List<RedisCommand> actions, Consumer<List<RedisCommand>> toRetry) {
46+
List<RedisCommand> errors = new ArrayList<>();
47+
for(RedisCommand action : actions) {
5748
try {
58-
this.jedisConnector.execute(action);
49+
action.send(this.jedisConnector);
5950
} catch (Exception e) {
6051
errors.add(action);
6152
}
@@ -64,8 +55,8 @@ protected void submitRequestEntries(List<RedisAction> actions, Consumer<List<Red
6455
}
6556

6657
@Override
67-
protected long getSizeInBytes(RedisAction redisAction) {
68-
return redisAction.value.length();
58+
protected long getSizeInBytes(RedisCommand message) {
59+
return message.getMessageSize();
6960
}
7061

7162
public void close() {

src/main/java/org/apache/flink/connector/redis/sink2/async/RedisConverter.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818

1919
import org.apache.flink.api.connector.sink2.SinkWriter;
2020
import org.apache.flink.connector.base.sink.writer.ElementConverter;
21-
import org.apache.flink.connector.redis.sink2.RedisAction;
21+
import org.apache.flink.connector.redis.shared.command.RedisCommand;
2222
import org.apache.flink.connector.redis.sink2.RedisSerializer;
2323

24-
public class RedisConverter<T> implements ElementConverter<T, RedisAction> {
24+
public class RedisConverter<T> implements ElementConverter<T, RedisCommand> {
2525

2626
private final RedisSerializer<T> serializer;
2727

@@ -30,11 +30,7 @@ public RedisConverter(RedisSerializer<T> serializer) {
3030
}
3131

3232
@Override
33-
public RedisAction apply(T input, SinkWriter.Context context) {
34-
return RedisAction.builder()
35-
.withCommand(serializer.getCommand(input))
36-
.withKey(serializer.getKeyFromData(input))
37-
.withValue(serializer.getValueFromData(input))
38-
.build();
33+
public RedisCommand apply(T input, SinkWriter.Context context) {
34+
return serializer.getMessage(input);
3935
}
4036
}

src/main/java/org/apache/flink/connector/redis/sink2/sync/RedisWriter.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.flink.api.connector.sink2.SinkWriter;
2121
import org.apache.flink.connector.redis.shared.JedisConnector;
22-
import org.apache.flink.connector.redis.sink2.RedisAction;
22+
import org.apache.flink.connector.redis.shared.command.RedisCommand;
2323
import org.apache.flink.connector.redis.sink2.RedisSerializer;
2424

2525
import java.io.IOException;
@@ -30,21 +30,17 @@ public class RedisWriter<T> implements SinkWriter<T> {
3030
private final JedisConnector jedisConnector;
3131
private final RedisSerializer<T> serializer;
3232

33-
private final Queue<RedisAction> queue = new ArrayDeque<>();
33+
private final Queue<RedisCommand> queue = new ArrayDeque<>();
3434
public RedisWriter(JedisConnector jedisConnector, RedisSerializer<T> serializer) {
3535
this.jedisConnector = jedisConnector;
3636
this.serializer = serializer;
3737
}
3838

3939
@Override
4040
public void write(T input, Context context) throws IOException, InterruptedException {
41-
RedisAction action = RedisAction.builder()
42-
.withCommand(serializer.getCommand(input))
43-
.withKey(serializer.getKeyFromData(input))
44-
.withValue(serializer.getValueFromData(input))
45-
.build();
41+
RedisCommand message = serializer.getMessage(input);
4642

47-
queue.add(action);
43+
queue.add(message);
4844
}
4945

5046
@Override
@@ -54,8 +50,8 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException {
5450

5551
private void flush() {
5652
while(!this.queue.isEmpty()) {
57-
RedisAction element = this.queue.remove();
58-
this.jedisConnector.execute(element);
53+
RedisCommand element = this.queue.remove();
54+
element.send(this.jedisConnector);
5955
}
6056
}
6157

0 commit comments

Comments
 (0)