diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out index 880fb2cf11d89..171e8fefad6e5 100644 --- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out +++ b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out @@ -30,7 +30,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" }, { "id" : 0, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java index ea3925ab06a03..37dbfbb5db9b0 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java @@ -114,6 +114,21 @@ public String toString() { return id; } + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + TableTestProgram that = (TableTestProgram) o; + return id.equals(that.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + /** * Entrypoint for a {@link TableTestProgram} that forces an identifier and description of the * test program. diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 411470f5dbc43..3733e7a24df17 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -1326,7 +1326,7 @@ private static void validateAndApplyTargetColumns( * *

The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED METADATA COLUMNS} */ - private static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) { + public static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) { final Map metadataMap = extractMetadataMap(sink); final Stream physicalFields = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java index c9a6e8c3c632f..8a1dd04bcfe87 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java @@ -124,7 +124,7 @@ protected Transformation translateToPlanInternal( } @Override - protected RowType getPhysicalRowType(ResolvedSchema schema) { + protected final RowType getPersistedRowType(ResolvedSchema schema, DynamicTableSink tableSink) { // row-level modification may only write partial columns, // so we try to prune the RowType to get the real RowType containing // the physical columns to be written @@ -132,16 +132,20 @@ protected RowType getPhysicalRowType(ResolvedSchema schema) { for (SinkAbilitySpec sinkAbilitySpec : tableSinkSpec.getSinkAbilities()) { if (sinkAbilitySpec instanceof RowLevelUpdateSpec) { RowLevelUpdateSpec rowLevelUpdateSpec = (RowLevelUpdateSpec) sinkAbilitySpec; - return getPhysicalRowType( - schema, rowLevelUpdateSpec.getRequiredPhysicalColumnIndices()); + return getPersistedRowType( + schema, + rowLevelUpdateSpec.getRequiredPhysicalColumnIndices(), + tableSink); } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) { RowLevelDeleteSpec rowLevelDeleteSpec = (RowLevelDeleteSpec) sinkAbilitySpec; - return getPhysicalRowType( - schema, rowLevelDeleteSpec.getRequiredPhysicalColumnIndices()); + return getPersistedRowType( + schema, + rowLevelDeleteSpec.getRequiredPhysicalColumnIndices(), + tableSink); } } } - return (RowType) schema.toPhysicalRowDataType().getLogicalType(); + return super.getPersistedRowType(schema, tableSink); } @Override @@ -183,12 +187,18 @@ protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) } /** Get the physical row type with given column indices. */ - private RowType getPhysicalRowType(ResolvedSchema schema, int[] columnIndices) { + private RowType getPersistedRowType( + ResolvedSchema schema, int[] columnIndices, DynamicTableSink sink) { List columns = schema.getColumns(); List requireColumns = new ArrayList<>(); for (int columnIndex : columnIndices) { requireColumns.add(columns.get(columnIndex)); } - return (RowType) ResolvedSchema.of(requireColumns).toPhysicalRowDataType().getLogicalType(); + return super.getPersistedRowType(ResolvedSchema.of(requireColumns), sink); + } + + @Override + protected final boolean legacyPhysicalTypeEnabled() { + return false; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java index c14d9988bf681..6dda719322447 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java @@ -124,7 +124,7 @@ public String getDynamicFilteringDataListenerID() { protected Transformation translateToPlanInternal( PlannerBase planner, ExecNodeConfig config) { final Transformation transformation = - super.createTransformation(planner, config, false); + super.translateToPlanInternal(planner, config); // the boundedness has been checked via the runtime provider already, so we can safely // declare all legacy transformations as bounded to make the stream graph generator happy ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation); @@ -164,6 +164,11 @@ public Transformation createInputFormatTransformation( return env.addSource(function, operatorName, outputTypeInfo).getTransformation(); } + @Override + protected final boolean legacyUidsEnabled() { + return false; + } + public BatchExecTableSourceScan copyAndRemoveInputs() { BatchExecTableSourceScan tableSourceScan = new BatchExecTableSourceScan( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 86866a985597e..ad333589232b1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -52,6 +52,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate; import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.connectors.DynamicSinkUtils; import org.apache.flink.table.planner.lineage.TableLineageUtils; import org.apache.flink.table.planner.lineage.TableSinkLineageVertex; import org.apache.flink.table.planner.lineage.TableSinkLineageVertexImpl; @@ -151,8 +152,8 @@ protected Transformation createSinkTransformation( tableSink.getSinkRuntimeProvider( new SinkRuntimeProviderContext( isBounded, tableSinkSpec.getTargetColumns())); - final RowType physicalRowType = getPhysicalRowType(schema); - final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema); + final RowType persistedRowType = getPersistedRowType(schema, tableSink); + final int[] primaryKeys = getPrimaryKeyIndices(persistedRowType, schema); final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider); sinkParallelismConfigured = isParallelismConfigured(runtimeProvider); final int inputParallelism = inputTransform.getParallelism(); @@ -190,7 +191,7 @@ protected Transformation createSinkTransformation( final boolean needMaterialization = !inputInsertOnly && upsertMaterialize; Transformation sinkTransform = - applyConstraintValidations(inputTransform, config, physicalRowType); + applyConstraintValidations(inputTransform, config, persistedRowType); if (hasPk) { sinkTransform = @@ -212,7 +213,7 @@ protected Transformation createSinkTransformation( sinkParallelism, config, classLoader, - physicalRowType, + persistedRowType, inputUpsertKey); } @@ -545,8 +546,16 @@ protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) .orElse(new int[0]); } - protected RowType getPhysicalRowType(ResolvedSchema schema) { - return (RowType) schema.toPhysicalRowDataType().getLogicalType(); + /** + * The method recreates the type of the incoming record from the sink's schema. It puts the + * physical columns first, followed by persisted metadata columns. + */ + protected RowType getPersistedRowType(ResolvedSchema schema, DynamicTableSink sink) { + if (legacyPhysicalTypeEnabled()) { + return (RowType) schema.toPhysicalRowDataType().getLogicalType(); + } else { + return DynamicSinkUtils.createConsumedType(schema, sink); + } } /** @@ -574,4 +583,6 @@ private Optional getTargetRowKind() { } return Optional.empty(); } + + protected abstract boolean legacyPhysicalTypeEnabled(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java index 460530c49cd63..1d7526aba7733 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java @@ -112,8 +112,9 @@ public DynamicTableSourceSpec getTableSourceSpec() { return tableSourceSpec; } - protected Transformation createTransformation( - PlannerBase planner, ExecNodeConfig config, boolean legacyUidsEnabled) { + @Override + protected Transformation translateToPlanInternal( + PlannerBase planner, ExecNodeConfig config) { final Transformation sourceTransform; final StreamExecutionEnvironment env = planner.getExecEnv(); final TransformationMetadata metadata = @@ -186,7 +187,7 @@ protected Transformation createTransformation( ((DataStreamScanProvider) provider) .produceDataStream(createProviderContext(metadata, config), env) .getTransformation(); - if (legacyUidsEnabled) { + if (legacyUidsEnabled()) { metadata.fill(sourceTransform); } sourceTransform.setOutputType(outputTypeInfo); @@ -194,7 +195,7 @@ protected Transformation createTransformation( sourceTransform = ((TransformationScanProvider) provider) .createTransformation(createProviderContext(metadata, config)); - if (legacyUidsEnabled) { + if (legacyUidsEnabled()) { metadata.fill(sourceTransform); } sourceTransform.setOutputType(outputTypeInfo); @@ -369,4 +370,6 @@ protected abstract Transformation createInputFormatTransformation( InputFormat inputFormat, InternalTypeInfo outputTypeInfo, String operatorName); + + protected abstract boolean legacyUidsEnabled(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index e1052a965337b..c6947403d31a9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -104,6 +104,30 @@ }, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) +// Version 2: Fixed the data type used for creating constraint enforcer and sink upsert +// materializer. Since this version the sink works correctly with persisted metadata columns. +// We introduced a new version, because statements that were never rolling back to a value from +// state could run succesfully. We allow those jobs to be upgraded. Without a new versions such jobs +// would fail on restore, because the state serializer would differ +@ExecNodeMetadata( + name = "stream-exec-sink", + version = 2, + consumedOptions = { + "table.exec.sink.not-null-enforcer", + "table.exec.sink.type-length-enforcer", + "table.exec.sink.upsert-materialize", + "table.exec.sink.keyed-shuffle", + "table.exec.sink.rowtime-inserter" + }, + producedTransformations = { + CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION, + CommonExecSink.PARTITIONER_TRANSFORMATION, + CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION, + CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION, + CommonExecSink.SINK_TRANSFORMATION + }, + minPlanVersion = FlinkVersion.v2_3, + minStateVersion = FlinkVersion.v2_3) public class StreamExecSink extends CommonExecSink implements StreamExecNode { private static final Logger LOG = LoggerFactory.getLogger(StreamExecSink.class); @@ -391,4 +415,9 @@ private static SequencedMultiSetStateConfig createStateConfig( throw new IllegalArgumentException("Unsupported strategy: " + strategy); } } + + @Override + protected final boolean legacyPhysicalTypeEnabled() { + return getVersion() == 1; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java index 96d21f120224f..412b9a42bb630 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java @@ -25,9 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan; @@ -94,12 +92,6 @@ public StreamExecTableSourceScan( description); } - @Override - protected Transformation translateToPlanInternal( - PlannerBase planner, ExecNodeConfig config) { - return createTransformation(planner, config, legacyUidsEnabled()); - } - @Override public Transformation createInputFormatTransformation( StreamExecutionEnvironment env, @@ -111,7 +103,8 @@ public Transformation createInputFormatTransformation( return env.createInput(inputFormat, outputTypeInfo).name(operatorName).getTransformation(); } - private boolean legacyUidsEnabled() { + @Override + protected final boolean legacyUidsEnabled() { return getVersion() == 1; } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java index 007bfef473434..49609c1d439ea 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java @@ -188,8 +188,8 @@ void testUidFlink1_18() throws IOException { config.set(TABLE_EXEC_UID_FORMAT, "___"), json -> {}, env -> planFromCurrentFlinkValues(env).asJsonString(), - "\\d+_stream-exec-sink_1_sink", - "\\d+_stream-exec-sink_1_constraint-validator", + "\\d+_stream-exec-sink_2_sink", + "\\d+_stream-exec-sink_2_constraint-validator", "\\d+_stream-exec-values_1_values"); } @@ -200,7 +200,7 @@ void testPerNodeCustomUid() throws IOException { json -> JsonTestUtils.setExecNodeConfig( json, - "stream-exec-sink_1", + "stream-exec-sink_2", TABLE_EXEC_UID_FORMAT.key(), "my_custom__"), env -> planFromCurrentFlinkValues(env).asJsonString(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java index 7939a672415b4..b1248a2bb1b46 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.common; +import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; import org.apache.flink.table.test.program.SinkTestStep; @@ -28,6 +29,9 @@ import java.util.Arrays; +import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY; + /** {@link TableTestProgram} definitions for testing {@link StreamExecSink}. */ public class TableSinkTestPrograms { @@ -222,4 +226,75 @@ private static TableTestProgram buildBucketingTest( .build()) .runSql("INSERT INTO sink_t SELECT * FROM source_t") .build(); + + // The queries could run as long as the value was never rolled back to one from state, which is + // possible. We validate those can restore and still run + public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE = + getInsertRetractWithWritableMetadata(true); + + public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA = + getInsertRetractWithWritableMetadata(false); + + private static TableTestProgram getInsertRetractWithWritableMetadata( + boolean forLegacyPhysicalType) { + final Row producedAfterRestore; + final String consumedAfterRestore; + if (forLegacyPhysicalType) { + producedAfterRestore = Row.ofKind(RowKind.INSERT, "Bob", 7); + consumedAfterRestore = "+U[BOB, 7, Bob, 7]"; + } else { + // retract the last record, which should roll back to + // the previous state + producedAfterRestore = Row.ofKind(RowKind.DELETE, "Bob", 6); + consumedAfterRestore = "+U[BOB, 5, Bob, 5]"; + } + return TableTestProgram.of( + "insert-into-upsert-with-sink-upsert-materializer-writable-metadata" + + (forLegacyPhysicalType ? "-v1" : ""), + "The query requires a sink upsert materializer and the sink" + + " uses writable metadata columns. The scenario showcases a" + + " bug where a wrong type was used in sinks which did not" + + " consider metadata columns. There needs to be multiple" + + " requirements for the bug to show up. 1. We need to use " + + " rocksdb, so that we use a serializer when putting records" + + " into state in SinkUpsertMaterializer. 2. We need to retract" + + " to a previous value taken from the state, otherwise we" + + " forward the incoming record. 3. There need to be persisted" + + " metadata columns.") + .setupConfig( + TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY, + ExecutionConfigOptions.SinkUpsertMaterializeStrategy.LEGACY) + .setupConfig(STATE_BACKEND, "rocksdb") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("name STRING", "score INT") + .addOption("changelog-mode", "I,UB,UA,D") + .producedBeforeRestore( + Row.ofKind(RowKind.INSERT, "Bob", 5), + Row.ofKind(RowKind.INSERT, "Bob", 6)) + .producedAfterRestore(producedAfterRestore) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", + "scoreMetadata BIGINT METADATA", + "score BIGINT", + "nameMetadata STRING METADATA") + .addOption("sink-changelog-mode-enforced", "I,UA,D") + // The test sink lists metadata columns + // (SupportsWritingMetadata#listWritableMetadata) in + // alphabetical order, this is also the order in the record of + // a sink, irrespective of the table schema + .addOption( + "writable-metadata", + "nameMetadata:STRING,scoreMetadata:BIGINT") + // physical columns first, then metadata columns, sorted + // alphabetically by columns name (test sink property) + .consumedBeforeRestore("+I[BOB, 5, Bob, 5]", "+U[BOB, 6, Bob, 6]") + .consumedAfterRestore(consumedAfterRestore) + .build()) + .runSql("INSERT INTO sink_t SELECT UPPER(name), score, score, name FROM source_t") + .build(); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java index 1ea0180371704..2e7197525f7c1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; /** Restore tests for {@link StreamExecSink}. */ public class TableSinkRestoreTest extends RestoreTestBase { @@ -44,6 +45,16 @@ public List programs() { TableSinkTestPrograms.SINK_WRITING_METADATA, TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY, TableSinkTestPrograms.SINK_PARTIAL_INSERT, - TableSinkTestPrograms.SINK_UPSERT); + TableSinkTestPrograms.SINK_UPSERT, + TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE, + TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA); + } + + @Override + protected Map> programsToIgnore() { + return Map.of( + // disable the writable metadata test for sink node with version 1. it fails after + // the restore + 1, List.of(TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA)); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java index 13f91ef9a5270..f6ec45fb5a069 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java @@ -205,6 +205,18 @@ private Stream createSpecs() { savepointPath)))); } + // ==================================================================================== + // Extension points for adjusting test combinations + // ==================================================================================== + + /** + * Can be overridden with a collection of programs that should be ignored for a particular + * version of the node under test. + */ + protected Map> programsToIgnore() { + return Collections.emptyMap(); + } + /** * The method can be overridden in a subclass to test multiple savepoint files for a given * program and a node in a particular version. This can be useful e.g. to test a node against @@ -212,9 +224,16 @@ private Stream createSpecs() { */ protected Stream getSavepointPaths( TableTestProgram program, ExecNodeMetadata metadata) { - return Stream.of(getSavepointPath(program, metadata, null)); + if (programsToIgnore() + .getOrDefault(metadata.version(), Collections.emptyList()) + .contains(program)) { + return Stream.empty(); + } else { + return Stream.of(getSavepointPath(program, metadata, null)); + } } + /** Can be used in {@link #getSavepointPaths(TableTestProgram, ExecNodeMetadata)}. */ protected final String getSavepointPath( TableTestProgram program, ExecNodeMetadata metadata, @@ -229,6 +248,10 @@ protected final String getSavepointPath( return builder.toString(); } + // ==================================================================================== + // End of extension points + // ==================================================================================== + private void registerSinkObserver( final List> futures, final SinkTestStep sinkTestStep, diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out index 19cd2a29e6c94..176a1eb36b4b1 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out @@ -30,7 +30,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" }, { "id" : 2, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json b/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json index 957899f9eb12c..f04ebbd701db7 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json @@ -129,7 +129,7 @@ }, { "id": 5, - "type": "stream-exec-sink_1", + "type": "stream-exec-sink_2", "configuration": { "table.exec.sink.keyed-shuffle": "AUTO", "table.exec.sink.not-null-enforcer": "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out index 5fa8f04dbdac1..6018fa8f08159 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out @@ -30,7 +30,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])" }, { "id" : 0, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out index 4080cb14712e0..4b1ddd9ad3e57 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out @@ -31,7 +31,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.parallelism=2, bounded=true}]]])" }, { "id" : 0, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out index 9ebcad236f8e9..989225dbed247 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out @@ -72,7 +72,7 @@ "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])" }, { "id" : 3, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out index c3169f69f9dc5..11a1628fbcadb 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out @@ -141,7 +141,7 @@ "description" : "Calc(select=[a, b], where=[f0])" }, { "id" : 5, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out index 195be418c0d79..71ecfdb9335fc 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out @@ -189,7 +189,7 @@ "description" : "Calc(select=[x, y], where=[(((y + 1) = (y * y)) AND (x = a))])" }, { "id" : 5, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out index 73f729726343d..3d3d35aaff272 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out @@ -135,7 +135,7 @@ "description" : "Calc(select=[x, y])" }, { "id" : 5, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out index 77a97c2d42aad..fd7538d29914e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out @@ -149,7 +149,7 @@ "description" : "Calc(select=[CAST(b AS BIGINT) AS a, EXPR$1 AS b])" }, { "id" : 6, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out index c788c49e306d9..711d77a981011 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out @@ -303,7 +303,7 @@ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 10000, 5000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out index 892f3eb3b9a30..80528083d3194 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out @@ -301,7 +301,7 @@ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, rowtime, 10000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out index 1bde1c99e43d5..625404e3afcf2 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out @@ -413,7 +413,7 @@ "description" : "Calc(select=[b, w$start AS window_start, w$end AS window_end, EXPR$3])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out index 66562e9f1f202..e9092e8a39560 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out @@ -356,7 +356,7 @@ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, proctime, 600000, 300000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out index 11ee30655dde0..ca4a4cbd1a8bf 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out @@ -354,7 +354,7 @@ "description" : "PythonGroupWindowAggregate(groupBy=[b], window=[SessionGroupWindow('w$, proctime, 600000)], select=[b, pyFunc(a, $f3) AS EXPR$1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out index 28630581efe18..1be4cb3d7a4ca 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out @@ -445,7 +445,7 @@ "description" : "Calc(select=[b, w$end AS window_end, EXPR$2])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out index 260321275c339..44dae4c0af927 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out @@ -352,7 +352,7 @@ "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out index 5cad90f407c5f..eba924fca5e84 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out @@ -366,7 +366,7 @@ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out index afc22d4037ac4..1d7a1d15ce964 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out @@ -303,7 +303,7 @@ "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out index b792d066f59b6..3252699022cbe 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out @@ -356,7 +356,7 @@ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" }, { "id" : 8, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out index 1f25a552ba377..d44f5a0aac485 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out @@ -298,7 +298,7 @@ "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])" }, { "id" : 7, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out index cc6ba9c8b6e4e..b3db697e6ab4a 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out @@ -263,7 +263,7 @@ } }, { "id" : 6, - "type" : "stream-exec-sink_1", + "type" : "stream-exec-sink_2", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", "table.exec.sink.not-null-enforcer" : "ERROR", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json new file mode 100644 index 0000000000000..2959e98ad25a2 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json @@ -0,0 +1,149 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "dataType" : "INT" + } ] + } + } + } + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[name, score])" + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "internalName" : "$UPPER$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])" + }, { + "id" : 3, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "scoreMetadata", + "kind" : "METADATA", + "dataType" : "BIGINT", + "isVirtual" : false + }, { + "name" : "score", + "dataType" : "BIGINT" + }, { + "name" : "nameMetadata", + "kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false + } ], + "primaryKey" : { + "name" : "PK_name", + "type" : "PRIMARY_KEY", + "columns" : [ "name" ] + } + } + } + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "nameMetadata", "scoreMetadata" ], + "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL" + } ] + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "requireUpsertMaterialize" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "sinkMaterializeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata new file mode 100644 index 0000000000000..52dbca2634cea Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json new file mode 100644 index 0000000000000..0274397826207 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json @@ -0,0 +1,149 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "dataType" : "INT" + } ] + } + } + } + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[name, score])" + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "internalName" : "$UPPER$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])" + }, { + "id" : 3, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "scoreMetadata", + "kind" : "METADATA", + "dataType" : "BIGINT", + "isVirtual" : false + }, { + "name" : "score", + "dataType" : "BIGINT" + }, { + "name" : "nameMetadata", + "kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false + } ], + "primaryKey" : { + "name" : "PK_name", + "type" : "PRIMARY_KEY", + "columns" : [ "name" ] + } + } + } + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "nameMetadata", "scoreMetadata" ], + "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL" + } ] + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "requireUpsertMaterialize" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "sinkMaterializeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata new file mode 100644 index 0000000000000..be3b51c2def8f Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json new file mode 100644 index 0000000000000..43168424eec61 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json @@ -0,0 +1,149 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 4, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "dataType" : "INT" + } ] + } + } + } + }, + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[name, score])" + }, { + "id" : 5, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "CALL", + "internalName" : "$UPPER$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$CAST$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "BIGINT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])" + }, { + "id" : 6, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "scoreMetadata", + "kind" : "METADATA", + "dataType" : "BIGINT", + "isVirtual" : false + }, { + "name" : "score", + "dataType" : "BIGINT" + }, { + "name" : "nameMetadata", + "kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false + } ], + "primaryKey" : { + "name" : "PK_name", + "type" : "PRIMARY_KEY", + "columns" : [ "name" ] + } + } + } + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "nameMetadata", "scoreMetadata" ], + "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL" + } ] + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "requireUpsertMaterialize" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "sinkMaterializeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])" + } ], + "edges" : [ { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata new file mode 100644 index 0000000000000..b2c30bf6e5028 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json new file mode 100644 index 0000000000000..338e923a20e2c --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json @@ -0,0 +1,85 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 5, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 6, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + }, + "distribution" : { + "kind" : "HASH", + "bucketCount" : 3, + "bucketKeys" : [ "a" ] + }, + "partitionKeys" : [ "b" ] + } + }, + "abilities" : [ { + "type" : "Bucketing" + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata new file mode 100644 index 0000000000000..6998265c31e5c Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json new file mode 100644 index 0000000000000..47ce499372a0b --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json @@ -0,0 +1,84 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 7, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 8, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + }, + "distribution" : { + "kind" : "HASH", + "bucketKeys" : [ "a" ] + }, + "partitionKeys" : [ "b" ] + } + }, + "abilities" : [ { + "type" : "Bucketing" + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata new file mode 100644 index 0000000000000..0fd79331c10f9 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json new file mode 100644 index 0000000000000..1df0a88d5f2d7 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json @@ -0,0 +1,85 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 2, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + }, + "distribution" : { + "kind" : "UNKNOWN", + "bucketCount" : 3, + "bucketKeys" : [ ] + }, + "partitionKeys" : [ "b" ] + } + }, + "abilities" : [ { + "type" : "Bucketing" + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata new file mode 100644 index 0000000000000..ef824265bbbd0 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json new file mode 100644 index 0000000000000..9e95055e3ca54 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json @@ -0,0 +1,85 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 3, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 4, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + }, + "distribution" : { + "kind" : "UNKNOWN", + "bucketCount" : 3, + "bucketKeys" : [ "a" ] + }, + "partitionKeys" : [ "b" ] + } + }, + "abilities" : [ { + "type" : "Bucketing" + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata new file mode 100644 index 0000000000000..023fa406bf3ff Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json new file mode 100644 index 0000000000000..192ef2898a9f4 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json @@ -0,0 +1,119 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 16, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 17, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "catalogName" : "`default_catalog`.`default_database`.`ndf`", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>", + "description" : "Calc(select=[a, b, ndf(c) AS EXPR$2])" + }, { + "id" : 18, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647) NOT NULL" + } ], + "primaryKey" : { + "name" : "PK_c", + "type" : "PRIMARY_KEY", + "columns" : [ "c" ] + } + } + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, EXPR$2])" + } ], + "edges" : [ { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata new file mode 100644 index 0000000000000..fb6418009ee31 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json new file mode 100644 index 0000000000000..1be7ade9d61aa --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json @@ -0,0 +1,80 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 12, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 13, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + }, + "abilities" : [ { + "type" : "Overwrite", + "overwrite" : true + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata new file mode 100644 index 0000000000000..874462a05b31e Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json new file mode 100644 index 0000000000000..ddd0df467ae1b --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json @@ -0,0 +1,128 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 19, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 20, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "DOUBLE" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` DECIMAL(10, 2), `EXPR$4` DOUBLE>", + "description" : "Calc(select=[a, b, c, null:DECIMAL(10, 2) AS EXPR$3, null:DOUBLE AS EXPR$4])" + }, { + "id" : 21, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "d", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "e", + "dataType" : "DOUBLE" + } ] + } + } + }, + "abilities" : [ { + "type" : "TargetColumnWriting", + "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ] + } ], + "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ] + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` DECIMAL(10, 2), `EXPR$4` DOUBLE>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], targetColumns=[[0],[1],[2]], fields=[a, b, c, EXPR$3, EXPR$4])" + } ], + "edges" : [ { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata new file mode 100644 index 0000000000000..5b02515c83364 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json new file mode 100644 index 0000000000000..79f725b5f5ccd --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json @@ -0,0 +1,123 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 9, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 10, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 2, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Calc(select=[a, 2 AS EXPR$1, b, c])" + }, { + "id" : 11, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "p", + "dataType" : "BIGINT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + }, + "partitionKeys" : [ "b" ] + } + }, + "abilities" : [ { + "type" : "Partitioning", + "partition" : { + "b" : "2" + } + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, EXPR$1, b, c])" + } ], + "edges" : [ { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata new file mode 100644 index 0000000000000..32a8e76dbe0f4 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json new file mode 100644 index 0000000000000..53aa33b7e1031 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json @@ -0,0 +1,87 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 22, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT NOT NULL" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } + } + } + } + }, + "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 23, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT NOT NULL" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } + } + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ], + "upsertMaterializeStrategy" : "MAP", + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 22, + "target" : 23, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata new file mode 100644 index 0000000000000..e9567fea0c255 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json new file mode 100644 index 0000000000000..405453d3f43f8 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json @@ -0,0 +1,83 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 14, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ] + } + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])" + }, { + "id" : 15, + "type" : "stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false + } ] + } + } + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "c" ], + "consumedType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "MAP", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata new file mode 100644 index 0000000000000..75224ac197cd2 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata differ