diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 0549fde46640..792d4bf4bb81 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -92,6 +92,12 @@ Integer If the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync. + +
lookup.refresh.full-load-threshold
+ 2147483647 + Integer + If the pending snapshot count exceeds this threshold, lookup table will discard incremental updates and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. Set to a reasonable value (e.g., 10) to enable this optimization. Default is Integer.MAX_VALUE (disabled). +
lookup.refresh.time-periods-blacklist
(none) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 700ace3b48dc..f4efc859ed22 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -292,6 +292,15 @@ public class FlinkConnectorOptions { + "cache refreshing is forbidden. Blacklist format is start1->end1,start2->end2,... , " + "and the time format is yyyy-MM-dd HH:mm. Only used when lookup table is FULL cache mode."); + public static final ConfigOption LOOKUP_REFRESH_FULL_LOAD_THRESHOLD = + ConfigOptions.key("lookup.refresh.full-load-threshold") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription( + "If the pending snapshot count exceeds this threshold, lookup table will discard incremental updates " + + "and refresh the entire table from the latest snapshot. This can improve performance when there are many snapshots pending. " + + "Set to a reasonable value (e.g., 10) to enable this optimization. Default is Integer.MAX_VALUE (disabled). "); + public static final ConfigOption SINK_AUTO_TAG_FOR_SAVEPOINT = ConfigOptions.key("sink.savepoint.auto-tag") .booleanType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 7df47421010d..2e78adbf6ccb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -68,6 +68,7 @@ import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST; import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable; import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS; @@ -98,6 +99,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable { private transient Duration refreshInterval; // timestamp when refreshing lookup table private transient long nextRefreshTime; + // threshold for triggering full table reload when snapshots are pending + private transient int refreshFullThreshold; protected FunctionContext functionContext; @@ -175,6 +178,7 @@ private void open() throws Exception { this.refreshInterval = options.getOptional(LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL) .orElse(options.get(CONTINUOUS_DISCOVERY_INTERVAL)); + this.refreshFullThreshold = options.get(LOOKUP_REFRESH_FULL_LOAD_THRESHOLD); List fieldNames = table.rowType().getFieldNames(); int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); @@ -332,6 +336,7 @@ void tryRefresh() throws Exception { partitionLoader.partitions(), partitionLoader.createSpecificPartFilter()); lookupTable.close(); lookupTable.open(); + nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis(); // no need to refresh the lookup table because it is reopened return; } @@ -339,11 +344,48 @@ void tryRefresh() throws Exception { // 3. refresh lookup table if (shouldRefreshLookupTable()) { - lookupTable.refresh(); + // Check if we should do full load (close and reopen table) instead of incremental + // refresh + boolean doFullLoad = shouldDoFullLoad(); + + if (doFullLoad) { + LOG.info( + "Doing full load for table {} instead of incremental refresh", + table.name()); + lookupTable.close(); + lookupTable.open(); + } else { + lookupTable.refresh(); + } + nextRefreshTime = System.currentTimeMillis() + refreshInterval.toMillis(); } } + /** + * Check if we should do full load instead of incremental refresh. This can improve performance + * when there are many pending snapshots. + */ + @VisibleForTesting + public boolean shouldDoFullLoad() { + if (refreshFullThreshold <= 0 || refreshFullThreshold == Integer.MAX_VALUE) { + return false; + } + + Long latestSnapshotId = ((FileStoreTable) table).snapshotManager().latestSnapshotId(); + Long nextSnapshotId = lookupTable.nextSnapshotId(); + if (latestSnapshotId == null || nextSnapshotId == null) { + return false; + } + + LOG.info( + "Check if should do full load, latestSnapshotId: {}, nextSnapshotId: {}, refreshFullThreshold: {}", + latestSnapshotId, + nextSnapshotId, + refreshFullThreshold); + return latestSnapshotId - nextSnapshotId + 1 >= refreshFullThreshold; + } + private boolean shouldRefreshLookupTable() { if (nextRefreshTime > System.currentTimeMillis()) { return false; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index cda824cc946a..81af38ea8ff2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -151,6 +151,11 @@ public void specifyCacheRowFilter(Filter filter) { this.cacheRowFilter = filter; } + @Override + public Long nextSnapshotId() { + return this.reader.nextSnapshotId(); + } + protected void init() throws Exception { this.stateFactory = createStateFactory(); this.refreshExecutor = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index f8af411941ae..3ca792d39d5d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -41,4 +41,6 @@ public interface LookupTable extends Closeable { void refresh() throws Exception; void specifyCacheRowFilter(Filter filter); + + Long nextSnapshotId(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index ded0f29da79b..03016bbccf12 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -197,6 +197,11 @@ public void specifyCacheRowFilter(Filter filter) { this.cacheRowFilter = filter; } + @Override + public Long nextSnapshotId() { + return this.queryExecutor.nextSnapshotId(); + } + @Override public void close() throws IOException { if (queryExecutor != null) { @@ -243,6 +248,10 @@ interface QueryExecutor extends Closeable { InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException; void refresh(); + + default Long nextSnapshotId() { + return Long.MAX_VALUE; + } } static class LocalQueryExecutor implements QueryExecutor { @@ -334,6 +343,11 @@ void refreshSplit(DataSplit split) { numBuckets.put(partition, totalBuckets); } + @Override + public Long nextSnapshotId() { + return this.scan.checkpoint(); + } + @Override public void close() throws IOException { tableQuery.close(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java index 4f7ff334f677..7142e6e57311 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java @@ -43,6 +43,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.TraceableFileIO; +import org.apache.flink.table.data.RowData; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -91,16 +92,19 @@ public void before() throws Exception { } private void createLookupFunction(boolean refreshAsync) throws Exception { - createLookupFunction(true, false, false, refreshAsync); + createLookupFunction(true, false, false, refreshAsync, null); } private void createLookupFunction( boolean isPartition, boolean joinEqualPk, boolean dynamicPartition, - boolean refreshAsync) + boolean refreshAsync, + Integer fullLoadThreshold) throws Exception { - table = createFileStoreTable(isPartition, dynamicPartition, refreshAsync); + table = + createFileStoreTable( + isPartition, dynamicPartition, refreshAsync, fullLoadThreshold); lookupFunction = createLookupFunction(table, joinEqualPk); lookupFunction.open(tempDir.toString()); } @@ -116,7 +120,11 @@ private FileStoreLookupFunction createLookupFunction( } private FileStoreTable createFileStoreTable( - boolean isPartition, boolean dynamicPartition, boolean refreshAsync) throws Exception { + boolean isPartition, + boolean dynamicPartition, + boolean refreshAsync, + Integer fullLoadThreshold) + throws Exception { SchemaManager schemaManager = new SchemaManager(fileIO, tablePath); Options conf = new Options(); conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync); @@ -128,6 +136,10 @@ private FileStoreTable createFileStoreTable( conf.set(FlinkConnectorOptions.SCAN_PARTITIONS, "max_pt()"); } + if (fullLoadThreshold != null) { + conf.set(FlinkConnectorOptions.LOOKUP_REFRESH_FULL_LOAD_THRESHOLD, fullLoadThreshold); + } + RowType rowType = RowType.of( new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}, @@ -153,7 +165,7 @@ public void close() throws Exception { @Test public void testCompatibilityForOldVersion() throws Exception { - createLookupFunction(false, true, false, false); + createLookupFunction(false, true, false, false, null); commit(writeCommit(1)); PrimaryKeyPartialLookupTable lookupTable = (PrimaryKeyPartialLookupTable) lookupFunction.lookupTable(); @@ -174,7 +186,7 @@ public void testCompatibilityForOldVersion() throws Exception { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testDefaultLocalPartial(boolean refreshAsync) throws Exception { - createLookupFunction(false, true, false, refreshAsync); + createLookupFunction(false, true, false, refreshAsync, null); assertThat(lookupFunction.lookupTable()).isInstanceOf(PrimaryKeyPartialLookupTable.class); QueryExecutor queryExecutor = ((PrimaryKeyPartialLookupTable) lookupFunction.lookupTable()).queryExecutor(); @@ -184,7 +196,7 @@ public void testDefaultLocalPartial(boolean refreshAsync) throws Exception { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testDefaultRemotePartial(boolean refreshAsync) throws Exception { - createLookupFunction(false, true, false, refreshAsync); + createLookupFunction(false, true, false, refreshAsync, null); ServiceManager serviceManager = new ServiceManager(fileIO, tablePath); serviceManager.resetService( PRIMARY_KEY_LOOKUP, new InetSocketAddress[] {new InetSocketAddress(1)}); @@ -232,7 +244,7 @@ public void testLookupExpiredSnapshot(boolean refreshAsync) throws Exception { @Test public void testLookupDynamicPartition() throws Exception { - createLookupFunction(true, false, true, false); + createLookupFunction(true, false, true, false, null); commit(writeCommit(1)); lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L))); assertThat( @@ -252,7 +264,7 @@ public void testLookupDynamicPartition() throws Exception { @Test public void testParseWrongTimePeriodsBlacklist() throws Exception { - FileStoreTable table = createFileStoreTable(false, false, false); + FileStoreTable table = createFileStoreTable(false, false, false, null); FileStoreTable table1 = table.copy( @@ -299,7 +311,7 @@ public void testCheckRefreshInBlacklist() throws Exception { String right = end.atZone(ZoneId.systemDefault()).format(formatter); FileStoreTable table = - createFileStoreTable(false, false, false) + createFileStoreTable(false, false, false, null) .copy( Collections.singletonMap( LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key(), @@ -312,6 +324,50 @@ public void testCheckRefreshInBlacklist() throws Exception { assertThat(lookupFunction.nextBlacklistCheckTime()).isEqualTo(end.toEpochMilli() + 1); } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testLookupTableWithFullLoad(boolean joinEqualPk) throws Exception { + createLookupFunction(false, joinEqualPk, false, false, 3); + + if (joinEqualPk) { + assertThat(lookupFunction.lookupTable()) + .isInstanceOf(PrimaryKeyPartialLookupTable.class); + } else { + assertThat(lookupFunction.lookupTable()).isInstanceOf(FullCacheLookupTable.class); + } + + GenericRow expectedRow = GenericRow.of(1, 1, 1L); + StreamTableWrite writer = table.newStreamWriteBuilder().newWrite(); + writer.write(expectedRow); + commit(writer.prepareCommit(true, 1)); + + List result = + new ArrayList<>(lookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 1L)))); + assertThat(result).size().isEqualTo(1); + RowData resultRow = result.get(0); + assertThat(resultRow.getInt(0)).isEqualTo(expectedRow.getInt(0)); + assertThat(resultRow.getInt(1)).isEqualTo(expectedRow.getInt(1)); + + // Create more commits to exceed threshold (3 more to have gap > 3) + for (int i = 2; i < 6; i++) { + writer.write(GenericRow.of(i, i, (long) i)); + commit(writer.prepareCommit(true, i)); + } + writer.close(); + + // wait refresh + Thread.sleep(2000); + + expectedRow = GenericRow.of(5, 5, 5L); + assertThat(lookupFunction.shouldDoFullLoad()).isTrue(); + lookupFunction.tryRefresh(); + result = new ArrayList<>(lookupFunction.lookup(new FlinkRowData(GenericRow.of(5, 5, 5L)))); + assertThat(result).size().isEqualTo(1); + resultRow = result.get(0); + assertThat(resultRow.getInt(0)).isEqualTo(expectedRow.getInt(0)); + assertThat(resultRow.getInt(1)).isEqualTo(expectedRow.getInt(1)); + } + private void commit(List messages) throws Exception { TableCommitImpl commit = table.newCommit(commitUser); commit.commit(messages);