Skip to content

Commit 7c53e0f

Browse files
committed
[FLINK-15571] Add RedisSink
1 parent 7e554a3 commit 7c53e0f

File tree

11 files changed

+600
-2
lines changed

11 files changed

+600
-2
lines changed

pom.xml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ under the License.
5353
<flink.version>1.15.1</flink.version>
5454
<jedis.version>4.2.3</jedis.version>
5555
<testcontainers.version>1.17.3</testcontainers.version>
56+
<junit.version>5.8.2</junit.version>
5657

5758
</properties>
5859

@@ -76,13 +77,13 @@ under the License.
7677
<dependency>
7778
<groupId>org.junit.jupiter</groupId>
7879
<artifactId>junit-jupiter-api</artifactId>
79-
<version>5.8.2</version>
80+
<version>${junit.version}</version>
8081
<scope>test</scope>
8182
</dependency>
8283
<dependency>
8384
<groupId>org.junit.jupiter</groupId>
8485
<artifactId>junit-jupiter</artifactId>
85-
<version>5.8.2</version>
86+
<version>${junit.version}</version>
8687
<scope>test</scope>
8788
</dependency>
8889
<dependency>
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.flink.connector.redis.sink2;
18+
19+
import java.io.Serializable;
20+
21+
public class RedisAction implements Serializable {
22+
public final RedisCommand command;
23+
public final String key;
24+
public final String value;
25+
26+
private RedisAction(RedisCommand command, String key, String value) {
27+
this.command = command;
28+
this.key = key;
29+
this.value = value;
30+
}
31+
32+
public static Builder builder() {
33+
return new Builder();
34+
}
35+
36+
public static class Builder {
37+
private RedisCommand command;
38+
private String key;
39+
private String value;
40+
41+
public Builder withCommand(RedisCommand command) {
42+
this.command = command;
43+
return this;
44+
}
45+
46+
public Builder withKey(String key) {
47+
this.key = key;
48+
return this;
49+
}
50+
51+
public Builder withValue(String value) {
52+
this.value = value;
53+
return this;
54+
}
55+
56+
public RedisAction build() {
57+
return new RedisAction(command, key, value);
58+
}
59+
}
60+
61+
62+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.flink.connector.redis.sink2;
18+
19+
public enum RedisCommand {
20+
21+
SET
22+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.flink.connector.redis.sink2;
18+
19+
import org.apache.flink.api.common.functions.Function;
20+
21+
import java.io.Serializable;
22+
import java.util.Optional;
23+
24+
/**
25+
* Function that creates the description how the input data should be mapped to redis type.
26+
*
27+
* @param <T> The type of the element handled by this {@code RedisSerializer}
28+
*/
29+
public interface RedisSerializer<T> extends Function, Serializable {
30+
31+
/**
32+
* Returns a redis command.
33+
*
34+
* @return RedisCommand
35+
*/
36+
RedisCommand getCommand(T data);
37+
38+
/**
39+
* Extracts key from data.
40+
*
41+
* @param data source data
42+
* @return key
43+
*/
44+
String getKeyFromData(T data);
45+
46+
/**
47+
* Extracts value from data.
48+
*
49+
* @param data source data
50+
* @return value
51+
*/
52+
String getValueFromData(T data);
53+
54+
/**
55+
* Extracts the additional key from data as an {@link Optional<String>}.
56+
* The default implementation returns an empty Optional.
57+
*
58+
* @param data
59+
* @return Optional
60+
*/
61+
default Optional<String> getAdditionalKey(T data) {
62+
return Optional.empty();
63+
}
64+
65+
/**
66+
* Extracts the additional time to live (TTL) for data as an {@link Optional<Integer>}.
67+
* The default implementation returns an empty Optional.
68+
*
69+
* @param data
70+
* @return Optional
71+
*/
72+
default Optional<Integer> getAdditionalTTL(T data) {
73+
return Optional.empty();
74+
}
75+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.redis.sink2;
19+
20+
import org.apache.flink.api.connector.sink2.Sink;
21+
import org.apache.flink.api.connector.sink2.SinkWriter;
22+
import org.apache.flink.connector.redis.shared.JedisConnectorBuilder;
23+
import org.apache.flink.connector.redis.shared.config.JedisConfig;
24+
import org.apache.flink.connector.redis.sink2.async.RedisAsyncWriter;
25+
import org.apache.flink.connector.redis.sink2.async.RedisConverter;
26+
27+
import java.io.IOException;
28+
29+
public class RedisSink<T> implements Sink<T> {
30+
31+
private final JedisConfig jedisConfig;
32+
private final RedisSerializer<T> serializer;
33+
34+
private final RedisSinkConfig sinkConfig;
35+
36+
public RedisSink(JedisConfig jedisConfig, RedisSinkConfig sinkConfig, RedisSerializer<T> serializer) {
37+
this.jedisConfig = jedisConfig;
38+
this.sinkConfig = sinkConfig;
39+
this.serializer = serializer;
40+
}
41+
42+
@Override
43+
public SinkWriter<T> createWriter(InitContext initContext) throws IOException {
44+
return new RedisAsyncWriter<>(
45+
JedisConnectorBuilder.build(jedisConfig),
46+
new RedisConverter<>(serializer),
47+
sinkConfig,
48+
initContext
49+
);
50+
}
51+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.flink.connector.redis.sink2;
18+
19+
import java.io.Serializable;
20+
21+
public class RedisSinkConfig implements Serializable {
22+
23+
public final int maxBatchSize;
24+
public final int maxInFlightRequests;
25+
public final int maxBufferedRequests;
26+
public final long maxBatchSizeInBytes;
27+
public final long maxTimeInBufferMS;
28+
public final long maxRecordSizeInBytes;
29+
30+
public RedisSinkConfig(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests,
31+
long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
32+
this.maxBatchSize = maxBatchSize;
33+
this.maxInFlightRequests = maxInFlightRequests;
34+
this.maxBufferedRequests = maxBufferedRequests;
35+
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
36+
this.maxTimeInBufferMS = maxTimeInBufferMS;
37+
this.maxRecordSizeInBytes = maxRecordSizeInBytes;
38+
}
39+
40+
public static Builder builder() {
41+
return new Builder();
42+
}
43+
44+
public static class Builder {
45+
private int maxBatchSize = 10;
46+
private int maxInFlightRequests = 1;
47+
private int maxBufferedRequests = 100;
48+
private long maxBatchSizeInBytes = 110;
49+
private long maxTimeInBufferMS = 1_000;
50+
private long maxRecordSizeInBytes = maxBatchSizeInBytes;
51+
52+
public Builder withMaxBatchSize(int maxBatchSize) {
53+
this.maxBatchSize = maxBatchSize;
54+
return this;
55+
}
56+
57+
public Builder withMaxInFlightRequests(int maxInFlightRequests) {
58+
this.maxInFlightRequests = maxInFlightRequests;
59+
return this;
60+
}
61+
62+
public Builder withMaxBufferedRequests(int maxBufferedRequests) {
63+
this.maxBufferedRequests = maxBufferedRequests;
64+
return this;
65+
}
66+
67+
public Builder withMaxBatchSizeInBytes(long maxBatchSizeInBytes) {
68+
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
69+
return this;
70+
}
71+
72+
public Builder withMaxTimeInBufferMS(long maxTimeInBufferMS) {
73+
this.maxTimeInBufferMS = maxTimeInBufferMS;
74+
return this;
75+
}
76+
77+
public Builder withMaxRecordSizeInBytes(long maxRecordSizeInBytes) {
78+
this.maxRecordSizeInBytes = maxRecordSizeInBytes;
79+
return this;
80+
}
81+
82+
public RedisSinkConfig build() {
83+
return new RedisSinkConfig(
84+
maxBatchSize, maxInFlightRequests, maxBufferedRequests,
85+
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes);
86+
}
87+
}
88+
89+
}

0 commit comments

Comments
 (0)