Skip to content

Commit cfaf3cb

Browse files
authored
[FLINK-38491][pipeline-connector][iceberg] fixed iceberg compaction parallelism (#4150)
1 parent 6767d85 commit cfaf3cb

File tree

4 files changed

+5
-2
lines changed

4 files changed

+5
-2
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public Set<ConfigOption<?>> optionalOptions() {
122122
options.add(IcebergDataSinkOptions.PARTITION_KEY);
123123
options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
124124
options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL);
125+
options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM);
125126
return options;
126127
}
127128
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void addPostCommitTopology(
136136
// Shuffle by different table id.
137137
DataStream<CommittableMessage<WriteResultWrapper>> keyedStream =
138138
committableMessageDataStream.partitionCustom(
139-
(bucket, numPartitions) -> bucket % numPartitions,
139+
Math::floorMod,
140140
(committableMessage) -> {
141141
if (committableMessage instanceof CommittableWithLineage) {
142142
WriteResultWrapper multiTableCommittable =

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ void testCreateDataSink() {
4040

4141
Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
4242
conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse");
43+
conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4);
4344
DataSink dataSink =
4445
sinkFactory.createDataSink(
4546
new FactoryHelper.DefaultContext(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ public void testCompationOperator() throws IOException, InterruptedException {
150150
}
151151
CompactionOperator compactionOperator =
152152
new CompactionOperator(
153-
catalogOptions, CompactionOptions.builder().commitInterval(1).build());
153+
catalogOptions,
154+
CompactionOptions.builder().commitInterval(1).parallelism(4).build());
154155
compactionOperator.processElement(
155156
new StreamRecord<>(
156157
new CommittableWithLineage<>(

0 commit comments

Comments (0)