Skip to content

Commit 28c4803

Browse files
committed
[FLINK-35280] Add maxRequestWriteAttempts config
1 parent 53f8755 commit 28c4803

File tree

11 files changed

+130
-17
lines changed

11 files changed

+130
-17
lines changed

flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_FAIL_ON_TIMEOUT;
5757
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
5858
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_SIZE;
59+
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_WRITE_ATTEMPTS;
5960
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
6061
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_REQUEST_TIMEOUT;
6162
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
@@ -130,6 +131,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
130131
.setRequestTimeoutMS(config.get(SINK_REQUEST_TIMEOUT).toMillis())
131132
.setMaxRecordSizeInBytes(config.get(SINK_MAX_RECORD_SIZE))
132133
.setFailOnTimeout(config.get(SINK_FAIL_ON_TIMEOUT))
134+
.setMaxRecordWriteAttempts(config.get(SINK_MAX_RECORD_WRITE_ATTEMPTS))
133135
.setTableName(config.get(TABLE_NAME))
134136
.setConfiguration(getHBaseConfiguration(config))
135137
.setNullStringLiteral(config.get(NULL_STRING_LITERAL))
@@ -173,6 +175,7 @@ public Set<ConfigOption<?>> optionalOptions() {
173175
SINK_MAX_RECORD_SIZE,
174176
SINK_REQUEST_TIMEOUT,
175177
SINK_FAIL_ON_TIMEOUT,
178+
SINK_MAX_RECORD_WRITE_ATTEMPTS,
176179
LOOKUP_ASYNC,
177180
LOOKUP_CACHE_MAX_ROWS,
178181
LOOKUP_CACHE_TTL,
@@ -203,6 +206,7 @@ public Set<ConfigOption<?>> forwardOptions() {
203206
SINK_MAX_RECORD_SIZE,
204207
SINK_REQUEST_TIMEOUT,
205208
SINK_FAIL_ON_TIMEOUT,
209+
SINK_MAX_RECORD_WRITE_ATTEMPTS,
206210
LOOKUP_CACHE_MAX_ROWS,
207211
LOOKUP_CACHE_TTL,
208212
LOOKUP_MAX_RETRIES);

flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class HBaseDynamicTableSink extends AsyncDynamicTableSink<SerializableMut
5353
private final Long maxRecordSizeInBytes;
5454
private final Long requestTimeoutMS;
5555
private final Boolean failOnTimeout;
56+
private final Long maxRecordWriteAttempts;
5657
private final String tableName;
5758
private final HBaseTableSchema hbaseTableSchema;
5859
private final Configuration configuration;
@@ -77,6 +78,7 @@ public HBaseDynamicTableSink(
7778
Long maxRecordSizeInBytes,
7879
Long requestTimeoutMS,
7980
Boolean failOnTimeout,
81+
Long maxRecordWriteAttempts,
8082
Integer parallelism,
8183
Boolean ignoreNullValue,
8284
String tableName,
@@ -92,6 +94,7 @@ public HBaseDynamicTableSink(
9294
this.maxRecordSizeInBytes = maxRecordSizeInBytes;
9395
this.requestTimeoutMS = requestTimeoutMS;
9496
this.failOnTimeout = failOnTimeout;
97+
this.maxRecordWriteAttempts = maxRecordWriteAttempts;
9598
this.parallelism = parallelism;
9699
this.ignoreNullValue = ignoreNullValue;
97100
this.tableName = tableName;
@@ -118,6 +121,7 @@ public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(Context conte
118121
.setMaxRecordSizeInBytes(maxRecordSizeInBytes)
119122
.setRequestTimeoutMS(requestTimeoutMS)
120123
.setFailOnTimeout(failOnTimeout)
124+
.setMaxRecordWriteAttempts(maxRecordWriteAttempts)
121125
.setElementConverter(elementConverter);
122126
addAsyncOptionsToSinkBuilder(builder);
123127
return SinkV2Provider.of(builder.build(), parallelism);
@@ -156,6 +160,7 @@ public DynamicTableSink copy() {
156160
requestTimeoutMS,
157161
maxRecordSizeInBytes,
158162
failOnTimeout,
163+
maxRecordWriteAttempts,
159164
parallelism,
160165
ignoreNullValue,
161166
tableName,
@@ -194,6 +199,7 @@ public static class HBaseDynamicSinkBuilder
194199
private Long maxRecordSizeInBytes = null;
195200
private Long requestTimeoutMS = null;
196201
private Boolean failOnTimeout = null;
202+
private Long maxRecordWriteAttempts = null;
197203
private String tableName = null;
198204
private Configuration configuration = null;
199205
private String nullStringLiteral = null;
@@ -212,6 +218,7 @@ public AsyncDynamicTableSink<SerializableMutation> build() {
212218
maxRecordSizeInBytes,
213219
requestTimeoutMS,
214220
failOnTimeout,
221+
maxRecordWriteAttempts,
215222
parallelism,
216223
ignoreNullValue,
217224
tableName,
@@ -235,6 +242,11 @@ public HBaseDynamicSinkBuilder setFailOnTimeout(Boolean failOnTimeout) {
235242
return this;
236243
}
237244

245+
public HBaseDynamicSinkBuilder setMaxRecordWriteAttempts(Long maxRecordWriteAttempts) {
246+
this.maxRecordWriteAttempts = maxRecordWriteAttempts;
247+
return this;
248+
}
249+
238250
public HBaseDynamicSinkBuilder setTableName(String tableName) {
239251
this.tableName = tableName;
240252
return this;

flink-connector-hbase-2.6/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_FAIL_ON_TIMEOUT;
6060
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
6161
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_SIZE;
62+
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_WRITE_ATTEMPTS;
6263
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
6364
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_REQUEST_TIMEOUT;
6465
import static org.apache.flink.table.api.DataTypes.BIGINT;
@@ -243,6 +244,7 @@ void testAsyncOptions() {
243244
options.put(FLUSH_BUFFER_TIMEOUT.key(), "10000");
244245
options.put(SINK_REQUEST_TIMEOUT.key(), "5 s");
245246
options.put(SINK_FAIL_ON_TIMEOUT.key(), "true");
247+
options.put(SINK_MAX_RECORD_WRITE_ATTEMPTS.key(), "12345");
246248
options.put(SINK_IGNORE_NULL_VALUE.key(), "true");
247249
options.put(SINK_MAX_RECORD_SIZE.key(), "6123");
248250

@@ -261,6 +263,7 @@ void testAsyncOptions() {
261263
.setRequestTimeoutMS(5L * 1000L)
262264
.setMaxRecordSizeInBytes(6123L)
263265
.setFailOnTimeout(true)
266+
.setMaxRecordWriteAttempts(12345L)
264267
.setIgnoreNullValue(true)
265268
.setPhysicalDataType(schema.toPhysicalRowDataType())
266269
.build();
@@ -275,6 +278,7 @@ void testAsyncOptions() {
275278
"requestTimeoutMS",
276279
"maxRecordSizeInBytes",
277280
"failOnTimeout",
281+
"maxRecordWriteAttempts",
278282
"ignoreNullValue")
279283
.isEqualTo(expectedSink);
280284
}

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSink.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ public class HBaseSink<T> extends AsyncSinkBase<T, SerializableMutation> {
6969
*/
7070
private final byte[] serializedHadoopConfiguration;
7171

72+
/** Maximum times a record will be attempted to be written to HBase. */
73+
private final long maxRecordWriteAttempts;
74+
7275
public HBaseSink(
7376
ElementConverter<T, SerializableMutation> elementConverter,
7477
int maxBatchSize,
@@ -79,6 +82,7 @@ public HBaseSink(
7982
long maxRecordSizeInBytes,
8083
long requestTimeoutMS,
8184
boolean failOnTimeout,
85+
long maxRecordWriteAttempts,
8286
String tableName,
8387
Configuration configuration) {
8488
super(
@@ -100,6 +104,7 @@ public HBaseSink(
100104
this.tableName = tableName;
101105
this.serializedHadoopConfiguration =
102106
HBaseConfigurationUtil.serializeConfiguration(configuration);
107+
this.maxRecordWriteAttempts = maxRecordWriteAttempts;
103108
}
104109

105110
/** This can be removed once rebased to Flink 2.0. */
@@ -116,6 +121,7 @@ public SinkWriter<T> createWriter(InitContext context) {
116121
getMaxRecordSizeInBytes(),
117122
getRequestTimeoutMS(),
118123
getFailOnTimeout(),
124+
maxRecordWriteAttempts,
119125
tableName,
120126
HBaseConfigurationUtil.deserializeConfiguration(
121127
serializedHadoopConfiguration, null));
@@ -135,6 +141,7 @@ public SinkWriter<T> createWriter(WriterInitContext writerInitContext) {
135141
getMaxRecordSizeInBytes(),
136142
getRequestTimeoutMS(),
137143
getFailOnTimeout(),
144+
maxRecordWriteAttempts,
138145
tableName,
139146
HBaseConfigurationUtil.deserializeConfiguration(
140147
serializedHadoopConfiguration, null));
@@ -156,6 +163,7 @@ public StatefulSinkWriter<T, BufferedRequestState<SerializableMutation>> restore
156163
getMaxRecordSizeInBytes(),
157164
getRequestTimeoutMS(),
158165
getFailOnTimeout(),
166+
maxRecordWriteAttempts,
159167
tableName,
160168
HBaseConfigurationUtil.deserializeConfiguration(
161169
serializedHadoopConfiguration, null));

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,14 @@ public class HBaseSinkBuilder<InputT>
6464
private static final Long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 1024L * 1024L;
6565
private static final Long DEFAULT_MAX_REQUEST_TIMEOUT_MS = 60L * 1000L;
6666
private static final Boolean DEFAULT_FAIL_ON_TIMEOUT = false;
67+
private static final Long DEFAULT_MAX_RECORD_WRITE_ATTEMPTS = 0L;
6768

6869
private String tableName;
6970
private Configuration configuration;
7071
private ElementConverter<InputT, SerializableMutation> elementConverter;
7172
private Long requestTimeoutMS = null;
7273
private Boolean failOnTimeout = null;
74+
private Long maxRecordWriteAttempts = null;
7375

7476
public HBaseSinkBuilder() {}
7577

@@ -93,6 +95,11 @@ public HBaseSinkBuilder<InputT> setFailOnTimeout(Boolean failOnTimeout) {
9395
return this;
9496
}
9597

98+
public HBaseSinkBuilder<InputT> setMaxRecordWriteAttempts(Long maxRecordWriteAttempts) {
99+
this.maxRecordWriteAttempts = maxRecordWriteAttempts;
100+
return this;
101+
}
102+
96103
/**
97104
* Set up the converter to use when converting the input elements to a {@link Mutation} object.
98105
* Since {@link Mutation} objects don't implement {@link java.io.Serializable}, this will
@@ -120,6 +127,8 @@ public HBaseSink<InputT> build() {
120127
.orElse(DEFAULT_MAX_RECORD_SIZE_IN_BYTES),
121128
Optional.ofNullable(requestTimeoutMS).orElse(DEFAULT_MAX_REQUEST_TIMEOUT_MS),
122129
Optional.ofNullable(failOnTimeout).orElse(DEFAULT_FAIL_ON_TIMEOUT),
130+
Optional.ofNullable(maxRecordWriteAttempts)
131+
.orElse(DEFAULT_MAX_RECORD_WRITE_ATTEMPTS),
123132
tableName,
124133
configuration);
125134
}

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,18 @@ protected void serializeRequestToStream(SerializableMutation request, DataOutput
3434
mutation.getClass()));
3535
}
3636

37+
out.writeLong(request.getRecordWriteAttempts());
3738
ClientProtos.MutationProto proto = ProtobufUtil.toMutation(type, mutation);
3839
proto.writeDelimitedTo(out);
3940
}
4041

4142
@Override
4243
protected SerializableMutation deserializeRequestFromStream(
4344
long requestSize, DataInputStream in) throws IOException {
45+
long retryNum = in.readLong();
4446
ClientProtos.MutationProto proto = ClientProtos.MutationProto.parseDelimitedFrom(in);
4547
Mutation mutation = ProtobufUtil.toMutation(proto);
46-
return new SerializableMutation(mutation);
48+
return new SerializableMutation(mutation, retryNum);
4749
}
4850

4951
@Override

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public HBaseWriter(
6969
long maxRecordSizeInBytes,
7070
long requestTimeoutMS,
7171
boolean failOnTimeout,
72+
long maxRecordWriteAttempts,
7273
String tableName,
7374
Configuration configuration) {
7475
super(
@@ -92,7 +93,8 @@ public HBaseWriter(
9293
this.table = connection.getTable(TableName.valueOf(tableName));
9394
this.metrics = context.metricGroup();
9495
this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
95-
this.hBaseWriterAsyncHandler = new HBaseWriterAsyncHandler(numRecordsOutErrorsCounter);
96+
this.hBaseWriterAsyncHandler =
97+
new HBaseWriterAsyncHandler(numRecordsOutErrorsCounter, maxRecordWriteAttempts);
9698
}
9799

98100
public HBaseWriter(
@@ -107,6 +109,7 @@ public HBaseWriter(
107109
long maxRecordSizeInBytes,
108110
long requestTimeoutMS,
109111
boolean failOnTimeout,
112+
long maxRecordWriteAttempts,
110113
String tableName,
111114
Configuration configuration) {
112115
super(
@@ -130,7 +133,8 @@ public HBaseWriter(
130133
this.table = connection.getTable(TableName.valueOf(tableName));
131134
this.metrics = context.metricGroup();
132135
this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
133-
this.hBaseWriterAsyncHandler = new HBaseWriterAsyncHandler(numRecordsOutErrorsCounter);
136+
this.hBaseWriterAsyncHandler =
137+
new HBaseWriterAsyncHandler(numRecordsOutErrorsCounter, maxRecordWriteAttempts);
134138
}
135139

136140
/**

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
import org.slf4j.Logger;
1111
import org.slf4j.LoggerFactory;
1212

13+
import java.util.ArrayList;
1314
import java.util.Collection;
1415
import java.util.HashSet;
1516
import java.util.List;
1617
import java.util.Queue;
1718
import java.util.Set;
1819
import java.util.concurrent.CompletableFuture;
1920
import java.util.concurrent.ConcurrentLinkedQueue;
20-
import java.util.stream.Collectors;
2121
import java.util.stream.IntStream;
2222

2323
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -31,9 +31,12 @@ public class HBaseWriterAsyncHandler {
3131
private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
3232

3333
private final Counter numRecordsOutErrorsCounter;
34+
private final long maxRecordWriteAttempts;
3435

35-
public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
36+
public HBaseWriterAsyncHandler(
37+
Counter numRecordsOutErrorsCounter, long maxRecordWriteAttempts) {
3638
this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter;
39+
this.maxRecordWriteAttempts = maxRecordWriteAttempts;
3740
}
3841

3942
/**
@@ -114,6 +117,7 @@ private void handleFailedRequests(
114117

115118
numRecordsOutErrorsCounter.inc(failedMutations.size());
116119

120+
List<SerializableMutation> retryEntries = new ArrayList<>();
117121
for (FailedMutation failedMutation : failedMutations) {
118122
LOG.warn("Mutation failed with exception", failedMutation.getThrowable());
119123

@@ -123,12 +127,21 @@ private void handleFailedRequests(
123127
failedMutation.getThrowable()));
124128
return;
125129
}
130+
131+
SerializableMutation serializableMutation = failedMutation.getMutation();
132+
serializableMutation.incWriteAttempts();
133+
if (maxRecordWriteAttempts > 0
134+
&& serializableMutation.getRecordWriteAttempts() > maxRecordWriteAttempts) {
135+
resultHandler.completeExceptionally(
136+
new HBaseSinkException.HBaseSinkMutationException(
137+
failedMutation.getThrowable()));
138+
return;
139+
}
140+
141+
retryEntries.add(serializableMutation);
126142
}
127143

128-
resultHandler.retryForEntries(
129-
failedMutations.stream()
130-
.map(FailedMutation::getWrappedMutation)
131-
.collect(Collectors.toList()));
144+
resultHandler.retryForEntries(retryEntries);
132145
}
133146

134147
/**
@@ -162,7 +175,7 @@ private FailedMutation(SerializableMutation mutation, Throwable throwable) {
162175
this.throwable = throwable;
163176
}
164177

165-
public SerializableMutation getWrappedMutation() {
178+
public SerializableMutation getMutation() {
166179
return mutation;
167180
}
168181

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,13 @@ public class HBaseConnectorOptions {
122122
.withDescription(
123123
"Whether to fail the job when a request times out. If false, timed-out requests will be logged but the job will continue processing. If true, a timeout will cause the job to fail.");
124124

125+
public static final ConfigOption<Long> SINK_MAX_RECORD_WRITE_ATTEMPTS =
126+
ConfigOptions.key("sink.max-request-write-attempts")
127+
.longType()
128+
.defaultValue(0L)
129+
.withDescription(
130+
"Maximum number of attempts to save a record to HBase before the job fails. Set to 0 for unlimited retries.");
131+
125132
public static final ConfigOption<Boolean> SINK_IGNORE_NULL_VALUE =
126133
ConfigOptions.key("sink.ignore-null-value")
127134
.booleanType()

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/SerializableMutation.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,27 @@ public class SerializableMutation implements Serializable {
1717
private static final long serialVersionUID = 1L;
1818

1919
private final transient Mutation mutation;
20+
private long recordWriteAttempts;
2021

2122
public SerializableMutation(Mutation mutation) {
23+
this(mutation, 0L);
24+
}
25+
26+
public SerializableMutation(Mutation mutation, long recordWriteAttempts) {
2227
this.mutation = mutation;
28+
this.recordWriteAttempts = recordWriteAttempts;
2329
}
2430

2531
/** Get the wrapped mutation object. */
2632
public Mutation get() {
2733
return mutation;
2834
}
35+
36+
public long getRecordWriteAttempts() {
37+
return recordWriteAttempts;
38+
}
39+
40+
public void incWriteAttempts() {
41+
recordWriteAttempts++;
42+
}
2943
}

0 commit comments

Comments
 (0)