Skip to content

Commit fdc3395

Browse files
authored
[FLINK-37618][table-planner] Fix PTFs INTERVAL argument
This closes #26410.
1 parent abcfebb commit fdc3395

File tree

7 files changed

+134
-0
lines changed

7 files changed

+134
-0
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.table.types.logical.DateType;
23+
import org.apache.flink.table.types.logical.DayTimeIntervalType;
2324
import org.apache.flink.table.types.logical.DistinctType;
2425
import org.apache.flink.table.types.logical.LogicalType;
2526
import org.apache.flink.table.types.logical.LogicalTypeFamily;
@@ -28,6 +29,7 @@
2829
import org.apache.flink.table.types.logical.StructuredType;
2930
import org.apache.flink.table.types.logical.VarBinaryType;
3031
import org.apache.flink.table.types.logical.VarCharType;
32+
import org.apache.flink.table.types.logical.YearMonthIntervalType;
3133

3234
import java.util.Arrays;
3335
import java.util.HashMap;
@@ -74,7 +76,10 @@
7476
import static org.apache.flink.table.types.logical.LogicalTypeRoot.TINYINT;
7577
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARBINARY;
7678
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
79+
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getDayPrecision;
80+
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFractionalPrecision;
7781
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
82+
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getYearPrecision;
7883
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isSingleFieldInterval;
7984

8085
/**
@@ -538,6 +543,31 @@ private CastAvoidanceChecker(LogicalType sourceType) {
538543
this.sourceType = sourceType;
539544
}
540545

546+
@Override
547+
public Boolean visit(YearMonthIntervalType targetType) {
548+
if (sourceType.isNullable() && !targetType.isNullable()) {
549+
return false;
550+
}
551+
if (sourceType.is(LogicalTypeRoot.INTERVAL_YEAR_MONTH)
552+
&& getYearPrecision(sourceType) <= targetType.getYearPrecision()) {
553+
return true;
554+
}
555+
return defaultMethod(targetType);
556+
}
557+
558+
@Override
559+
public Boolean visit(DayTimeIntervalType targetType) {
560+
if (sourceType.isNullable() && !targetType.isNullable()) {
561+
return false;
562+
}
563+
if (sourceType.is(LogicalTypeRoot.INTERVAL_DAY_TIME)
564+
&& getDayPrecision(sourceType) <= targetType.getDayPrecision()
565+
&& getFractionalPrecision(sourceType) <= targetType.getFractionalPrecision()) {
566+
return true;
567+
}
568+
return defaultMethod(targetType);
569+
}
570+
541571
@Override
542572
public Boolean visit(VarCharType targetType) {
543573
if (sourceType.isNullable() && !targetType.isNullable()) {

flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastAvoidanceTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,24 @@ private static Stream<Arguments> testData() {
100100
new YearMonthIntervalType(
101101
YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH, 2),
102102
new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.MONTH),
103+
true),
104+
of(
105+
new YearMonthIntervalType(
106+
YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH, 4),
107+
new YearMonthIntervalType(
108+
YearMonthIntervalType.YearMonthResolution.MONTH, 2),
103109
false),
104110
of(
105111
new DayTimeIntervalType(
106112
DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 6),
107113
new DayTimeIntervalType(
108114
DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 7),
115+
true),
116+
of(
117+
new DayTimeIntervalType(
118+
DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 7),
119+
new DayTimeIntervalType(
120+
DayTimeIntervalType.DayTimeResolution.DAY_TO_SECOND, 2, 6),
109121
false),
110122
of(new ArrayType(new TimestampType()), new ArrayType(new SmallIntType()), false),
111123
of(

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public List<TableTestProgram> programs() {
5656
ProcessTableFunctionTestPrograms.PROCESS_TYPED_SET_SEMANTIC_TABLE,
5757
ProcessTableFunctionTestPrograms.PROCESS_TYPED_SET_SEMANTIC_TABLE_TABLE_API,
5858
ProcessTableFunctionTestPrograms.PROCESS_POJO_ARGS,
59+
ProcessTableFunctionTestPrograms.PROCESS_INTERVAL_DAY_ARGS,
60+
ProcessTableFunctionTestPrograms.PROCESS_INTERVAL_YEAR_ARGS,
5961
ProcessTableFunctionTestPrograms.PROCESS_EMPTY_ARGS,
6062
ProcessTableFunctionTestPrograms.PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH,
6163
ProcessTableFunctionTestPrograms.PROCESS_SET_SEMANTIC_TABLE_PASS_THROUGH,

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ContextFunction;
2929
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction;
3030
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction;
31+
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalDayArgFunction;
32+
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalYearArgFunction;
3133
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidPassThroughTimersFunction;
3234
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidRowKindFunction;
3335
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidRowSemanticTableTimersFunction;
@@ -385,6 +387,30 @@ public class ProcessTableFunctionTestPrograms {
385387
.runSql("INSERT INTO sink SELECT * FROM f()")
386388
.build();
387389

390+
public static final TableTestProgram PROCESS_INTERVAL_DAY_ARGS =
391+
TableTestProgram.of("process-interval-day-args", "interval argument")
392+
.setupTemporarySystemFunction("f", IntervalDayArgFunction.class)
393+
.setupSql(BASIC_VALUES)
394+
.setupTableSink(
395+
SinkTestStep.newBuilder("sink")
396+
.addSchema(BASE_SINK_SCHEMA)
397+
.consumedValues("+I[{PT1S}]")
398+
.build())
399+
.runSql("INSERT INTO sink SELECT * FROM f(d => INTERVAL '1' SECOND)")
400+
.build();
401+
402+
public static final TableTestProgram PROCESS_INTERVAL_YEAR_ARGS =
403+
TableTestProgram.of("process-interval-year-args", "interval argument")
404+
.setupTemporarySystemFunction("f", IntervalYearArgFunction.class)
405+
.setupSql(BASIC_VALUES)
406+
.setupTableSink(
407+
SinkTestStep.newBuilder("sink")
408+
.addSchema(BASE_SINK_SCHEMA)
409+
.consumedValues("+I[{P1Y}]")
410+
.build())
411+
.runSql("INSERT INTO sink SELECT * FROM f(p => INTERVAL '1' YEAR)")
412+
.build();
413+
388414
public static final TableTestProgram PROCESS_ROW_SEMANTIC_TABLE_PASS_THROUGH =
389415
TableTestProgram.of("process-row-pass-through", "pass columns through enabled")
390416
.setupTemporarySystemFunction("f", RowSemanticTablePassThroughFunction.class)

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@
4242
import org.apache.flink.types.Row;
4343
import org.apache.flink.types.RowKind;
4444

45+
import java.time.Duration;
4546
import java.time.Instant;
4647
import java.time.LocalDateTime;
48+
import java.time.Period;
4749
import java.util.Arrays;
4850
import java.util.EnumSet;
4951
import java.util.HashMap;
@@ -336,6 +338,20 @@ public void eval() {
336338
}
337339
}
338340

341+
/** Testing function. */
342+
public static class IntervalDayArgFunction extends AppendProcessTableFunctionBase {
343+
public void eval(Duration d) {
344+
collectObjects(d);
345+
}
346+
}
347+
348+
/** Testing function. */
349+
public static class IntervalYearArgFunction extends AppendProcessTableFunctionBase {
350+
public void eval(Period p) {
351+
collectObjects(p);
352+
}
353+
}
354+
339355
/** Testing function. */
340356
public static class RowSemanticTablePassThroughFunction extends AppendProcessTableFunctionBase {
341357
public void eval(

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.AppendProcessTableFunctionBase;
2929
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.DescriptorFunction;
3030
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.EmptyArgFunction;
31+
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalDayArgFunction;
32+
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.IntervalYearArgFunction;
3133
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.InvalidUpdatingSemanticsFunction;
3234
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.MultiInputFunction;
3335
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.NoSystemArgsScalarFunction;
@@ -149,6 +151,18 @@ void testEmptyArgs() {
149151
util.verifyRelPlan("SELECT * FROM f(uid => 'my-ptf')");
150152
}
151153

154+
@Test
155+
void testIntervalDayArgs() {
156+
util.addTemporarySystemFunction("f", IntervalDayArgFunction.class);
157+
util.verifyRelPlan("SELECT * FROM f(d => INTERVAL '1' SECOND)");
158+
}
159+
160+
@Test
161+
void testIntervalYearArgs() {
162+
util.addTemporarySystemFunction("f", IntervalYearArgFunction.class);
163+
util.verifyRelPlan("SELECT * FROM f(p => INTERVAL '1' YEAR)");
164+
}
165+
152166
@Test
153167
void testSetSemanticTablePassThroughColumns() {
154168
util.addTemporarySystemFunction("f", SetSemanticTablePassThroughFunction.class);

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,40 @@ LogicalProject(out=[$0])
6868
<![CDATA[
6969
ProcessTableFunction(invocation=[f(DEFAULT(), _UTF-16LE'my-ptf')], uid=[my-ptf], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
7070
+- Values(tuples=[[{ }]])
71+
]]>
72+
</Resource>
73+
</TestCase>
74+
<TestCase name="testIntervalDayArgs">
75+
<Resource name="sql">
76+
<![CDATA[SELECT * FROM f(d => INTERVAL '1' SECOND)]]>
77+
</Resource>
78+
<Resource name="ast">
79+
<![CDATA[
80+
LogicalProject(out=[$0])
81+
+- LogicalTableFunctionScan(invocation=[f(1000:INTERVAL SECOND, DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) out)])
82+
]]>
83+
</Resource>
84+
<Resource name="optimized rel plan">
85+
<![CDATA[
86+
ProcessTableFunction(invocation=[f(1000:INTERVAL SECOND, DEFAULT(), DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
87+
+- Values(tuples=[[{ }]])
88+
]]>
89+
</Resource>
90+
</TestCase>
91+
<TestCase name="testIntervalYearArgs">
92+
<Resource name="sql">
93+
<![CDATA[SELECT * FROM f(p => INTERVAL '1' YEAR)]]>
94+
</Resource>
95+
<Resource name="ast">
96+
<![CDATA[
97+
LogicalProject(out=[$0])
98+
+- LogicalTableFunctionScan(invocation=[f(12:INTERVAL YEAR, DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) out)])
99+
]]>
100+
</Resource>
101+
<Resource name="optimized rel plan">
102+
<![CDATA[
103+
ProcessTableFunction(invocation=[f(12:INTERVAL YEAR, DEFAULT(), DEFAULT())], uid=[null], select=[out], rowType=[RecordType(VARCHAR(2147483647) out)])
104+
+- Values(tuples=[[{ }]])
71105
]]>
72106
</Resource>
73107
</TestCase>

0 commit comments

Comments
 (0)