
Commit a451e39

[FLINK-38738][table] Fix UIDs for ScanTableSource with multiple transformations
This closes #27294.
1 parent c3ea9c2 commit a451e39

51 files changed: +1703 additions, -153 deletions


flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
   "flinkVersion" : "",
   "nodes" : [ {
     "id" : 0,
-    "type" : "stream-exec-table-source-scan_1",
+    "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
         "identifier" : "`default_catalog`.`default_database`.`MyTable`",

flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ProviderContext.java

Lines changed: 64 additions & 3 deletions
@@ -34,11 +34,72 @@ public interface ProviderContext {
 
     /**
      * Generates a new unique identifier for a {@link Transformation}/{@code DataStream} operator.
-     * The {@code name} must be unique within the provider implementation. The framework will make
-     * sure that the name is unique for the entire topology.
+     *
+     * <p>UIDs are crucial for state management as they identify an operator in the topology. The
+     * planner guarantees that all operators receive a stable UID. However, the planner does not
+     * control the operators and transformations in sources and sinks. From a planner's perspective,
+     * sources and sinks are black boxes. Implementers must ensure that UIDs are assigned to all
+     * operators.
+     *
+     * <p>The {@code name} argument must be unique within the provider implementation. The
+     * framework will make sure that the name is unique for the entire topology.
+     *
+     * <p>For example, a connector that consists of two transformations (a source and a validator)
+     * must use this method in the following way:
+     *
+     * <pre>{@code
+     * var sourceOperator = ...
+     * providerContext.generateUid("source").ifPresent(sourceOperator::uid);
+     *
+     * var validatorOperator = sourceOperator.process(...);
+     * providerContext.generateUid("validator").ifPresent(validatorOperator::uid);
+     * }</pre>
      *
      * <p>This method returns empty if an identifier cannot be generated, i.e., because the job is
-     * in batch mode, or UIDs cannot be guaranteed to be unique.
+     * in batch mode, or UIDs cannot be guaranteed to be unique because the topology is not created
+     * from a compiled plan.
      */
     Optional<String> generateUid(String name);
+
+    /**
+     * Returns the display name provided by the framework to label the connector.
+     *
+     * <p>If multiple transformations are present, the implementer can decide which one is the
+     * connector. The name will be shown in the UI and logs.
+     *
+     * <p>For example, a connector that consists of two transformations (a source and a validator)
+     * can use this method in the following way:
+     *
+     * <pre>{@code
+     * var sourceOperator = ...
+     * sourceOperator.name(providerContext.getName());
+     *
+     * var validatorOperator = sourceOperator.process(...);
+     * validatorOperator.name("Validator for " + providerContext.getName());
+     * }</pre>
+     */
+    default String getName() {
+        return "";
+    }
+
+    /**
+     * Returns the description provided by the framework to label the connector.
+     *
+     * <p>If multiple transformations are present, the implementer can decide which one is the
+     * connector. The description will be shown in the UI and logs.
+     *
+     * <p>For example, a connector that consists of two transformations (a source and a validator)
+     * can use this method in the following way:
+     *
+     * <pre>{@code
+     * var sourceOperator = ...
+     * sourceOperator.setDescription(providerContext.getDescription());
+     *
+     * var validatorOperator = sourceOperator.process(...);
+     * validatorOperator.setDescription("Validates non-null values.");
+     * }</pre>
+     */
+    default String getDescription() {
+        return "";
+    }
 }
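
Taken together, the extended ProviderContext lets a connector with several transformations delegate naming and UID assignment to the planner. Below is a minimal sketch of how a connector could apply it; it is not part of this commit. The ValidatingScanProvider class, its FLIP-27 source field, and the null-check filter are invented for illustration, while the ProviderContext calls follow the Javadoc above.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

/** Hypothetical provider with two transformations: a source and a validator. */
class ValidatingScanProvider implements DataStreamScanProvider {

    private final Source<RowData, ?, ?> source; // some FLIP-27 source, elided

    ValidatingScanProvider(Source<RowData, ?, ?> source) {
        this.source = source;
    }

    @Override
    public DataStream<RowData> produceDataStream(
            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
        // Transformation 1: the source itself. Name and UID come from the
        // planner; generateUid() returns empty in batch mode or when the
        // topology is not created from a compiled plan, hence ifPresent().
        DataStreamSource<RowData> sourceStream =
                execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "fallback");
        sourceStream.name(providerContext.getName());
        providerContext.generateUid("source").ifPresent(sourceStream::uid);

        // Transformation 2: a validator chained behind the source. Every
        // additional transformation needs its own, locally unique UID name.
        SingleOutputStreamOperator<RowData> validated =
                sourceStream.filter(row -> !row.isNullAt(0));
        validated.name("Validator for " + providerContext.getName());
        providerContext.generateUid("validator").ifPresent(validated::uid);
        return validated;
    }

    @Override
    public boolean isBounded() {
        return false;
    }
}

Keeping the UID names ("source", "validator") stable across connector releases is what makes state restores work; the planner only guarantees that the generated UIDs are unique within the whole topology.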

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java

Lines changed: 14 additions & 0 deletions
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
@@ -38,6 +39,9 @@
 /**
  * The representation of execution information for a {@link FlinkPhysicalRel}.
  *
+ * <p>Note: Every new node should be annotated with {@link ExecNodeMetadata}. It enables bookkeeping
+ * and affects how a {@link CompiledPlan} is derived from Java objects.
+ *
  * @param <T> The type of the elements that result from this node.
  */
 @JsonTypeInfo(
@@ -61,6 +65,16 @@ public interface ExecNode<T> extends ExecNodeTranslator<T>, FusionCodegenExecNod
     @JsonProperty(value = FIELD_NAME_ID, index = 0)
     int getId();
 
+    /**
+     * The version of the node.
+     *
+     * <p>A new version can be added by declaring an {@link ExecNodeMetadata} annotation,
+     * potentially by copying the old annotation. You can use this method to get the current
+     * compiled version and execute version-specific logic accordingly.
+     */
+    @JsonIgnore
+    int getVersion();
+
     /** Returns a string which describes this node. */
     @JsonProperty(value = FIELD_NAME_DESCRIPTION)
     String getDescription();
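
For context, the versioning contract behind getVersion() looks roughly as follows. This is a hypothetical sketch, not from this commit: the node name, class, and version numbers are made up, and the attribute set (name, version, minPlanVersion, minStateVersion) is assumed from the ExecNodeMetadata annotation.

import org.apache.flink.FlinkVersion;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;

// Two declared versions: compiling a new plan persists version 2, while a
// plan compiled on an older Flink restores with getVersion() == 1, so the
// node's translation logic can branch to keep the old operator topology.
@ExecNodeMetadata(
        name = "stream-exec-my-scan",
        version = 1,
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
@ExecNodeMetadata(
        name = "stream-exec-my-scan",
        version = 2,
        minPlanVersion = FlinkVersion.v1_18,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecMyScan { /* node implementation elided */ }

The first file in this commit shows the effect of such a bump: the JSON plan test now serializes the scan node as stream-exec-table-source-scan_2.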

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java

Lines changed: 31 additions & 0 deletions
@@ -26,6 +26,7 @@
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.delegation.PlannerBase;
@@ -48,6 +49,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -129,6 +131,11 @@ public final int getId() {
         return context.getId();
     }
 
+    @Override
+    public final int getVersion() {
+        return context.getVersion();
+    }
+
     @Override
     public String getDescription() {
         return description;
@@ -325,4 +332,28 @@ protected OpFusionCodegenSpecGenerator translateToFusionCodegenSpecInternal(
             PlannerBase planner, ExecNodeConfig config, CodeGeneratorContext parentCtx) {
         throw new TableException("This ExecNode doesn't support operator fusion codegen now.");
     }
+
+    /** Context for connectors to configure transformations and operators. */
+    protected ProviderContext createProviderContext(
+            TransformationMetadata metadata, ExecNodeConfig config) {
+        return new ProviderContext() {
+            @Override
+            public Optional<String> generateUid(String name) {
+                if (config.shouldSetUid()) {
+                    return Optional.of(createTransformationUid(name, config));
+                }
+                return Optional.empty();
+            }
+
+            @Override
+            public String getName() {
+                return metadata.getName();
+            }
+
+            @Override
+            public String getDescription() {
+                return metadata.getDescription();
+            }
+        };
+    }
 }

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java

Lines changed: 11 additions & 10 deletions
@@ -21,6 +21,7 @@
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
 
@@ -32,18 +33,18 @@
 import java.lang.annotation.Target;
 
 /**
- * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
- * serializing/deserializing them in a plan. It's used for internal bookkeeping across Flink
- * versions, and to provide necessary information to the testing infrastructure.
+ * Annotation to be used by classes implementing the {@link ExecNode} interface.
  *
- * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
- * can be correctly serialized and later on instantiated from a string (JSON) plan.
+ * <p>The {@link ExecNodeMetadata} annotation contains crucial information for (de)serializing a
+ * node into a {@link CompiledPlan}. It's used for internal bookkeeping across Flink versions and
+ * powers testing infrastructure.
  *
- * <p>It's possible for one {@link ExecNode} class to use multiple annotations to denote ability to
- * upgrade to more versions. an {@link ExecNode} class can be annotated directly with multiple
- * {@link ExecNodeMetadata} annotations, or with a single {@link MultipleExecNodeMetadata}
- * annotation where the {@link MultipleExecNodeMetadata#value()} is an array of {@link
- * ExecNodeMetadata} annotations.
+ * <p>Each new {@link ExecNode} version needs an annotation. Multiple annotations on top of a single
+ * {@link ExecNode} class are supported and declare multiple versions. When compiling a node into
+ * JSON, the most recent version is persisted. When restoring from JSON, the original version is
+ * available via {@link ExecNode#getVersion()}. Parts of the original configuration (see {@link
+ * #consumedOptions()}) together with recent session configuration will be made available via {@link
+ * ExecNodeConfig}.
  */
 @Documented
 @Target(ElementType.TYPE)
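
The compiled-plan round trip that this metadata powers is reachable from the public API. A minimal sketch with made-up table names and file path; TableEnvironment, CompiledPlan, and PlanReference are the existing public entry points:

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanRoundTrip {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        env.executeSql("CREATE TABLE MyTable (x INT) WITH ('connector' = 'datagen')");
        env.executeSql("CREATE TABLE MySink (x INT) WITH ('connector' = 'blackhole')");

        // Compiling persists each node under its most recent annotated version,
        // e.g. stream-exec-table-source-scan_2 after this commit.
        CompiledPlan plan = env.compilePlanSql("INSERT INTO MySink SELECT x FROM MyTable");
        plan.writeToFile("/tmp/plan.json");

        // Restoring re-instantiates the nodes from the persisted versions.
        env.loadPlan(PlanReference.fromFile("/tmp/plan.json")).execute();
    }
}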

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java

Lines changed: 1 addition & 1 deletion
@@ -124,7 +124,7 @@ public String getDynamicFilteringDataListenerID() {
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
         final Transformation<RowData> transformation =
-                super.translateToPlanInternal(planner, config);
+                super.createTransformation(planner, config, false);
         // the boundedness has been checked via the runtime provider already, so we can safely
         // declare all legacy transformations as bounded to make the stream graph generator happy
         ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java

Lines changed: 19 additions & 16 deletions
@@ -398,19 +398,21 @@ private Transformation<?> applySinkProvider(
             ExecNodeConfig config,
             ClassLoader classLoader) {
         try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
-
-            TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
+            final TransformationMetadata metadata =
+                    createTransformationMeta(SINK_TRANSFORMATION, config);
             if (runtimeProvider instanceof DataStreamSinkProvider) {
                 Transformation<RowData> sinkTransformation =
                         applyRowtimeTransformation(
                                 inputTransform, rowtimeFieldIndex, sinkParallelism, config);
                 final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
                 final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
-                return provider.consumeDataStream(createProviderContext(config), dataStream)
+                return provider.consumeDataStream(
+                                createProviderContext(metadata, config), dataStream)
                         .getTransformation();
             } else if (runtimeProvider instanceof TransformationSinkProvider) {
                 final TransformationSinkProvider provider =
                         (TransformationSinkProvider) runtimeProvider;
+                final ProviderContext providerContext = createProviderContext(metadata, config);
                 return provider.createTransformation(
                         new TransformationSinkProvider.Context() {
                             @Override
@@ -425,7 +427,17 @@ public int getRowtimeIndex() {
 
                             @Override
                             public Optional<String> generateUid(String name) {
-                                return createProviderContext(config).generateUid(name);
+                                return providerContext.generateUid(name);
+                            }
+
+                            @Override
+                            public String getName() {
+                                return providerContext.getName();
+                            }
+
+                            @Override
+                            public String getDescription() {
+                                return providerContext.getDescription();
                             }
                         });
             } else if (runtimeProvider instanceof SinkFunctionProvider) {
@@ -436,7 +448,7 @@ public Optional<String> generateUid(String name) {
                         env,
                         inputTransform,
                         rowtimeFieldIndex,
-                        sinkMeta,
+                        metadata,
                         sinkParallelism);
             } else if (runtimeProvider instanceof OutputFormatProvider) {
                 OutputFormat<RowData> outputFormat =
@@ -448,7 +460,7 @@ public Optional<String> generateUid(String name) {
                         env,
                         inputTransform,
                         rowtimeFieldIndex,
-                        sinkMeta,
+                        metadata,
                         sinkParallelism);
             } else if (runtimeProvider instanceof SinkV2Provider) {
                 SinkV2Provider sinkV2Provider = (SinkV2Provider) runtimeProvider;
@@ -466,23 +478,14 @@ public Optional<String> generateUid(String name) {
                         .getAdditionalMetricVariables()
                         .forEach(transformation::addMetricVariable);
                 transformation.setParallelism(sinkParallelism, sinkParallelismConfigured);
-                sinkMeta.fill(transformation);
+                metadata.fill(transformation);
                 return transformation;
             } else {
                 throw new TableException("Unsupported sink runtime provider.");
             }
         }
     }
 
-    private ProviderContext createProviderContext(ExecNodeConfig config) {
-        return name -> {
-            if (config.shouldSetUid()) {
-                return Optional.of(createTransformationUid(name, config));
-            }
-            return Optional.empty();
-        };
-    }
-
     private Transformation<?> createSinkFunctionTransformation(
             SinkFunction<RowData> sinkFunction,
             StreamExecutionEnvironment env,
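
On the sink side, connectors receive the same shared context, as the refactored applySinkProvider above shows. A minimal sketch, assuming a hypothetical provider that writes through a print sink; only the consumeDataStream(ProviderContext, DataStream) signature comes from DataStreamSinkProvider:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

/** Hypothetical sink provider labeling its single writer transformation. */
class PrintingSinkProvider implements DataStreamSinkProvider {

    @Override
    public DataStreamSink<?> consumeDataStream(
            ProviderContext providerContext, DataStream<RowData> dataStream) {
        DataStreamSink<RowData> sink = dataStream.addSink(new PrintSinkFunction<>());
        // Same contract as on the scan side: name and UID come from the planner,
        // and the UID is only set when the planner can guarantee stability.
        sink.name(providerContext.getName());
        providerContext.generateUid("writer").ifPresent(sink::uid);
        return sink;
    }
}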
