From ed9a49e2625a9a7f0a8e3b01ed79d7477babe1e8 Mon Sep 17 00:00:00 2001 From: yantian Date: Sun, 28 Sep 2025 14:48:24 +0800 Subject: [PATCH] support conf for flink --- .../apache/paimon/flink/FlinkTableFactory.java | 15 ++++++++++++++- .../apache/paimon/flink/FormatCatalogTable.java | 12 ++++++++++++ ...ITCaseBase.java => PaimonFormatTableTest.java} | 11 +++++++++-- 3 files changed, 35 insertions(+), 3 deletions(-) rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/{FormatTableReadITCaseBase.java => PaimonFormatTableTest.java} (89%) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java index bd24ef7dac0d..e1528d7bd0bb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java @@ -52,6 +52,13 @@ public String factoryIdentifier() { @Override public DynamicTableSource createDynamicTableSource(Context context) { + CatalogTable table = context.getCatalogTable().getOrigin(); + if (table instanceof FormatCatalogTable) { + CoreOptions options = new CoreOptions(((FormatCatalogTable) table).table().options()); + if (!options.formatTableImplementationIsPaimon()) { + return ((FormatCatalogTable) table).createTableSource(context); + } + } createTableIfNeeded(context); return super.createDynamicTableSource(context); } @@ -60,7 +67,13 @@ public DynamicTableSource createDynamicTableSource(Context context) { public DynamicTableSink createDynamicTableSink(Context context) { CatalogTable table = context.getCatalogTable().getOrigin(); if (table instanceof FormatCatalogTable) { - return ((FormatCatalogTable) table).createTableSink(context); + CoreOptions options = new CoreOptions(((FormatCatalogTable) table).table().options()); + if (!options.formatTableImplementationIsPaimon()) { + return ((FormatCatalogTable) table).createTableSink(context); + } else { + throw new UnsupportedOperationException( + "Format table's sink is not supported yet."); + } } createTableIfNeeded(context); return super.createDynamicTableSink(context); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java index 1c7520621cd5..27db9f166347 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java @@ -23,6 +23,7 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.logical.RowType; @@ -127,6 +128,17 @@ public Optional getDetailedDescription() { return getDescription(); } + public DynamicTableSource createTableSource(DynamicTableFactory.Context context) { + return FactoryUtil.createDynamicTableSource( + null, + context.getObjectIdentifier(), + context.getCatalogTable(), + new HashMap<>(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); + } + public DynamicTableSink createTableSink(DynamicTableFactory.Context context) { return FactoryUtil.createDynamicTableSink( null, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/PaimonFormatTableTest.java similarity index 89% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/PaimonFormatTableTest.java index 2cf159cfb385..9f8dd075eb7b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FormatTableReadITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/PaimonFormatTableTest.java @@ -47,9 +47,10 @@ import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** ITCase for format table. */ -public class FormatTableReadITCaseBase extends RESTCatalogITCaseBase { +public class PaimonFormatTableTest extends RESTCatalogITCaseBase { @Test public void testParquetFileFormat() throws IOException { @@ -69,7 +70,11 @@ public void testParquetFileFormat() throws IOException { datas[1] = GenericRow.of(2, 2, 2); String tableName = "format_table_test"; sql( - "CREATE TABLE %s (a INT, b INT, c INT) WITH ('file.format'='parquet', 'type'='format-table')", + "CREATE TABLE %s (a INT, b INT, c INT) WITH (" + + "'file.format'='parquet'," + + "'type'='format-table'," + + "'format-table.implementation'='paimon'" + + ")", tableName); write( factory, @@ -84,6 +89,8 @@ public void testParquetFileFormat() throws IOException { restCatalogServer.setDataToken(identifier, expiredDataToken); assertThat(sql("SELECT a FROM %s", tableName)) .containsExactlyInAnyOrder(Row.of(1), Row.of(2)); + assertThatThrownBy(() -> sql("INSERT INTO %s VALUES (3, 3, 3)", tableName)) + .isInstanceOf(RuntimeException.class); sql("Drop TABLE %s", tableName); }