Skip to content

Commit 53f8755

Browse files
committed
[FLINK-35280] PR comments
1 parent 51e72d2 commit 53f8755

File tree

11 files changed

+144
-73
lines changed

11 files changed

+144
-73
lines changed

flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/sink/HBaseDynamicTableSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
2626
import org.apache.flink.connector.hbase.sink.HBaseSinkBuilder;
2727
import org.apache.flink.connector.hbase.sink.RowDataToMutationElementConverter;
28-
import org.apache.flink.connector.hbase.sink.SerializableMutation;
2928
import org.apache.flink.connector.hbase.sink.WritableMetadata;
3029
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
30+
import org.apache.flink.connector.hbase.util.SerializableMutation;
3131
import org.apache.flink.table.connector.ChangelogMode;
3232
import org.apache.flink.table.connector.sink.DynamicTableSink;
3333
import org.apache.flink.table.connector.sink.SinkV2Provider;

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
88
import org.apache.flink.connector.base.sink.writer.ElementConverter;
99
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
10+
import org.apache.flink.connector.hbase.util.SerializableMutation;
1011
import org.apache.flink.core.io.SimpleVersionedSerializer;
1112

1213
import org.apache.hadoop.conf.Configuration;

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import org.apache.flink.annotation.PublicEvolving;
44
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
55
import org.apache.flink.connector.base.sink.writer.ElementConverter;
6+
import org.apache.flink.connector.hbase.util.SerializableMutation;
7+
import org.apache.flink.connector.hbase.util.WrappedElementConverter;
68

79
import org.apache.hadoop.conf.Configuration;
810
import org.apache.hadoop.hbase.client.Mutation;

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseStateSerializer.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@
22

33
import org.apache.flink.annotation.Internal;
44
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
5-
import org.apache.flink.connector.hbase.util.HBaseMutationSerialization;
5+
import org.apache.flink.connector.hbase.util.SerializableMutation;
6+
7+
import org.apache.hadoop.hbase.client.Delete;
8+
import org.apache.hadoop.hbase.client.Mutation;
9+
import org.apache.hadoop.hbase.client.Put;
10+
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
11+
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
612

713
import java.io.DataInputStream;
814
import java.io.DataOutputStream;
@@ -14,13 +20,30 @@ public class HBaseStateSerializer extends AsyncSinkWriterStateSerializer<Seriali
1420
@Override
1521
protected void serializeRequestToStream(SerializableMutation request, DataOutputStream out)
1622
throws IOException {
17-
HBaseMutationSerialization.serialize(request.get(), out);
23+
Mutation mutation = request.get();
24+
ClientProtos.MutationProto.MutationType type;
25+
26+
if (mutation instanceof Put) {
27+
type = ClientProtos.MutationProto.MutationType.PUT;
28+
} else if (mutation instanceof Delete) {
29+
type = ClientProtos.MutationProto.MutationType.DELETE;
30+
} else {
31+
throw new IllegalArgumentException(
32+
String.format(
33+
"Unknown HBase mutation type, cannot serialize: %s",
34+
mutation.getClass()));
35+
}
36+
37+
ClientProtos.MutationProto proto = ProtobufUtil.toMutation(type, mutation);
38+
proto.writeDelimitedTo(out);
1839
}
1940

2041
@Override
2142
protected SerializableMutation deserializeRequestFromStream(
2243
long requestSize, DataInputStream in) throws IOException {
23-
return new SerializableMutation(HBaseMutationSerialization.deserialize(in));
44+
ClientProtos.MutationProto proto = ClientProtos.MutationProto.parseDelimitedFrom(in);
45+
Mutation mutation = ProtobufUtil.toMutation(proto);
46+
return new SerializableMutation(mutation);
2447
}
2548

2649
@Override

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
1313
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
1414
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
15+
import org.apache.flink.connector.hbase.util.SerializableMutation;
1516
import org.apache.flink.metrics.Counter;
1617
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
1718
import org.apache.flink.util.StringUtils;

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import org.apache.flink.annotation.Internal;
44
import org.apache.flink.connector.base.sink.writer.ResultHandler;
5+
import org.apache.flink.connector.hbase.util.SerializableMutation;
56
import org.apache.flink.metrics.Counter;
6-
import org.apache.flink.util.Preconditions;
77

88
import org.apache.hadoop.hbase.DoNotRetryIOException;
99
import org.apache.hadoop.hbase.client.Mutation;
@@ -13,10 +13,14 @@
1313
import java.util.Collection;
1414
import java.util.HashSet;
1515
import java.util.List;
16+
import java.util.Queue;
1617
import java.util.Set;
1718
import java.util.concurrent.CompletableFuture;
1819
import java.util.concurrent.ConcurrentLinkedQueue;
1920
import java.util.stream.Collectors;
21+
import java.util.stream.IntStream;
22+
23+
import static org.apache.flink.util.Preconditions.checkArgument;
2024

2125
/**
2226
* This class is responsible for managing the async calls to HBase and managing the {@link
@@ -47,34 +51,47 @@ public void handleWriteFutures(
4751
List<CompletableFuture<Mutation>> futures,
4852
List<SerializableMutation> processedMutationsInOrder,
4953
ResultHandler<SerializableMutation> resultHandler) {
50-
Preconditions.checkArgument(
54+
checkArgument(
5155
futures.size() == processedMutationsInOrder.size(),
5256
"Different number of HBase futures was supplied than mutations.");
5357

54-
ConcurrentLinkedQueue<FailedMutation> failedMutations = new ConcurrentLinkedQueue<>();
58+
Queue<FailedMutation> failedMutations = new ConcurrentLinkedQueue<>();
5559

5660
// Handle each future separately and store failures.
57-
CompletableFuture<?>[] handledFutures = new CompletableFuture[futures.size()];
58-
for (int i = 0; i < futures.size(); i++) {
59-
final int index = i;
60-
handledFutures[index] =
61-
futures.get(index)
62-
.exceptionally(
63-
throwable -> {
64-
failedMutations.add(
65-
new FailedMutation(
66-
processedMutationsInOrder.get(index),
67-
throwable));
68-
return null;
69-
});
70-
}
61+
CompletableFuture<?>[] handledFutures =
62+
IntStream.range(0, futures.size())
63+
.mapToObj(
64+
i ->
65+
addFailedCallback(
66+
futures.get(i),
67+
failedMutations,
68+
processedMutationsInOrder.get(i)))
69+
.toArray(CompletableFuture[]::new);
7170

7271
// Exceptions are already handled here, so it's safe to use `thenRun()`.
7372
CompletableFuture.allOf(handledFutures)
74-
.thenRun(
75-
() -> {
76-
handleFailedRequests(failedMutations, resultHandler);
77-
});
73+
.thenRun(() -> handleFailedRequests(failedMutations, resultHandler));
74+
}
75+
76+
/**
77+
* For a {@link CompletableFuture} HBase write future, return a new future that adds a {@link
78+
* FailedMutation} to the failedMutations queue to mark that write as failed.
79+
*
80+
* @param cf future of a mutation write
81+
* @param failedMutations queue of failed mutations to add a new entry to upon failure
82+
* @param mutation the actual mutation that's being written by the future
83+
* @return new future with the exception callback
84+
*/
85+
private CompletableFuture<Mutation> addFailedCallback(
86+
CompletableFuture<Mutation> cf,
87+
Queue<FailedMutation> failedMutations,
88+
SerializableMutation mutation) {
89+
90+
return cf.exceptionally(
91+
throwable -> {
92+
failedMutations.add(new FailedMutation(mutation, throwable));
93+
return null;
94+
});
7895
}
7996

8097
/**

flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java

Lines changed: 0 additions & 45 deletions
This file was deleted.
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
package org.apache.flink.connector.hbase.sink;
1+
package org.apache.flink.connector.hbase.util;
22

33
import org.apache.flink.annotation.Internal;
4+
import org.apache.flink.connector.hbase.sink.HBaseSink;
5+
import org.apache.flink.connector.hbase.sink.HBaseWriter;
46

57
import org.apache.hadoop.hbase.client.Mutation;
68

@@ -14,7 +16,7 @@
1416
public class SerializableMutation implements Serializable {
1517
private static final long serialVersionUID = 1L;
1618

17-
private transient Mutation mutation;
19+
private final transient Mutation mutation;
1820

1921
public SerializableMutation(Mutation mutation) {
2022
this.mutation = mutation;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.apache.flink.connector.hbase.sink;
1+
package org.apache.flink.connector.hbase.util;
22

33
import org.apache.flink.annotation.Internal;
44
import org.apache.flink.api.connector.sink2.Sink;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.apache.flink.connector.hbase.sink;
2+
3+
import org.apache.flink.connector.hbase.util.SerializableMutation;
4+
5+
import org.apache.hadoop.hbase.client.Append;
6+
import org.apache.hadoop.hbase.client.Delete;
7+
import org.apache.hadoop.hbase.client.Mutation;
8+
import org.apache.hadoop.hbase.client.Put;
9+
import org.junit.jupiter.api.Test;
10+
11+
import java.io.ByteArrayInputStream;
12+
import java.io.ByteArrayOutputStream;
13+
import java.io.DataInputStream;
14+
import java.io.DataOutputStream;
15+
import java.io.IOException;
16+
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.junit.jupiter.api.Assertions.assertThrows;
19+
20+
class HBaseStateSerializerTest {
21+
22+
private final HBaseStateSerializer serializer = new HBaseStateSerializer();
23+
24+
@Test
25+
public void testSerdePut() throws IOException {
26+
SerializableMutation mut = new SerializableMutation(new Put(new byte[] {0x00, 0x01}, 1001));
27+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
28+
DataOutputStream out = new DataOutputStream(baos);
29+
30+
serializer.serializeRequestToStream(mut, out);
31+
32+
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
33+
DataInputStream in = new DataInputStream(bais);
34+
35+
Mutation mutationResult = serializer.deserializeRequestFromStream(0, in).get();
36+
37+
assertThat(mutationResult.getRow()).isEqualTo(new byte[] {0x00, 0x01});
38+
assertThat(mutationResult.getTimestamp()).isEqualTo(1001);
39+
}
40+
41+
@Test
42+
public void testSerdeDelete() throws IOException {
43+
SerializableMutation mut =
44+
new SerializableMutation(new Delete(new byte[] {0x00, 0x02}, 1002));
45+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
46+
DataOutputStream out = new DataOutputStream(baos);
47+
48+
serializer.serializeRequestToStream(mut, out);
49+
50+
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
51+
DataInputStream in = new DataInputStream(bais);
52+
53+
Mutation mutationResult = serializer.deserializeRequestFromStream(0, in).get();
54+
55+
assertThat(mutationResult.getRow()).isEqualTo(new byte[] {0x00, 0x02});
56+
assertThat(mutationResult.getTimestamp()).isEqualTo(1002);
57+
}
58+
59+
@Test
60+
public void testSerializationUnsupportedMutationType() {
61+
SerializableMutation mut = new SerializableMutation(new Append(new byte[] {0x00, 0x02}));
62+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
63+
DataOutputStream out = new DataOutputStream(baos);
64+
65+
assertThrows(
66+
IllegalArgumentException.class,
67+
() -> serializer.serializeRequestToStream(mut, out));
68+
}
69+
}

0 commit comments

Comments
 (0)