Skip to content

Commit 4031e18

Browse files
authored
[FLINK-35280] Migrate HBase Sink to use Async Sink API
1 parent f796a9a commit 4031e18

File tree

30 files changed

+2066
-797
lines changed

30 files changed

+2066
-797
lines changed

docs/content/docs/connectors/table/hbase.md

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -170,37 +170,87 @@ Connector Options
170170
<td>Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type.</td>
171171
</tr>
172172
<tr>
173-
<td><h5>sink.buffer-flush.max-size</h5></td>
173+
<td><h5>sink.flush-buffer.size</h5></td>
174174
<td>optional</td>
175175
<td>yes</td>
176-
<td style="word-wrap: break-word;">2mb</td>
177-
<td>MemorySize</td>
176+
<td style="word-wrap: break-word;">2097152</td>
177+
<td>Long</td>
178178
<td>Writing option, maximum size in memory of buffered rows for each writing request.
179179
This can improve performance for writing data to HBase database, but may increase the latency.
180-
Can be set to <code>'0'</code> to disable it.
181180
</td>
182181
</tr>
183182
<tr>
184-
<td><h5>sink.buffer-flush.max-rows</h5></td>
183+
<td><h5>sink.batch.max-size</h5></td>
185184
<td>optional</td>
186185
<td>yes</td>
187186
<td style="word-wrap: break-word;">1000</td>
188187
<td>Integer</td>
189188
<td>Writing option, maximum number of rows to buffer for each writing request.
190189
This can improve performance for writing data to HBase database, but may increase the latency.
191-
Can be set to <code>'0'</code> to disable it.
192190
</td>
193191
</tr>
194192
<tr>
195-
<td><h5>sink.buffer-flush.interval</h5></td>
193+
<td><h5>sink.flush-buffer.timeout</h5></td>
196194
<td>optional</td>
197195
<td>yes</td>
198-
<td style="word-wrap: break-word;">1s</td>
199-
<td>Duration</td>
200-
<td>Writing option, the interval to flush any buffered rows.
196+
<td style="word-wrap: break-word;">1000</td>
197+
<td>Long</td>
198+
<td>Writing option, the threshold time in milliseconds for an element to be in the buffer before flushing out.
201199
This can improve performance for writing data to HBase database, but may increase the latency.
202-
Can be set to <code>'0'</code> to disable it. Note, both <code>'sink.buffer-flush.max-size'</code> and <code>'sink.buffer-flush.max-rows'</code>
203-
can be set to <code>'0'</code> with the flush interval set allowing for complete async processing of buffered actions.
200+
</td>
201+
</tr>
202+
<tr>
203+
<td><h5>sink.requests.max-inflight</h5></td>
204+
<td>optional</td>
205+
<td>yes</td>
206+
<td style="word-wrap: break-word;">1000</td>
207+
<td>Integer</td>
208+
<td>Request threshold for uncompleted requests before blocking new write requests.
209+
</td>
210+
</tr>
211+
<tr>
212+
<td><h5>sink.requests.max-buffered</h5></td>
213+
<td>optional</td>
214+
<td>yes</td>
215+
<td style="word-wrap: break-word;">1000</td>
216+
<td>Integer</td>
217+
<td>Maximum number of buffered records before applying backpressure.
218+
</td>
219+
</tr>
220+
<tr>
221+
<td><h5>sink.request-timeout</h5></td>
222+
<td>optional</td>
223+
<td>yes</td>
224+
<td style="word-wrap: break-word;">10 min</td>
225+
<td>Duration</td>
226+
<td>The maximum time to wait for a batch of HBase requests to complete before timing out.
227+
</td>
228+
</tr>
229+
<tr>
230+
<td><h5>sink.fail-on-timeout</h5></td>
231+
<td>optional</td>
232+
<td>yes</td>
233+
<td style="word-wrap: break-word;">false</td>
234+
<td>Boolean</td>
235+
<td>Whether to fail the job when a request times out. If false, timed-out requests will be logged but the job will continue processing. If true, a timeout will cause the job to fail.
236+
</td>
237+
</tr>
238+
<tr>
239+
<td><h5>sink.max-request-write-attempts</h5></td>
240+
<td>optional</td>
241+
<td>yes</td>
242+
<td style="word-wrap: break-word;">0</td>
243+
<td>Long</td>
244+
<td>Maximum number of attempts to save a record to HBase before the job fails. Set to 0 for unlimited retries.
245+
</td>
246+
</tr>
247+
<tr>
248+
<td><h5>sink.max-record-size</h5></td>
249+
<td>optional</td>
250+
<td>yes</td>
251+
<td style="word-wrap: break-word;">1048576</td>
252+
<td>Long</td>
253+
<td>The maximum size in bytes of a single record. Records bigger than this will cause the job to fail.
204254
</td>
205255
</tr>
206256
<tr>

flink-connector-hbase-2.6/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ under the License.
4040
<scope>provided</scope>
4141
</dependency>
4242

43+
<dependency>
44+
<groupId>org.apache.flink</groupId>
45+
<artifactId>flink-connector-base</artifactId>
46+
<scope>provided</scope>
47+
</dependency>
48+
4349
<dependency>
4450
<groupId>org.apache.flink</groupId>
4551
<artifactId>flink-streaming-java</artifactId>

flink-connector-hbase-2.6/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java

Lines changed: 95 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.configuration.ConfigOption;
2323
import org.apache.flink.configuration.ReadableConfig;
24-
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
24+
import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
25+
import org.apache.flink.connector.base.table.AsyncSinkConnectorOptions;
26+
import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
2527
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
2628
import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink;
2729
import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
@@ -30,14 +32,15 @@
3032
import org.apache.flink.table.connector.source.lookup.LookupOptions;
3133
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
3234
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
33-
import org.apache.flink.table.factories.DynamicTableSinkFactory;
3435
import org.apache.flink.table.factories.DynamicTableSourceFactory;
3536
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
3637

3738
import org.apache.hadoop.conf.Configuration;
3839

3940
import java.time.Duration;
4041
import java.util.HashSet;
42+
import java.util.Optional;
43+
import java.util.Properties;
4144
import java.util.Set;
4245
import java.util.stream.Collectors;
4346
import java.util.stream.Stream;
@@ -50,21 +53,24 @@
5053
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
5154
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
5255
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
56+
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_FAIL_ON_TIMEOUT;
5357
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_IGNORE_NULL_VALUE;
58+
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_SIZE;
59+
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_MAX_RECORD_WRITE_ATTEMPTS;
5460
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_PARALLELISM;
61+
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_REQUEST_TIMEOUT;
5562
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.TABLE_NAME;
5663
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_QUORUM;
5764
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
5865
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX;
5966
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;
60-
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions;
6167
import static org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;
6268
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
6369

6470
/** HBase connector factory. */
6571
@Internal
66-
public class HBase2DynamicTableFactory
67-
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
72+
public class HBase2DynamicTableFactory extends AsyncDynamicTableSinkFactory
73+
implements DynamicTableSourceFactory {
6874

6975
private static final String IDENTIFIER = "hbase-2.6";
7076

@@ -116,21 +122,30 @@ public DynamicTableSink createDynamicTableSink(Context context) {
116122
TableFactoryHelper helper = createTableFactoryHelper(this, context);
117123
helper.validateExcept(PROPERTIES_PREFIX);
118124

119-
final ReadableConfig tableOptions = helper.getOptions();
125+
final ReadableConfig config = helper.getOptions();
120126

121127
validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());
122128

123-
String tableName = tableOptions.get(TABLE_NAME);
124-
Configuration hbaseConf = getHBaseConfiguration(tableOptions);
125-
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
126-
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
127-
128-
return new HBaseDynamicTableSink(
129-
tableName,
130-
context.getPhysicalRowDataType(),
131-
hbaseConf,
132-
hBaseWriteOptions,
133-
nullStringLiteral);
129+
HBaseDynamicTableSink.HBaseDynamicSinkBuilder builder =
130+
HBaseDynamicTableSink.builder()
131+
.setRequestTimeoutMS(config.get(SINK_REQUEST_TIMEOUT).toMillis())
132+
.setMaxRecordSizeInBytes(config.get(SINK_MAX_RECORD_SIZE))
133+
.setFailOnTimeout(config.get(SINK_FAIL_ON_TIMEOUT))
134+
.setMaxRecordWriteAttempts(config.get(SINK_MAX_RECORD_WRITE_ATTEMPTS))
135+
.setTableName(config.get(TABLE_NAME))
136+
.setConfiguration(getHBaseConfiguration(config))
137+
.setNullStringLiteral(config.get(NULL_STRING_LITERAL))
138+
.setPhysicalDataType(context.getPhysicalRowDataType())
139+
.setParallelism(config.get(SINK_PARALLELISM))
140+
.setIgnoreNullValue(config.get(SINK_IGNORE_NULL_VALUE));
141+
142+
AsyncSinkConfigurationValidator asyncValidator =
143+
new AsyncSinkConfigurationValidator(config);
144+
145+
addAsyncOptionsToBuilder(getDeprecatedAsyncSinkOptions(config), builder);
146+
addAsyncOptionsToBuilder(asyncValidator.getValidatedConfigurations(), builder);
147+
148+
return builder.build();
134149
}
135150

136151
@Override
@@ -147,42 +162,79 @@ public Set<ConfigOption<?>> requiredOptions() {
147162

148163
@Override
149164
public Set<ConfigOption<?>> optionalOptions() {
150-
Set<ConfigOption<?>> set = new HashSet<>();
151-
set.add(ZOOKEEPER_ZNODE_PARENT);
152-
set.add(ZOOKEEPER_QUORUM);
153-
set.add(NULL_STRING_LITERAL);
154-
set.add(SINK_BUFFER_FLUSH_MAX_SIZE);
155-
set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
156-
set.add(SINK_BUFFER_FLUSH_INTERVAL);
157-
set.add(SINK_PARALLELISM);
158-
set.add(SINK_IGNORE_NULL_VALUE);
159-
set.add(LOOKUP_ASYNC);
160-
set.add(LOOKUP_CACHE_MAX_ROWS);
161-
set.add(LOOKUP_CACHE_TTL);
162-
set.add(LOOKUP_MAX_RETRIES);
163-
set.add(LookupOptions.CACHE_TYPE);
164-
set.add(LookupOptions.MAX_RETRIES);
165-
set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
166-
set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
167-
set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
168-
set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
169-
return set;
165+
Stream<ConfigOption<?>> hbaseOptions =
166+
Stream.of(
167+
ZOOKEEPER_ZNODE_PARENT,
168+
ZOOKEEPER_QUORUM,
169+
NULL_STRING_LITERAL,
170+
SINK_BUFFER_FLUSH_MAX_SIZE,
171+
SINK_BUFFER_FLUSH_MAX_ROWS,
172+
SINK_BUFFER_FLUSH_INTERVAL,
173+
SINK_PARALLELISM,
174+
SINK_IGNORE_NULL_VALUE,
175+
SINK_MAX_RECORD_SIZE,
176+
SINK_REQUEST_TIMEOUT,
177+
SINK_FAIL_ON_TIMEOUT,
178+
SINK_MAX_RECORD_WRITE_ATTEMPTS,
179+
LOOKUP_ASYNC,
180+
LOOKUP_CACHE_MAX_ROWS,
181+
LOOKUP_CACHE_TTL,
182+
LOOKUP_MAX_RETRIES,
183+
LookupOptions.CACHE_TYPE,
184+
LookupOptions.MAX_RETRIES,
185+
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
186+
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
187+
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
188+
LookupOptions.PARTIAL_CACHE_MAX_ROWS);
189+
Stream<ConfigOption<?>> asyncOptions = super.optionalOptions().stream();
190+
191+
return Stream.concat(hbaseOptions, asyncOptions).collect(Collectors.toSet());
170192
}
171193

172194
@Override
173195
public Set<ConfigOption<?>> forwardOptions() {
174-
return Stream.of(
196+
Stream<ConfigOption<?>> hbaseOptions =
197+
Stream.of(
175198
TABLE_NAME,
176199
ZOOKEEPER_ZNODE_PARENT,
177200
ZOOKEEPER_QUORUM,
178201
NULL_STRING_LITERAL,
179-
LOOKUP_CACHE_MAX_ROWS,
180-
LOOKUP_CACHE_TTL,
181-
LOOKUP_MAX_RETRIES,
182202
SINK_BUFFER_FLUSH_MAX_SIZE,
183203
SINK_BUFFER_FLUSH_MAX_ROWS,
184204
SINK_BUFFER_FLUSH_INTERVAL,
185-
SINK_IGNORE_NULL_VALUE)
186-
.collect(Collectors.toSet());
205+
SINK_IGNORE_NULL_VALUE,
206+
SINK_MAX_RECORD_SIZE,
207+
SINK_REQUEST_TIMEOUT,
208+
SINK_FAIL_ON_TIMEOUT,
209+
SINK_MAX_RECORD_WRITE_ATTEMPTS,
210+
LOOKUP_CACHE_MAX_ROWS,
211+
LOOKUP_CACHE_TTL,
212+
LOOKUP_MAX_RETRIES);
213+
Stream<ConfigOption<?>> asyncOptions = super.optionalOptions().stream();
214+
215+
return Stream.concat(hbaseOptions, asyncOptions).collect(Collectors.toSet());
216+
}
217+
218+
private Properties getDeprecatedAsyncSinkOptions(ReadableConfig config) {
219+
Properties properties = new Properties();
220+
Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_MAX_SIZE))
221+
.ifPresent(
222+
flushBufferSize ->
223+
properties.put(
224+
AsyncSinkConnectorOptions.FLUSH_BUFFER_SIZE.key(),
225+
flushBufferSize.getBytes()));
226+
Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_MAX_ROWS))
227+
.ifPresent(
228+
maxBatchSize ->
229+
properties.put(
230+
AsyncSinkConnectorOptions.MAX_BATCH_SIZE.key(),
231+
maxBatchSize));
232+
Optional.ofNullable(config.get(SINK_BUFFER_FLUSH_INTERVAL))
233+
.ifPresent(
234+
timeout ->
235+
properties.put(
236+
AsyncSinkConnectorOptions.FLUSH_BUFFER_TIMEOUT.key(),
237+
timeout.toMillis()));
238+
return properties;
187239
}
188240
}

0 commit comments

Comments
 (0)