Skip to content

Commit 224a075

Browse files
authored
[FLINK-37905][runtime] Fix transform failure with non-ascii string literals (#4038)
* [FLINK-37905] Fix transform failure with non-ascii string literals
* address comments

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>

---------

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
1 parent 5bf2564 commit 224a075

File tree

5 files changed

+301
-3
lines changed

5 files changed

+301
-3
lines changed

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2725,6 +2725,109 @@ void testShadeOriginalColumnsWithDifferentType() throws Exception {
27252725
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2.5, ], after=[2.5, x], op=UPDATE, meta=({op_ts=5})}");
27262726
}
27272727

2728+
/**
 * Sample string literals iterated by the Unicode transform tests in this class: one ASCII-only
 * entry followed by entries that contain non-ASCII characters.
 */
private static final String[] UNICODE_STRINGS = {
2729+
"ascii test!?",
2730+
"大五",
2731+
"测试数据",
2732+
"ひびぴ",
2733+
"죠주쥬",
2734+
"ÀÆÉ",
2735+
"ÓÔŐÖ",
2736+
"αβγδε",
2737+
"בבקשה",
2738+
"твой",
2739+
"ภาษาไทย",
2740+
"piedzimst brīvi"
2741+
};
2742+
2743+
@ParameterizedTest
2744+
@EnumSource
2745+
void testTransformProjectionWithUnicodeCharacters(ValuesDataSink.SinkApi sinkApi)
2746+
throws Exception {
2747+
for (String unicodeString : UNICODE_STRINGS) {
2748+
List<String> expected =
2749+
Stream.of(
2750+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`prefix` STRING,`id` INT NOT NULL,`name` STRING,`age` INT,`suffix` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
2751+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[{UNICODE_STRING} -> 1, 1, Alice, 18, 1 <- {UNICODE_STRING}], op=INSERT, meta=()}",
2752+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[{UNICODE_STRING} -> 2, 2, Bob, 20, 2 <- {UNICODE_STRING}], op=INSERT, meta=()}",
2753+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[{UNICODE_STRING} -> 2, 2, Bob, 20, 2 <- {UNICODE_STRING}], after=[{UNICODE_STRING} -> 2, 2, Bob, 30, 2 <- {UNICODE_STRING}], op=UPDATE, meta=()}",
2754+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`prefix` STRING,`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`suffix` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
2755+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[{UNICODE_STRING} -> 3, 3, Carol, 15, student, 3 <- {UNICODE_STRING}], op=INSERT, meta=()}",
2756+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[{UNICODE_STRING} -> 4, 4, Derrida, 25, student, 4 <- {UNICODE_STRING}], op=INSERT, meta=()}",
2757+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[{UNICODE_STRING} -> 4, 4, Derrida, 25, student, 4 <- {UNICODE_STRING}], after=[], op=DELETE, meta=()}")
2758+
.map(tpl -> tpl.replace("{UNICODE_STRING}", unicodeString))
2759+
.collect(Collectors.toList());
2760+
2761+
runGenericTransformTest(
2762+
sinkApi,
2763+
Collections.singletonList(
2764+
new TransformDef(
2765+
"\\.*.\\.*.\\.*",
2766+
String.format(
2767+
"'%s' || ' -> ' || id AS prefix, *, id || ' <- ' || '%s' AS suffix",
2768+
unicodeString, unicodeString),
2769+
null,
2770+
null,
2771+
"id",
2772+
null,
2773+
null,
2774+
null)),
2775+
expected);
2776+
outCaptor.reset();
2777+
}
2778+
}
2779+
2780+
@ParameterizedTest
2781+
@EnumSource
2782+
void testTransformFilterWithUnicodeCharacters(ValuesDataSink.SinkApi sinkApi) throws Exception {
2783+
for (String unicodeString : UNICODE_STRINGS) {
2784+
List<String> expected =
2785+
Stream.of(
2786+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
2787+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, {UNICODE_STRING}], op=INSERT, meta=()}",
2788+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, {UNICODE_STRING}], op=INSERT, meta=()}",
2789+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, {UNICODE_STRING}], after=[2, Bob, 30, {UNICODE_STRING}], op=UPDATE, meta=()}",
2790+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
2791+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, {UNICODE_STRING}], op=INSERT, meta=()}",
2792+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, {UNICODE_STRING}], op=INSERT, meta=()}",
2793+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, {UNICODE_STRING}], after=[], op=DELETE, meta=()}")
2794+
.map(tpl -> tpl.replace("{UNICODE_STRING}", unicodeString))
2795+
.collect(Collectors.toList());
2796+
2797+
runGenericTransformTest(
2798+
sinkApi,
2799+
Collections.singletonList(
2800+
new TransformDef(
2801+
"\\.*.\\.*.\\.*",
2802+
String.format("*, '%s' AS extras", unicodeString),
2803+
String.format("extras = '%s'", unicodeString),
2804+
null,
2805+
"id",
2806+
null,
2807+
null,
2808+
null)),
2809+
expected);
2810+
outCaptor.reset();
2811+
2812+
runGenericTransformTest(
2813+
sinkApi,
2814+
Collections.singletonList(
2815+
new TransformDef(
2816+
"\\.*.\\.*.\\.*",
2817+
String.format("*, '%s' AS extras", unicodeString),
2818+
String.format("extras <> '%s'", unicodeString),
2819+
null,
2820+
"id",
2821+
null,
2822+
null,
2823+
null)),
2824+
Arrays.asList(
2825+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
2826+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}"));
2827+
outCaptor.reset();
2828+
}
2829+
}
2830+
27282831
private List<Event> generateFloorCeilAndRoundEvents(TableId tableId) {
27292832
List<Event> events = new ArrayList<>();
27302833
Schema schema =

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,138 @@ void testTransformWildcardSuffixWithSchemaEvolution() throws Exception {
10911091
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3010 <- id, Beginning, 3010, 10, 10, 97, Lemon], op=INSERT, meta=()}");
10921092
}
10931093

1094+
/**
 * Sample string literals that the Unicode transform test in this class projects as extra
 * columns: one ASCII-only entry followed by entries that contain non-ASCII characters.
 */
private static final String[] UNICODE_STRINGS = {
1095+
"ascii test!?",
1096+
"大五",
1097+
"测试数据",
1098+
"ひびぴ",
1099+
"죠주쥬",
1100+
"ÀÆÉ",
1101+
"ÓÔŐÖ",
1102+
"αβγδε",
1103+
"בבקשה",
1104+
"твой",
1105+
"ภาษาไทย",
1106+
"piedzimst brīvi"
1107+
};
1108+
1109+
@Test
1110+
void testTransformWithUnicodeLiterals() throws Exception {
1111+
StringBuilder projectionExpression = new StringBuilder("\\*,");
1112+
for (int i = 0; i < UNICODE_STRINGS.length; i++) {
1113+
projectionExpression
1114+
.append('\'')
1115+
.append(UNICODE_STRINGS[i])
1116+
.append('\'')
1117+
.append(" AS col_")
1118+
.append(i)
1119+
.append(",");
1120+
}
1121+
projectionExpression.deleteCharAt(projectionExpression.length() - 1);
1122+
1123+
String pipelineJob =
1124+
String.format(
1125+
"source:\n"
1126+
+ " type: mysql\n"
1127+
+ " hostname: %s\n"
1128+
+ " port: 3306\n"
1129+
+ " username: %s\n"
1130+
+ " password: %s\n"
1131+
+ " tables: %s.TABLEALPHA\n"
1132+
+ " server-id: 5400-5404\n"
1133+
+ " server-time-zone: UTC\n"
1134+
+ "sink:\n"
1135+
+ " type: values\n"
1136+
+ "transform:\n"
1137+
+ " - source-table: %s.\\.*\n"
1138+
+ " projection: %s\n"
1139+
+ " filter: ID > 1008\n"
1140+
+ "pipeline:\n"
1141+
+ " parallelism: %d\n"
1142+
+ " schema.change.behavior: evolve",
1143+
INTER_CONTAINER_MYSQL_ALIAS,
1144+
MYSQL_TEST_USER,
1145+
MYSQL_TEST_PASSWORD,
1146+
transformTestDatabase.getDatabaseName(),
1147+
transformTestDatabase.getDatabaseName(),
1148+
projectionExpression,
1149+
parallelism);
1150+
submitPipelineJob(pipelineJob);
1151+
waitUntilJobRunning(Duration.ofSeconds(30));
1152+
LOG.info("Pipeline job is running");
1153+
1154+
validateResult(
1155+
dbNameFormatter,
1156+
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`col_0` STRING,`col_1` STRING,`col_2` STRING,`col_3` STRING,`col_4` STRING,`col_5` STRING,`col_6` STRING,`col_7` STRING,`col_8` STRING,`col_9` STRING,`col_10` STRING,`col_11` STRING}, primaryKeys=ID, options=()}",
1157+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
1158+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
1159+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");
1160+
1161+
LOG.info("Begin incremental reading stage.");
1162+
// generate binlogs
1163+
String mysqlJdbcUrl =
1164+
String.format(
1165+
"jdbc:mysql://%s:%s/%s",
1166+
MYSQL.getHost(),
1167+
MYSQL.getDatabasePort(),
1168+
transformTestDatabase.getDatabaseName());
1169+
try (Connection conn =
1170+
DriverManager.getConnection(
1171+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
1172+
Statement stat = conn.createStatement()) {
1173+
stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
1174+
stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79, 16, 'IINA');");
1175+
stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
1176+
} catch (SQLException e) {
1177+
LOG.error("Update table for CDC failed.", e);
1178+
throw e;
1179+
}
1180+
1181+
validateResult(
1182+
dbNameFormatter,
1183+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], after=[1009, 100, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=UPDATE, meta=()}",
1184+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");
1185+
1186+
LOG.info("Start schema evolution.");
1187+
try (Connection conn =
1188+
DriverManager.getConnection(
1189+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
1190+
Statement stmt = conn.createStatement()) {
1191+
1192+
// triggers AddColumnEvent
1193+
stmt.execute("ALTER TABLE TABLEALPHA ADD COLUMN CODENAME TINYINT AFTER VERSION;");
1194+
stmt.execute("ALTER TABLE TABLEALPHA ADD COLUMN FIRST VARCHAR(17) FIRST;");
1195+
stmt.execute("INSERT INTO TABLEALPHA VALUES ('First', 3008, '8', 8, 80, 17, 'Jazz');");
1196+
1197+
// triggers AlterColumnTypeEvent and RenameColumnEvent
1198+
stmt.execute("ALTER TABLE TABLEALPHA CHANGE COLUMN CODENAME CODE_NAME DOUBLE;");
1199+
1200+
// triggers RenameColumnEvent
1201+
stmt.execute("ALTER TABLE TABLEALPHA RENAME COLUMN CODE_NAME TO CODE_NAME_EX;");
1202+
stmt.execute("INSERT INTO TABLEALPHA VALUES ('1st', 3009, '9', 9, 90, 18, 'Keka');");
1203+
1204+
// triggers DropColumnEvent
1205+
stmt.execute("ALTER TABLE TABLEALPHA DROP COLUMN CODE_NAME_EX");
1206+
stmt.execute(
1207+
"INSERT INTO TABLEALPHA VALUES ('Beginning', 3010, '10', 10, 97, 'Lemon');");
1208+
} catch (SQLException e) {
1209+
LOG.error("Update table for CDC failed.", e);
1210+
throw e;
1211+
}
1212+
1213+
validateResult(
1214+
dbNameFormatter,
1215+
"AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`CODENAME` TINYINT, position=AFTER, existedColumnName=VERSION}]}",
1216+
"AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`FIRST` VARCHAR(17), position=BEFORE, existedColumnName=ID}]}",
1217+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[First, 3008, 8, 8, 80, 17, Jazz, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
1218+
"AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}",
1219+
"RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}",
1220+
"RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}",
1221+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1st, 3009, 9, 9.0, 90, 18, Keka, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
1222+
"DropColumnEvent{tableId=%s.TABLEALPHA, droppedColumnNames=[CODE_NAME_EX]}",
1223+
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[Beginning, 3010, 10, 10, 97, Lemon, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");
1224+
}
1225+
10941226
private void validateEventsWithPattern(String... patterns) throws Exception {
10951227
for (String pattern : patterns) {
10961228
waitUntilSpecificEventWithPattern(

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.calcite.sql.SqlNumericLiteral;
3535
import org.apache.calcite.sql.fun.SqlCase;
3636
import org.apache.calcite.sql.type.SqlTypeName;
37+
import org.apache.calcite.util.NlsString;
3738
import org.codehaus.commons.compiler.CompileException;
3839
import org.codehaus.commons.compiler.Location;
3940
import org.codehaus.janino.ExpressionEvaluator;
@@ -158,10 +159,10 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
158159
if (sqlLiteral.getValue() == null) {
159160
return new Java.NullLiteral(Location.NOWHERE);
160161
}
161-
String value = sqlLiteral.getValue().toString();
162+
Object value = sqlLiteral.getValue();
162163
if (sqlLiteral instanceof SqlCharStringLiteral) {
163164
// Double quotation marks represent strings in Janino.
164-
value = "\"" + value.substring(1, value.length() - 1) + "\"";
165+
value = "\"" + sqlLiteral.getValueAs(NlsString.class).getValue() + "\"";
165166
} else if (sqlLiteral instanceof SqlNumericLiteral) {
166167
if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
167168
long longValue = sqlLiteral.longValue(true);
@@ -173,7 +174,7 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
173174
if (SQL_TYPE_NAME_IGNORE.contains(sqlLiteral.getTypeName())) {
174175
value = "\"" + value + "\"";
175176
}
176-
return new Java.AmbiguousName(Location.NOWHERE, new String[] {value});
177+
return new Java.AmbiguousName(Location.NOWHERE, new String[] {value.toString()});
177178
}
178179

179180
private static Java.Rvalue translateSqlBasicCall(
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
################################################################################
15+
16+
calcite.default.charset = utf8

0 commit comments

Comments
 (0)