diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
index 880fb2cf11d89..171e8fefad6e5 100644
--- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
+++ b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
@@ -30,7 +30,7 @@
"description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])"
}, {
"id" : 0,
- "type" : "stream-exec-sink_1",
+ "type" : "stream-exec-sink_2",
"configuration" : {
"table.exec.sink.keyed-shuffle" : "AUTO",
"table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
index ea3925ab06a03..37dbfbb5db9b0 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
@@ -114,6 +114,21 @@ public String toString() {
return id;
}
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TableTestProgram that = (TableTestProgram) o;
+ return id.equals(that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
/**
* Entrypoint for a {@link TableTestProgram} that forces an identifier and description of the
* test program.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 411470f5dbc43..3733e7a24df17 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -1326,7 +1326,7 @@ private static void validateAndApplyTargetColumns(
*
*
The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED METADATA COLUMNS}
*/
- private static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) {
+ public static RowType createConsumedType(ResolvedSchema schema, DynamicTableSink sink) {
final Map metadataMap = extractMetadataMap(sink);
final Stream physicalFields =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index c9a6e8c3c632f..8a1dd04bcfe87 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -124,7 +124,7 @@ protected Transformation