diff --git a/docs/content.zh/docs/connectors/flink-sources/overview.md b/docs/content.zh/docs/connectors/flink-sources/overview.md
index bdc1a23286a..1c6661eb2ac 100644
--- a/docs/content.zh/docs/connectors/flink-sources/overview.md
+++ b/docs/content.zh/docs/connectors/flink-sources/overview.md
@@ -82,7 +82,7 @@ The following table shows the current features of the connector:
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [postgres-cdc]({{< ref "docs/connectors/flink-sources/postgres-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [sqlserver-cdc]({{< ref "docs/connectors/flink-sources/sqlserver-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
-| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | ❌ | ❌ | ❌ | ❌ |
+| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [tidb-cdc]({{< ref "docs/connectors/flink-sources/tidb-cdc" >}}) | ✅ | ❌ | ✅ | ❌ |
| [db2-cdc]({{< ref "docs/connectors/flink-sources/db2-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [vitess-cdc]({{< ref "docs/connectors/flink-sources/vitess-cdc" >}}) | ✅ | ❌ | ✅ | ❌ |
diff --git a/docs/content/docs/connectors/flink-sources/overview.md b/docs/content/docs/connectors/flink-sources/overview.md
index c275d54360c..4369e9c6391 100644
--- a/docs/content/docs/connectors/flink-sources/overview.md
+++ b/docs/content/docs/connectors/flink-sources/overview.md
@@ -75,14 +75,14 @@ The following table shows the version mapping between Flink® CDC Con
The following table shows the current features of the connector:
-| Connector | No-lock Read | Parallel Read | Exactly-once Read | Incremental Snapshot Read |
-|---------------------------------------------------------------------------------------|--------------|---------------|-------------------|---------------------------|
+| Connector | No-lock Read | Parallel Read | Exactly-once Read | Incremental Snapshot Read |
+|----------------------------------------------------------------------------|--------------|---------------|-------------------|---------------------------|
| [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [postgres-cdc]({{< ref "docs/connectors/flink-sources/postgres-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [sqlserver-cdc]({{< ref "docs/connectors/flink-sources/sqlserver-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
-| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | ❌ | ❌ | ❌ | ❌ |
+| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [tidb-cdc]({{< ref "docs/connectors/flink-sources/tidb-cdc" >}}) | ✅ | ❌ | ✅ | ❌ |
| [db2-cdc]({{< ref "docs/connectors/flink-sources/db2-cdc" >}}) | ✅ | ✅ | ✅ | ✅ |
| [vitess-cdc]({{< ref "docs/connectors/flink-sources/vitess-cdc" >}}) | ✅ | ❌ | ✅ | ❌ |
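The feature matrix above now reports no-lock, parallel, exactly-once, and incremental snapshot reads for `oceanbase-cdc`. The integration tests added in this patch exercise those paths by reading OceanBase's MySQL-compatible binlog service through the `mysql-cdc` connector; the DDL below is a minimal sketch distilled from that test code, with hostname, port, credentials, and database/table names being placeholder assumptions rather than recommended values.

```sql
-- Minimal sketch of an incremental-snapshot source on OceanBase, mirroring the test DDL in this patch.
-- Endpoint, credentials, and names are placeholders (the tests default to port 2883 and user root@test).
CREATE TABLE orders_source (
    id BIGINT NOT NULL,
    seller_id STRING,
    order_id STRING,
    buyer_id STRING,
    create_time TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'scan.incremental.snapshot.enabled' = 'true',
    'hostname' = 'localhost',
    'port' = '2883',
    'username' = 'root@test',
    'password' = '123456',
    'database-name' = 'test_db',
    'table-name' = 'orders',
    'server-time-zone' = 'Asia/Shanghai',
    'server-id' = '5400-5404'
);
```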
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
index 96366a9af91..2453d10df02 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
@@ -169,6 +169,13 @@ limitations under the License.
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>mysql</artifactId>
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
new file mode 100644
index 00000000000..2e43a315c1d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+/** Test supporting different column charsets for OceanBase. */
+public class OceanBaseCharsetITCase extends OceanBaseSourceTestBase {
+
+ private static final String DDL_FILE = "charset_test";
+ private static final String DATABASE_NAME = "cdc_c_" + getRandomSuffix();
+
+ private final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ private final StreamTableEnvironment tEnv =
+ StreamTableEnvironment.create(
+ env, EnvironmentSettings.newInstance().inStreamingMode().build());
+
+ @BeforeAll
+ public static void beforeClass() throws InterruptedException {
+ initializeOceanBaseTables(
+ DDL_FILE,
+ DATABASE_NAME,
+ s -> // see:
+ // https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000002017544
+ !StringUtils.isNullOrWhitespaceOnly(s)
+ && (s.contains("utf8_test")
+ || s.contains("latin1_test")
+ || s.contains("gbk_test")
+ || s.contains("big5_test")
+ || s.contains("ascii_test")
+ || s.contains("sjis_test")));
+ }
+
+ @AfterAll
+ public static void after() {
+ dropDatabase(DATABASE_NAME);
+ }
+
+ @BeforeEach
+ public void before() {
+ TestValuesTableFactory.clearAllData();
+ env.setParallelism(4);
+ env.enableCheckpointing(200);
+ }
+
+ public static Stream<Arguments> parameters() {
+ return Stream.of(
+ Arguments.of(
+ "utf8_test",
+ new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
+ new String[] {
+ "-D[1, 测试数据]",
+ "-D[2, Craig Marshall]",
+ "-D[3, 另一个测试数据]",
+ "+I[11, 测试数据]",
+ "+I[12, Craig Marshall]",
+ "+I[13, 另一个测试数据]"
+ }),
+ Arguments.of(
+ "ascii_test",
+ new String[] {
+ "+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"
+ },
+ new String[] {
+ "-D[1, ascii test!?]",
+ "-D[2, Craig Marshall]",
+ "-D[3, {test}]",
+ "+I[11, ascii test!?]",
+ "+I[12, Craig Marshall]",
+ "+I[13, {test}]"
+ }),
+ Arguments.of(
+ "gbk_test",
+ new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
+ new String[] {
+ "-D[1, 测试数据]",
+ "-D[2, Craig Marshall]",
+ "-D[3, 另一个测试数据]",
+ "+I[11, 测试数据]",
+ "+I[12, Craig Marshall]",
+ "+I[13, 另一个测试数据]"
+ }),
+ Arguments.of(
+ "latin1_test",
+ new String[] {"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"},
+ new String[] {
+ "-D[1, ÀÆÉ]",
+ "-D[2, Craig Marshall]",
+ "-D[3, Üæû]",
+ "+I[11, ÀÆÉ]",
+ "+I[12, Craig Marshall]",
+ "+I[13, Üæû]"
+ }),
+ Arguments.of(
+ "big5_test",
+ new String[] {"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"},
+ new String[] {
+ "-D[1, 大五]",
+ "-D[2, Craig Marshall]",
+ "-D[3, 丹店]",
+ "+I[11, 大五]",
+ "+I[12, Craig Marshall]",
+ "+I[13, 丹店]"
+ }),
+ Arguments.of(
+ "sjis_test",
+ new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"},
+ new String[] {
+ "-D[1, ひびぴ]",
+ "-D[2, Craig Marshall]",
+ "-D[3, フブプ]",
+ "+I[11, ひびぴ]",
+ "+I[12, Craig Marshall]",
+ "+I[13, フブプ]"
+ }));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testCharset(String testName, String[] snapshotExpected, String[] binlogExpected)
+ throws Exception {
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE %s (\n"
+ + " table_id BIGINT,\n"
+ + " table_name STRING,\n"
+ + " primary key(table_id) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' = '%s',"
+ + " 'server-id' = '%s',"
+ + " 'server-time-zone' = 'Asia/Shanghai',"
+ + " 'jdbc.properties.connectTimeout' = '6000000000',"
+ + " 'jdbc.properties.socketTimeout' = '6000000000',"
+ + " 'jdbc.properties.autoReconnect' = 'true',"
+ + " 'jdbc.properties.failOverReadOnly' = 'false',"
+ + " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ + ")",
+ testName,
+ getHost(),
+ getPort(),
+ USER_NAME,
+ PASSWORD,
+ DATABASE_NAME,
+ testName,
+ true,
+ getServerId(),
+ 4);
+ tEnv.executeSql(sourceDDL);
+ // async submit job
+ TableResult result =
+ tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", testName));
+
+ // test snapshot phase
+ CloseableIterator<Row> iterator = result.collect();
+ waitForSnapshotStarted(iterator);
+ assertEqualsInAnyOrder(
+ Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length));
+
+ // test binlog phase
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "/*TDDL:FORBID_EXECUTE_DML_ALL=FALSE*/UPDATE %s.%s SET table_id = table_id + 10;",
+ DATABASE_NAME, testName));
+ }
+ assertEqualsInAnyOrder(
+ Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length));
+ result.getJobClient().get().cancel().get();
+
+ // Sleep to avoid the issue: The last packet successfully received from the server was 35
+ // milliseconds ago.
+ Thread.sleep(1_000);
+ }
+
+ private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
+ while (!iterator.hasNext()) {
+ Thread.sleep(100);
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
new file mode 100644
index 00000000000..e6e8251191e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.StringUtils;
+
+import io.debezium.jdbc.JdbcConnection;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.lang.String.format;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
+/** Failover IT tests for OceanBase. */
+@Timeout(value = 180, unit = TimeUnit.SECONDS)
+@Disabled(
+ "The current version of the Oceanbase binlog service causes failover test cases to fail. Disable the test and wait for the binlog version to be updated.")
+public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase {
+
+ private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
+ private static final String DDL_FILE = "oceanbase_ddl_test";
+ protected static final int DEFAULT_PARALLELISM = 4;
+ private String testDatabase = "customer_" + getRandomSuffix();
+
+ private final List<String> firstPartBinlogEvents =
+ Arrays.asList(
+ "-U[103, user_3, Shanghai, 123567891234]",
+ "+U[103, user_3, Hangzhou, 123567891234]",
+ "-D[102, user_2, Shanghai, 123567891234]",
+ "+I[102, user_2, Shanghai, 123567891234]",
+ "-U[103, user_3, Hangzhou, 123567891234]",
+ "+U[103, user_3, Shanghai, 123567891234]");
+
+ private final List<String> secondPartBinlogEvents =
+ Arrays.asList(
+ "-U[1010, user_11, Shanghai, 123567891234]",
+ "+I[2001, user_22, Shanghai, 123567891234]",
+ "+I[2002, user_23, Shanghai, 123567891234]",
+ "+I[2003, user_24, Shanghai, 123567891234]",
+ "+U[1010, user_11, Hangzhou, 123567891234]");
+
+ public static Stream<Arguments> parameters() {
+ return Stream.of(
+ Arguments.of("customers", null, "false"),
+ Arguments.of("customers", "id", "true"),
+ Arguments.of("customers_no_pk", "id", "true"));
+ }
+
+ @RegisterExtension
+ public final ExternalResourceProxy<MiniClusterWithClientResource> miniClusterResource =
+ new ExternalResourceProxy<>(
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build()));
+
+ @BeforeEach
+ public void setup() throws InterruptedException {
+ testDatabase = "customer_" + getRandomSuffix();
+ initializeOceanBaseTables(
+ DDL_FILE,
+ testDatabase,
+ s -> !StringUtils.isNullOrWhitespaceOnly(s) && (s.contains("customers")));
+ }
+
+ @AfterEach
+ public void clean() {
+ dropDatabase(testDatabase);
+ }
+
+ // Failover tests
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testTaskManagerFailoverInSnapshotPhase(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
+ testMySqlParallelSource(
+ FailoverType.TM,
+ FailoverPhase.SNAPSHOT,
+ new String[] {tableName, "customers_1"},
+ tableName,
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testTaskManagerFailoverInBinlogPhase(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
+ testMySqlParallelSource(
+ FailoverType.TM,
+ FailoverPhase.BINLOG,
+ new String[] {tableName, "customers_1"},
+ tableName,
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testTaskManagerFailoverFromLatestOffset(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
+ testMySqlParallelSource(
+ DEFAULT_PARALLELISM,
+ "latest-offset",
+ FailoverType.TM,
+ FailoverPhase.BINLOG,
+ new String[] {tableName, "customers_1"},
+ RestartStrategies.fixedDelayRestart(1, 0),
+ tableName,
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testJobManagerFailoverInSnapshotPhase(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
+ testMySqlParallelSource(
+ FailoverType.JM,
+ FailoverPhase.SNAPSHOT,
+ new String[] {tableName, "customers_1"},
+ tableName,
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testJobManagerFailoverInBinlogPhase(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
+ testMySqlParallelSource(
+ FailoverType.JM,
+ FailoverPhase.BINLOG,
+ new String[] {tableName, "customers_1"},
+ tableName,
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testJobManagerFailoverFromLatestOffset(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
+ testMySqlParallelSource(
+ DEFAULT_PARALLELISM,
+ "latest-offset",
+ FailoverType.JM,
+ FailoverPhase.BINLOG,
+ new String[] {tableName, "customers_1"},
+ RestartStrategies.fixedDelayRestart(1, 0),
+ tableName,
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testTaskManagerFailoverSingleParallelism(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
+ testMySqlParallelSource(
+ 1,
+ FailoverType.TM,
+ FailoverPhase.SNAPSHOT,
+ new String[] {tableName},
+ tableName,
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters")
+ public void testJobManagerFailoverSingleParallelism(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
+ testMySqlParallelSource(
+ 1,
+ FailoverType.JM,
+ FailoverPhase.SNAPSHOT,
+ new String[] {tableName},
+ tableName,
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
+ }
+
+ private void testMySqlParallelSource(
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables,
+ String tableName,
+ String chunkColumnName,
+ Map<String, String> otherOptions)
+ throws Exception {
+ testMySqlParallelSource(
+ DEFAULT_PARALLELISM,
+ failoverType,
+ failoverPhase,
+ captureCustomerTables,
+ tableName,
+ chunkColumnName,
+ otherOptions);
+ }
+
+ private void testMySqlParallelSource(
+ int parallelism,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables,
+ String tableName,
+ String chunkColumnName,
+ Map<String, String> otherOptions)
+ throws Exception {
+ testMySqlParallelSource(
+ parallelism,
+ DEFAULT_SCAN_STARTUP_MODE,
+ failoverType,
+ failoverPhase,
+ captureCustomerTables,
+ RestartStrategies.fixedDelayRestart(1, 0),
+ tableName,
+ chunkColumnName,
+ otherOptions);
+ }
+
+ private void testMySqlParallelSource(
+ int parallelism,
+ String scanStartupMode,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables,
+ RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
+ String tableName,
+ String chunkColumnName,
+ Map<String, String> otherOptions)
+ throws Exception {
+ testMySqlParallelSource(
+ parallelism,
+ scanStartupMode,
+ failoverType,
+ failoverPhase,
+ captureCustomerTables,
+ restartStrategyConfiguration,
+ false,
+ tableName,
+ chunkColumnName,
+ otherOptions);
+ }
+
+ private void testMySqlParallelSource(
+ int parallelism,
+ String scanStartupMode,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables,
+ RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
+ boolean skipSnapshotBackfill,
+ String tableName,
+ String chunkColumnName,
+ Map<String, String> otherOptions)
+ throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(200L);
+ env.setRestartStrategy(restartStrategyConfiguration);
+ String sourceDDL =
+ format(
+ "CREATE TABLE customers ("
+ + " id BIGINT NOT NULL,"
+ + " name STRING,"
+ + " address STRING,"
+ + " phone_number STRING"
+ + ("customers_no_pk".equals(tableName)
+ ? ""
+ : ", primary key (id) not enforced")
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'scan.incremental.snapshot.enabled' = 'true',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.startup.mode' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' = '100',"
+ + " 'scan.incremental.snapshot.backfill.skip' = '%s',"
+ + " 'server-time-zone' = 'Asia/Shanghai',"
+ + " 'server-id' = '%s'"
+ + " %s"
+ + " %s"
+ + ")",
+ getHost(),
+ getPort(),
+ getUserName(),
+ getPassword(),
+ testDatabase,
+ getTableNameRegex(captureCustomerTables),
+ scanStartupMode,
+ skipSnapshotBackfill,
+ getServerId(),
+ chunkColumnName == null
+ ? ""
+ : String.format(
+ ", 'scan.incremental.snapshot.chunk.key-column' = '%s'",
+ chunkColumnName),
+ otherOptions.isEmpty()
+ ? ""
+ : ","
+ + otherOptions.entrySet().stream()
+ .map(
+ e ->
+ String.format(
+ "'%s'='%s'",
+ e.getKey(), e.getValue()))
+ .collect(Collectors.joining(",")));
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from customers");
+
+ // first step: check the snapshot data
+ if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
+ checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
+ }
+
+ // second step: check the binlog data
+ checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables);
+
+ sleepMs(3000);
+ tableResult.getJobClient().get().cancel().get();
+ }
+
+ private void checkSnapshotData(
+ TableResult tableResult,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables)
+ throws Exception {
+ String[] snapshotForSingleTable =
+ new String[] {
+ "+I[101, user_1, Shanghai, 123567891234]",
+ "+I[102, user_2, Shanghai, 123567891234]",
+ "+I[103, user_3, Shanghai, 123567891234]",
+ "+I[109, user_4, Shanghai, 123567891234]",
+ "+I[110, user_5, Shanghai, 123567891234]",
+ "+I[111, user_6, Shanghai, 123567891234]",
+ "+I[118, user_7, Shanghai, 123567891234]",
+ "+I[121, user_8, Shanghai, 123567891234]",
+ "+I[123, user_9, Shanghai, 123567891234]",
+ "+I[1009, user_10, Shanghai, 123567891234]",
+ "+I[1010, user_11, Shanghai, 123567891234]",
+ "+I[1011, user_12, Shanghai, 123567891234]",
+ "+I[1012, user_13, Shanghai, 123567891234]",
+ "+I[1013, user_14, Shanghai, 123567891234]",
+ "+I[1014, user_15, Shanghai, 123567891234]",
+ "+I[1015, user_16, Shanghai, 123567891234]",
+ "+I[1016, user_17, Shanghai, 123567891234]",
+ "+I[1017, user_18, Shanghai, 123567891234]",
+ "+I[1018, user_19, Shanghai, 123567891234]",
+ "+I[1019, user_20, Shanghai, 123567891234]",
+ "+I[2000, user_21, Shanghai, 123567891234]"
+ };
+
+ List<String> expectedSnapshotData = new ArrayList<>();
+ for (int i = 0; i < captureCustomerTables.length; i++) {
+ expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
+ }
+
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+
+ // trigger failover after some snapshot splits read finished
+ if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
+ triggerFailover(
+ failoverType,
+ jobId,
+ miniClusterResource.get().getMiniCluster(),
+ () -> sleepMs(100));
+ }
+
+ assertEqualsInAnyOrder(
+ expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+ }
+
+ private void checkBinlogData(
+ TableResult tableResult,
+ FailoverType failoverType,
+ FailoverPhase failoverPhase,
+ String[] captureCustomerTables)
+ throws Exception {
+ waitUntilJobRunning(tableResult);
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+
+ for (String tableId : captureCustomerTables) {
+ makeFirstPartBinlogEvents(getConnection(), testDatabase + '.' + tableId);
+ }
+
+ // wait for the binlog reading
+ Thread.sleep(3_000L);
+
+ if (failoverPhase == FailoverPhase.BINLOG) {
+ triggerFailover(
+ failoverType,
+ jobId,
+ miniClusterResource.get().getMiniCluster(),
+ () -> sleepMs(200));
+ waitUntilJobRunning(tableResult);
+ }
+ for (String tableId : captureCustomerTables) {
+ makeSecondPartBinlogEvents(getConnection(), testDatabase + '.' + tableId);
+ }
+
+ List<String> expectedBinlogData = new ArrayList<>();
+ for (int i = 0; i < captureCustomerTables.length; i++) {
+ expectedBinlogData.addAll(firstPartBinlogEvents);
+ expectedBinlogData.addAll(secondPartBinlogEvents);
+ }
+ sleepMs(3_000);
+ assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
+ Assertions.assertThat(hasNextData(iterator)).isFalse();
+ }
+
+ private void waitUntilJobRunning(TableResult tableResult)
+ throws InterruptedException, ExecutionException {
+ do {
+ Thread.sleep(5000L);
+ } while (tableResult.getJobClient().get().getJobStatus().get() != RUNNING);
+ }
+
+ private boolean hasNextData(final CloseableIterator<?> iterator)
+ throws InterruptedException, ExecutionException {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ FutureTask<Boolean> future = new FutureTask<>(iterator::hasNext);
+ executor.execute(future);
+ return future.get(3, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ return false;
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ /**
+ * Make some changes on the specified customer table. The resulting changelog strings can be
+ * accessed via {@link #firstPartBinlogEvents}.
+ */
+ private void makeFirstPartBinlogEvents(JdbcConnection connection, String tableId)
+ throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+
+ // make binlog events for the first split
+ connection.execute(
+ "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
+ "DELETE FROM " + tableId + " where id = 102",
+ "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
+ "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
+ connection.commit();
+ } finally {
+ connection.close();
+ }
+ }
+
+ /**
+ * Make some other changes on the specified customer table. The resulting changelog strings can
+ * be accessed via {@link #secondPartBinlogEvents}.
+ */
+ private void makeSecondPartBinlogEvents(JdbcConnection connection, String tableId)
+ throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+
+ // make binlog events for split-1
+ connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010");
+ connection.commit();
+
+ // make binlog events for the last split
+ connection.execute(
+ "INSERT INTO "
+ + tableId
+ + " VALUES(2001, 'user_22','Shanghai','123567891234'),"
+ + " (2002, 'user_23','Shanghai','123567891234'),"
+ + "(2003, 'user_24','Shanghai','123567891234')");
+ connection.commit();
+ } finally {
+ connection.close();
+ }
+ }
+
+ private void sleepMs(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException ignored) {
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
new file mode 100644
index 00000000000..51b4a6d69ec
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.lang.String.format;
+
+/** OceanBase CDC source connector integration test. */
+public class OceanBaseSourceITCase extends OceanBaseSourceTestBase {
+ private static final String DDL_FILE = "oceanbase_ddl_test";
+ private static final String DATABASE_NAME = "cdc_s_" + getRandomSuffix();
+
+ @BeforeAll
+ public static void beforeClass() throws InterruptedException {
+ initializeOceanBaseTables(DDL_FILE, DATABASE_NAME, null);
+ }
+
+ @AfterAll
+ public static void afterClass() {
+ dropDatabase(DATABASE_NAME);
+ }
+
+ @Test
+ public void testSingleKey() throws Exception {
+ int parallelism = 1;
+ String[] captureCustomerTables = new String[] {"orders"};
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(200L);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ String sourceDDL =
+ format(
+ "CREATE TABLE orders_source ("
+ + " id BIGINT NOT NULL,"
+ + " seller_id STRING,"
+ + " order_id STRING,"
+ + " buyer_id STRING,"
+ + " create_time TIMESTAMP,"
+ + " primary key (id) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'scan.incremental.snapshot.enabled' = 'true',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' = '100',"
+ + " 'server-time-zone' = 'Asia/Shanghai',"
+ + " 'server-id' = '%s'"
+ + ")",
+ getHost(),
+ getPort(),
+ USER_NAME,
+ PASSWORD,
+ DATABASE_NAME,
+ getTableNameRegex(captureCustomerTables),
+ getServerId());
+
+ // first step: check the snapshot data
+ String[] snapshotForSingleTable =
+ new String[] {
+ "+I[1, 1001, 1, 102, 2022-01-16T00:00]",
+ "+I[2, 1002, 2, 105, 2022-01-16T00:00]",
+ "+I[3, 1004, 3, 109, 2022-01-16T00:00]",
+ "+I[4, 1002, 2, 106, 2022-01-16T00:00]",
+ "+I[5, 1003, 1, 107, 2022-01-16T00:00]",
+ };
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from orders_source");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ List<String> expectedSnapshotData = new ArrayList<>();
+ for (int i = 0; i < captureCustomerTables.length; i++) {
+ expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
+ }
+
+ List<String> realSnapshotData = fetchRows(iterator, expectedSnapshotData.size());
+ assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
+
+ // second step: check the sink data
+ tEnv.executeSql(
+ "CREATE TABLE sink ("
+ + " id BIGINT NOT NULL,"
+ + " seller_id STRING,"
+ + " order_id STRING,"
+ + " buyer_id STRING,"
+ + " create_time TIMESTAMP,"
+ + " primary key (id) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ")");
+ tableResult = tEnv.executeSql("insert into sink select * from orders_source");
+
+ waitForSinkSize("sink", realSnapshotData.size());
+ assertEqualsInAnyOrder(
+ expectedSnapshotData, TestValuesTableFactory.getRawResultsAsStrings("sink"));
+
+ // third step: check dml events
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("use " + DATABASE_NAME);
+ statement.execute("INSERT INTO orders VALUES (6, 1006,1006, 1006,'2022-01-17');");
+ statement.execute("INSERT INTO orders VALUES (7,1007, 1007,1007, '2022-01-17');");
+ statement.execute("UPDATE orders SET seller_id= 9999, order_id=9999 WHERE id=6;");
+ statement.execute("UPDATE orders SET seller_id= 9999, order_id=9999 WHERE id=7;");
+ statement.execute("DELETE FROM orders WHERE id=7;");
+ }
+
+ String[] expectedBinlog =
+ new String[] {
+ "+I[6, 1006, 1006, 1006, 2022-01-17T00:00]",
+ "+I[7, 1007, 1007, 1007, 2022-01-17T00:00]",
+ "-U[6, 1006, 1006, 1006, 2022-01-17T00:00]",
+ "+U[6, 9999, 9999, 1006, 2022-01-17T00:00]",
+ "-U[7, 1007, 1007, 1007, 2022-01-17T00:00]",
+ "+U[7, 9999, 9999, 1007, 2022-01-17T00:00]",
+ "-D[7, 9999, 9999, 1007, 2022-01-17T00:00]"
+ };
+ List<String> expectedBinlogData = new ArrayList<>();
+ for (int i = 0; i < captureCustomerTables.length; i++) {
+ expectedBinlogData.addAll(Arrays.asList(expectedBinlog));
+ }
+ List<String> realBinlog = fetchRows(iterator, expectedBinlog.length);
+ assertEqualsInOrder(expectedBinlogData, realBinlog);
+ Thread.sleep(3_000);
+ tableResult.getJobClient().get().cancel().get();
+ }
+
+ @Test
+ public void testFullTypesDdl() throws Exception {
+ int parallelism = 1;
+ String[] captureCustomerTables = new String[] {"oceanbase_full_types"};
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(200L);
+ String sourceDDL =
+ format(
+ "CREATE TABLE oceanbase_full_types (\n"
+ + " `id` INT NOT NULL,\n"
+ + " tiny_c TINYINT,\n"
+ + " tiny_un_c SMALLINT ,\n"
+ + " small_c SMALLINT,\n"
+ + " small_un_c INT,\n"
+ + " medium_c INT,\n"
+ + " medium_un_c INT,\n"
+ + " int_c INT ,\n"
+ + " int_un_c BIGINT,\n"
+ + " int11_c BIGINT,\n"
+ + " big_c BIGINT,\n"
+ + " big_un_c DECIMAL(20, 0),\n"
+ + " varchar_c VARCHAR(255),\n"
+ + " char_c CHAR(3),\n"
+ + " real_c FLOAT,\n"
+ + " float_c FLOAT,\n"
+ + " double_c DOUBLE,\n"
+ + " decimal_c DECIMAL(8, 4),\n"
+ + " numeric_c DECIMAL(6, 0),\n"
+ + " big_decimal_c STRING,\n"
+ + " bit1_c BOOLEAN,\n"
+ + " tiny1_c BOOLEAN,\n"
+ + " boolean_c BOOLEAN,\n"
+ + " date_c DATE,\n"
+ + " time_c TIME(0),\n"
+ + " datetime3_c TIMESTAMP(3),\n"
+ + " datetime6_c TIMESTAMP(6),\n"
+ + " timestamp_c TIMESTAMP(0),\n"
+ + " file_uuid BYTES,\n"
+ + " bit_c BINARY(8),\n"
+ + " text_c STRING,\n"
+ + " tiny_blob_c BYTES,\n"
+ + " blob_c BYTES,\n"
+ + " medium_blob_c BYTES,\n"
+ + " long_blob_c BYTES,\n"
+ + " year_c INT,\n"
+ + " enum_c STRING,\n"
+ + " set_c ARRAY,\n"
+ + " json_c STRING,\n"
+ + " point_c STRING,\n"
+ + " geometry_c STRING,\n"
+ + " linestring_c STRING,\n"
+ + " polygon_c STRING,\n"
+ + " multipoint_c STRING,\n"
+ + " multiline_c STRING,\n"
+ + " multipolygon_c STRING,\n"
+ + " geometrycollection_c STRING,\n"
+ + " primary key (`id`) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'scan.incremental.snapshot.enabled' = 'true',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' = '100',"
+ + " 'server-time-zone' = 'Asia/Shanghai',"
+ + " 'server-id' = '%s'"
+ + ")",
+ getHost(),
+ getPort(),
+ USER_NAME,
+ PASSWORD,
+ DATABASE_NAME,
+ getTableNameRegex(captureCustomerTables),
+ getServerId());
+ tEnv.executeSql(sourceDDL);
+
+ TableResult tableResult = tEnv.executeSql("select * from oceanbase_full_types");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ List<String> realSnapshotData = fetchRows(iterator, 1);
+ String[] expectedSnapshotData =
+ new String[] {
+ "+I[1, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, 9223372036854775807, 18446744073709551615, Hello World, abc, 123.102, 123.102, 404.4443, 123.4567, 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, [101, 26, -19, 8, 57, 15, 72, -109, -78, -15, 54, -110, 62, 123, 116, 0], [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, b], {\"key1\": \"value1\", \"key2\": \"value2\", \"num1\": 16708304.0, \"num2\": 16708305}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}]"
+ };
+ assertEqualsInAnyOrder(Arrays.asList(expectedSnapshotData), realSnapshotData);
+ Thread.sleep(3_000);
+ tableResult.getJobClient().get().cancel().get();
+ }
+
+ @Test
+ public void testMultiKeys() throws Exception {
+ int parallelism = 1;
+ String[] captureCustomerTables = new String[] {"orders_with_multi_pks"};
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(200L);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+ String sourceDDL =
+ format(
+ "CREATE TABLE orders_with_multi_pks ("
+ + " id BIGINT NOT NULL,"
+ + " seller_id STRING,"
+ + " order_id STRING,"
+ + " buyer_id STRING,"
+ + " create_time TIMESTAMP,"
+ + " primary key (id,order_id) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mysql-cdc',"
+ + " 'scan.incremental.snapshot.enabled' = 'true',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' = '100',"
+ + " 'server-time-zone' = 'Asia/Shanghai',"
+ + " 'server-id' = '%s'"
+ + ")",
+ getHost(),
+ getPort(),
+ USER_NAME,
+ PASSWORD,
+ DATABASE_NAME,
+ getTableNameRegex(captureCustomerTables),
+ getServerId());
+
+ // first step: check the snapshot data
+ String[] snapshotForSingleTable =
+ new String[] {
+ "+I[1, 1001, 1, 102, 2022-01-16T00:00]",
+ "+I[2, 1002, 2, 105, 2022-01-16T00:00]",
+ "+I[3, 1004, 3, 109, 2022-01-16T00:00]",
+ "+I[4, 1002, 2, 106, 2022-01-16T00:00]",
+ "+I[5, 1003, 1, 107, 2022-01-16T00:00]",
+ };
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult = tEnv.executeSql("select * from orders_with_multi_pks");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ List<String> expectedSnapshotData = new ArrayList<>();
+ for (int i = 0; i < captureCustomerTables.length; i++) {
+ expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
+ }
+
+ List<String> realSnapshotData = fetchRows(iterator, expectedSnapshotData.size());
+ assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
+
+ // second step: check the sink data
+ tEnv.executeSql(
+ "CREATE TABLE multi_key_sink ("
+ + " id BIGINT NOT NULL,"
+ + " seller_id STRING,"
+ + " order_id STRING,"
+ + " buyer_id STRING,"
+ + " create_time TIMESTAMP,"
+ + " primary key (id,order_id) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ")");
+
+ tEnv.executeSql("insert into multi_key_sink select * from orders_with_multi_pks");
+
+ waitForSinkSize("multi_key_sink", realSnapshotData.size());
+ assertEqualsInAnyOrder(
+ expectedSnapshotData,
+ TestValuesTableFactory.getRawResultsAsStrings("multi_key_sink"));
+
+ // third step: check dml events
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("use " + DATABASE_NAME);
+ statement.execute(
+ "INSERT INTO orders_with_multi_pks VALUES (6, 1006,1006, 1006,'2022-01-17');");
+ statement.execute(
+ "INSERT INTO orders_with_multi_pks VALUES (7,1007, 1007,1007, '2022-01-17');");
+ statement.execute(
+ "UPDATE orders_with_multi_pks SET seller_id= 9999, order_id=9999 WHERE id=6;");
+ statement.execute(
+ "UPDATE orders_with_multi_pks SET seller_id= 9999, order_id=9999 WHERE id=7;");
+ statement.execute("DELETE FROM orders_with_multi_pks WHERE id=7;");
+ }
+
+ String[] expectedBinlog =
+ new String[] {
+ "+I[6, 1006, 1006, 1006, 2022-01-17T00:00]",
+ "+I[7, 1007, 1007, 1007, 2022-01-17T00:00]",
+ "-D[6, 1006, 1006, 1006, 2022-01-17T00:00]",
+ "+I[6, 9999, 9999, 1006, 2022-01-17T00:00]",
+ "-D[7, 1007, 1007, 1007, 2022-01-17T00:00]",
+ "+I[7, 9999, 9999, 1007, 2022-01-17T00:00]",
+ "-D[7, 9999, 9999, 1007, 2022-01-17T00:00]"
+ };
+ List<String> realBinlog = fetchRows(iterator, expectedBinlog.length);
+ assertEqualsInAnyOrder(Arrays.asList(expectedBinlog), realBinlog);
+ Thread.sleep(3_000);
+ tableResult.getJobClient().get().cancel().get();
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
new file mode 100644
index 00000000000..4a93da7e1ec
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.oceanbase;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import io.debezium.connector.mysql.MySqlConnection;
+import org.apache.commons.lang3.StringUtils;
+import org.assertj.core.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Base class for OceanBase database tests. */
+@Testcontainers
+public abstract class OceanBaseSourceTestBase extends AbstractTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OceanBaseSourceTestBase.class);
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+
+ protected static final Integer INNER_PORT = 2883;
+ protected static final String USER_NAME = "root@test";
+ protected static final String PASSWORD = "123456";
+ protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(5);
+
+ public static final Network NETWORK = Network.newNetwork();
+
+ @SuppressWarnings("resource")
+ @Container
+ public static final GenericContainer<?> OB_BINLOG_CONTAINER =
+ new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
+ .withNetwork(NETWORK)
+ .withStartupTimeout(WAITING_TIMEOUT)
+ .withExposedPorts(2881, 2883)
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .waitingFor(
+ new LogMessageWaitStrategy()
+ .withRegEx(".*OBBinlog is ready!.*")
+ .withTimes(1)
+ .withStartupTimeout(Duration.ofMinutes(6)));
+
+ protected static String getHost() {
+ return OB_BINLOG_CONTAINER.getHost();
+ }
+
+ protected static int getPort() {
+ return OB_BINLOG_CONTAINER.getMappedPort(INNER_PORT);
+ }
+
+ protected static String getUserName() {
+ return USER_NAME;
+ }
+
+ protected static String getPassword() {
+ return PASSWORD;
+ }
+
+ protected static String getJdbcUrl() {
+ return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
+ }
+
+ protected static Connection getJdbcConnection() throws SQLException {
+ String jdbcUrl = getJdbcUrl();
+ LOG.info("jdbcUrl is: {}", jdbcUrl);
+ return DriverManager.getConnection(jdbcUrl, USER_NAME, PASSWORD);
+ }
+
+ /** Initialize the test database and tables with ddl/{ddlName}.sql for testing. */
+ protected static void initializeOceanBaseTables(
+ String ddlName, String dbName, Function<String, Boolean> filter)
+ throws InterruptedException {
+ final String ddlFile = String.format("ddl/%s.sql", ddlName);
+ final URL ddlTestFile = OceanBaseSourceTestBase.class.getClassLoader().getResource(ddlFile);
+ Assertions.assertThat(ddlTestFile).withFailMessage("Cannot locate " + ddlFile).isNotNull();
+ // need to sleep 1s, make sure the jdbc connection can be created
+ Thread.sleep(1000);
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("drop database if exists " + dbName);
+ statement.execute("create database if not exists " + dbName);
+ statement.execute("use " + dbName + ";");
+ final List<String> statements =
+ Arrays.stream(
+ Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
+ .map(String::trim)
+ .filter(x -> !x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+ COMMENT_PATTERN.matcher(x);
+ return m.matches() ? m.group(1) : x;
+ })
+ .collect(Collectors.joining("\n"))
+ .split(";"))
+ .filter(sql -> filter == null || filter.apply(sql))
+ .collect(Collectors.toList());
+ for (String stmt : statements) {
+ statement.execute(stmt);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static void dropDatabase(String dbName) {
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("drop database if exists " + dbName);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static List<String> fetchRows(Iterator<Row> iter, int size) {
+ List<String> rows = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ Row row = iter.next();
+ rows.add(row.toString());
+ }
+ return rows;
+ }
+
+ protected String getTableNameRegex(String[] captureCustomerTables) {
+ checkState(captureCustomerTables.length > 0);
+ if (captureCustomerTables.length == 1) {
+ return captureCustomerTables[0];
+ } else {
+ // pattern that matches multiple tables
+ return format("(%s)", StringUtils.join(captureCustomerTables, "|"));
+ }
+ }
+
+ protected String getServerId() {
+ final Random random = new Random();
+ int serverId = random.nextInt(100) + 5400;
+ return serverId + "-" + (serverId + 4);
+ }
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+
+ protected static void waitForSinkSize(String sinkName, int expectedSize)
+ throws InterruptedException {
+ while (sinkSize(sinkName) < expectedSize) {
+ Thread.sleep(100);
+ }
+ }
+
+ protected static int sinkSize(String sinkName) {
+ synchronized (TestValuesTableFactory.class) {
+ try {
+ return TestValuesTableFactory.getRawResults(sinkName).size();
+ } catch (IllegalArgumentException e) {
+ // job is not started yet
+ return 0;
+ }
+ }
+ }
+
+ protected static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ protected static void assertEqualsInOrder(List<String> expected, List<String> actual) {
+ Assertions.assertThat(actual).containsExactlyElementsOf(expected);
+ }
+
+ protected static String getRandomSuffix() {
+ String base = UUID.randomUUID().toString().replaceAll("-", "");
+ if (base.length() > 10) {
+ return base.substring(0, 11);
+ }
+ return base;
+ }
+
+ /** The type of failover. */
+ protected enum FailoverType {
+ TM,
+ JM,
+ NONE
+ }
+
+ /** The phase of failover. */
+ protected enum FailoverPhase {
+ SNAPSHOT,
+ BINLOG,
+ NEVER
+ }
+
+ protected static void triggerFailover(
+ FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
+ throws Exception {
+ switch (type) {
+ case TM:
+ restartTaskManager(miniCluster, afterFailAction);
+ break;
+ case JM:
+ triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
+ break;
+ case NONE:
+ break;
+ default:
+ throw new IllegalStateException("Unexpected value: " + type);
+ }
+ }
+
+ protected static void triggerJobManagerFailover(
+ JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
+ final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
+ haLeadershipControl.revokeJobMasterLeadership(jobId).get();
+ afterFailAction.run();
+ haLeadershipControl.grantJobMasterLeadership(jobId).get();
+ }
+
+ protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
+ throws Exception {
+ miniCluster.terminateTaskManager(0).get();
+ afterFailAction.run();
+ miniCluster.startTaskManager();
+ }
+
+ protected MySqlConnection getConnection() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("database.hostname", getHost());
+ properties.put("database.port", String.valueOf(getPort()));
+ properties.put("database.user", getUserName());
+ properties.put("database.password", getPassword());
+ properties.put("database.serverTimezone", ZoneId.of("Asia/Shanghai").toString());
+ io.debezium.config.Configuration configuration =
+ io.debezium.config.Configuration.from(properties);
+ return DebeziumUtils.createMySqlConnection(configuration, new Properties());
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/oceanbase_ddl_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/oceanbase_ddl_test.sql
new file mode 100644
index 00000000000..cc5c30b2fda
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/oceanbase_ddl_test.sql
@@ -0,0 +1,232 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: oceanbase_ddl_test
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- Create orders table with single primary key
+create table orders (
+ id bigint not null auto_increment,
+ seller_id varchar(30) DEFAULT NULL,
+ order_id varchar(30) DEFAULT NULL,
+ buyer_id varchar(30) DEFAULT NULL,
+ create_time datetime DEFAULT NULL,
+ primary key(id),
+ INDEX `i_seller`(`seller_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 partition by key(id) partitions 3;
+
+-- insert some orders for testing
+INSERT INTO orders
+VALUES (1, 1001, 1, 102, '2022-01-16'),
+ (2, 1002, 2, 105, '2022-01-16'),
+ (3, 1004, 3, 109, '2022-01-16'),
+ (4, 1002, 2, 106, '2022-01-16'),
+ (5, 1003, 1, 107, '2022-01-16');
+
+-- Create orders with multi primary keys
+create table orders_with_multi_pks (
+ id bigint not null auto_increment,
+ seller_id varchar(30) DEFAULT NULL,
+ order_id varchar(30) NOT NULL,
+ buyer_id varchar(30) DEFAULT NULL,
+ create_time datetime DEFAULT NULL,
+ primary key(id, order_id),
+ INDEX `g_mi_seller`(`seller_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 partition by key(id, order_id) partitions 3;
+
+-- insert some orders for testing
+INSERT INTO orders_with_multi_pks
+VALUES (1, 1001, 1, 102, '2022-01-16'),
+ (2, 1002, 2, 105, '2022-01-16'),
+ (3, 1004, 3, 109, '2022-01-16'),
+ (4, 1002, 2, 106, '2022-01-16'),
+ (5, 1003, 1, 107, '2022-01-16');
+
+
+-- create table with full types
+CREATE TABLE oceanbase_full_types (
+ id INT AUTO_INCREMENT,
+ tiny_c TINYINT,
+ tiny_un_c TINYINT UNSIGNED,
+ small_c SMALLINT,
+ small_un_c SMALLINT UNSIGNED,
+ medium_c MEDIUMINT,
+ medium_un_c MEDIUMINT UNSIGNED,
+ int_c INTEGER ,
+ int_un_c INTEGER UNSIGNED,
+ int11_c INT(11) DEFAULT 0,
+ big_c BIGINT,
+ big_un_c BIGINT UNSIGNED,
+ varchar_c VARCHAR(255) DEFAULT '1',
+ char_c CHAR(3) DEFAULT '',
+ real_c REAL,
+ float_c FLOAT,
+ double_c DOUBLE,
+ decimal_c DECIMAL(8, 4),
+ numeric_c NUMERIC(6, 0),
+ big_decimal_c DECIMAL(65, 1),
+ bit1_c BIT,
+ tiny1_c TINYINT(1),
+ boolean_c BOOLEAN,
+ date_c DATE,
+ time_c TIME(0),
+ datetime3_c DATETIME(3),
+ datetime6_c DATETIME(6),
+ timestamp_c TIMESTAMP,
+ file_uuid BINARY(16),
+ bit_c BIT(64),
+ text_c TEXT,
+ tiny_blob_c TINYBLOB,
+ blob_c BLOB,
+ medium_blob_c MEDIUMBLOB,
+ long_blob_c LONGBLOB,
+ year_c YEAR,
+ enum_c enum('red', 'white') default 'red',
+ set_c SET('a', 'b'),
+ json_c JSON,
+ point_c POINT,
+ geometry_c GEOMETRY,
+ linestring_c LINESTRING,
+ polygon_c POLYGON,
+ multipoint_c MULTIPOINT,
+ multiline_c MULTILINESTRING,
+ multipolygon_c MULTIPOLYGON,
+ geometrycollection_c GEOMETRYCOLLECTION,
+ PRIMARY KEY (id),
+ INDEX `g_mit_seller`(`int_c`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 partition by Hash(id) partitions 3;
+
+INSERT INTO oceanbase_full_types VALUES (
+ DEFAULT, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, 9223372036854775807,
+ 18446744073709551615,
+ 'Hello World', 'abc', 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, 0, 1, true,
+ '2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',
+ unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400','-','')), b'0000010000000100000001000000010000000100000001000000010000000100',
+ 'text',UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)), 2021,
+ 'red', 'a,b,a', '{"key1": "value1", "key2": "value2", "num1": 1.6708304E7, "num2": 16708305}',
+ ST_GeomFromText('POINT(1 1)'),
+ ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
+ ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'),
+ ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
+ ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
+ ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
+ ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
+ ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))')
+);
+
+-- Create orders_sink for testing the flink-jdbc-connector sink
+create table orders_sink (
+ id bigint not null auto_increment,
+ seller_id varchar(30) DEFAULT NULL,
+ order_id varchar(30) DEFAULT NULL,
+ buyer_id varchar(30) DEFAULT NULL,
+ create_time datetime DEFAULT NULL,
+ primary key(id)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 partition by hash(id) partitions 3;
+
+
+-- Create and populate our customers using a single insert with many rows
+CREATE TABLE customers (
+ id INTEGER NOT NULL PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ address VARCHAR(1024),
+ phone_number VARCHAR(512)
+);
+
+INSERT INTO customers
+VALUES (101,"user_1","Shanghai","123567891234"),
+ (102,"user_2","Shanghai","123567891234"),
+ (103,"user_3","Shanghai","123567891234"),
+ (109,"user_4","Shanghai","123567891234"),
+ (110,"user_5","Shanghai","123567891234"),
+ (111,"user_6","Shanghai","123567891234"),
+ (118,"user_7","Shanghai","123567891234"),
+ (121,"user_8","Shanghai","123567891234"),
+ (123,"user_9","Shanghai","123567891234"),
+ (1009,"user_10","Shanghai","123567891234"),
+ (1010,"user_11","Shanghai","123567891234"),
+ (1011,"user_12","Shanghai","123567891234"),
+ (1012,"user_13","Shanghai","123567891234"),
+ (1013,"user_14","Shanghai","123567891234"),
+ (1014,"user_15","Shanghai","123567891234"),
+ (1015,"user_16","Shanghai","123567891234"),
+ (1016,"user_17","Shanghai","123567891234"),
+ (1017,"user_18","Shanghai","123567891234"),
+ (1018,"user_19","Shanghai","123567891234"),
+ (1019,"user_20","Shanghai","123567891234"),
+ (2000,"user_21","Shanghai","123567891234");
+
+-- table whose name shares the prefix matched by 'customers.*'
+CREATE TABLE customers_1 (
+ id INTEGER NOT NULL PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ address VARCHAR(1024),
+ phone_number VARCHAR(512)
+);
+
+INSERT INTO customers_1
+VALUES (101,"user_1","Shanghai","123567891234"),
+ (102,"user_2","Shanghai","123567891234"),
+ (103,"user_3","Shanghai","123567891234"),
+ (109,"user_4","Shanghai","123567891234"),
+ (110,"user_5","Shanghai","123567891234"),
+ (111,"user_6","Shanghai","123567891234"),
+ (118,"user_7","Shanghai","123567891234"),
+ (121,"user_8","Shanghai","123567891234"),
+ (123,"user_9","Shanghai","123567891234"),
+ (1009,"user_10","Shanghai","123567891234"),
+ (1010,"user_11","Shanghai","123567891234"),
+ (1011,"user_12","Shanghai","123567891234"),
+ (1012,"user_13","Shanghai","123567891234"),
+ (1013,"user_14","Shanghai","123567891234"),
+ (1014,"user_15","Shanghai","123567891234"),
+ (1015,"user_16","Shanghai","123567891234"),
+ (1016,"user_17","Shanghai","123567891234"),
+ (1017,"user_18","Shanghai","123567891234"),
+ (1018,"user_19","Shanghai","123567891234"),
+ (1019,"user_20","Shanghai","123567891234"),
+ (2000,"user_21","Shanghai","123567891234");
+
+
+CREATE TABLE customers_no_pk (
+ id INTEGER,
+ name VARCHAR(255) DEFAULT 'flink',
+ address VARCHAR(1024),
+ phone_number VARCHAR(512)
+);
+
+INSERT INTO customers_no_pk
+VALUES (101,"user_1","Shanghai","123567891234"),
+ (102,"user_2","Shanghai","123567891234"),
+ (103,"user_3","Shanghai","123567891234"),
+ (109,"user_4","Shanghai","123567891234"),
+ (110,"user_5","Shanghai","123567891234"),
+ (111,"user_6","Shanghai","123567891234"),
+ (118,"user_7","Shanghai","123567891234"),
+ (121,"user_8","Shanghai","123567891234"),
+ (123,"user_9","Shanghai","123567891234"),
+ (1009,"user_10","Shanghai","123567891234"),
+ (1010,"user_11","Shanghai","123567891234"),
+ (1011,"user_12","Shanghai","123567891234"),
+ (1012,"user_13","Shanghai","123567891234"),
+ (1013,"user_14","Shanghai","123567891234"),
+ (1014,"user_15","Shanghai","123567891234"),
+ (1015,"user_16","Shanghai","123567891234"),
+ (1016,"user_17","Shanghai","123567891234"),
+ (1017,"user_18","Shanghai","123567891234"),
+ (1018,"user_19","Shanghai","123567891234"),
+ (1019,"user_20","Shanghai","123567891234"),
+ (2000,"user_21","Shanghai","123567891234");
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
index 5e1e979891e..9474a225ff8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
@@ -29,7 +29,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -40,8 +39,6 @@
import java.util.stream.Stream;
/** Test supporting different column charsets for OceanBase. */
-@Disabled(
- "Temporarily disabled for GitHub CI due to unavailability of OceanBase Binlog Service docker image. These tests are currently only supported for local execution.")
public class OceanBaseCharsetITCase extends OceanBaseSourceTestBase {
private static final String DDL_FILE = "charset_test";
@@ -183,7 +180,7 @@ public void testCharset(String testName, String[] snapshotExpected, String[] bin
+ ")",
testName,
getHost(),
- PORT,
+ getPort(),
USER_NAME,
PASSWORD,
DATABASE_NAME,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
index aaad93e1ca9..f722813e78d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
@@ -22,7 +22,6 @@
import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -35,7 +34,6 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
@@ -45,28 +43,29 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
/** failover IT tests for oceanbase. */
-@Disabled(
- "Temporarily disabled for GitHub CI due to unavailability of OceanBase Binlog Service docker image. These tests are currently only supported for local execution.")
@Timeout(value = 180, unit = TimeUnit.SECONDS)
public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase {
private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
private static final String DDL_FILE = "oceanbase_ddl_test";
- private static final String DEFAULT_TEST_DATABASE = "customer_" + getRandomSuffix();
protected static final int DEFAULT_PARALLELISM = 4;
+ private String testDatabase = "customer_" + getRandomSuffix();
private final List<String> firstPartBinlogEvents =
Arrays.asList(
@@ -87,9 +86,9 @@ public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase {
public static Stream<Arguments> parameters() {
return Stream.of(
- Arguments.of("customers", null),
- Arguments.of("customers", "id"),
- Arguments.of("customers_no_pk", "id"));
+ Arguments.of("customers", null, "false"),
+ Arguments.of("customers", "id", "true"),
+ Arguments.of("customers_no_pk", "id", "true"));
}
@RegisterExtension
@@ -105,47 +104,53 @@ public static Stream parameters() {
@BeforeEach
public void setup() throws InterruptedException {
+ testDatabase = "customer_" + getRandomSuffix();
initializeOceanBaseTables(
DDL_FILE,
- DEFAULT_TEST_DATABASE,
+ testDatabase,
s -> !StringUtils.isNullOrWhitespaceOnly(s) && (s.contains("customers")));
}
@AfterEach
public void clean() {
- dropDatabase(DEFAULT_TEST_DATABASE);
+ dropDatabase(testDatabase);
}
// Failover tests
@ParameterizedTest
@MethodSource("parameters")
- @Timeout(value = 120, unit = TimeUnit.SECONDS)
- public void testTaskManagerFailoverInSnapshotPhase(String tableName, String chunkColumnName)
- throws Exception {
+ public void testTaskManagerFailoverInSnapshotPhase(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
testMySqlParallelSource(
FailoverType.TM,
FailoverPhase.SNAPSHOT,
new String[] {tableName, "customers_1"},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testTaskManagerFailoverInBinlogPhase(String tableName, String chunkColumnName)
- throws Exception {
+ public void testTaskManagerFailoverInBinlogPhase(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
testMySqlParallelSource(
FailoverType.TM,
FailoverPhase.BINLOG,
new String[] {tableName, "customers_1"},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testTaskManagerFailoverFromLatestOffset(String tableName, String chunkColumnName)
- throws Exception {
+ public void testTaskManagerFailoverFromLatestOffset(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
testMySqlParallelSource(
DEFAULT_PARALLELISM,
"latest-offset",
@@ -154,37 +159,46 @@ public void testTaskManagerFailoverFromLatestOffset(String tableName, String chu
new String[] {tableName, "customers_1"},
RestartStrategies.fixedDelayRestart(1, 0),
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testJobManagerFailoverInSnapshotPhase(String tableName, String chunkColumnName)
- throws Exception {
+ public void testJobManagerFailoverInSnapshotPhase(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
testMySqlParallelSource(
FailoverType.JM,
FailoverPhase.SNAPSHOT,
new String[] {tableName, "customers_1"},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testJobManagerFailoverInBinlogPhase(String tableName, String chunkColumnName)
- throws Exception {
+ public void testJobManagerFailoverInBinlogPhase(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
testMySqlParallelSource(
FailoverType.JM,
FailoverPhase.BINLOG,
new String[] {tableName, "customers_1"},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testJobManagerFailoverFromLatestOffset(String tableName, String chunkColumnName)
- throws Exception {
+ public void testJobManagerFailoverFromLatestOffset(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
testMySqlParallelSource(
DEFAULT_PARALLELISM,
"latest-offset",
@@ -193,33 +207,42 @@ public void testJobManagerFailoverFromLatestOffset(String tableName, String chun
new String[] {tableName, "customers_1"},
RestartStrategies.fixedDelayRestart(1, 0),
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testTaskManagerFailoverSingleParallelism(String tableName, String chunkColumnName)
- throws Exception {
+ public void testTaskManagerFailoverSingleParallelism(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
testMySqlParallelSource(
1,
FailoverType.TM,
FailoverPhase.SNAPSHOT,
new String[] {tableName},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testJobManagerFailoverSingleParallelism(String tableName, String chunkColumnName)
- throws Exception {
+ public void testJobManagerFailoverSingleParallelism(
+ String tableName, String chunkColumnName, String assignEndingFirst) throws Exception {
testMySqlParallelSource(
1,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
new String[] {tableName},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+ "scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
private void testMySqlParallelSource(
@@ -227,7 +250,8 @@ private void testMySqlParallelSource(
FailoverPhase failoverPhase,
String[] captureCustomerTables,
String tableName,
- String chunkColumnName)
+ String chunkColumnName,
+ Map<String, String> otherOptions)
throws Exception {
testMySqlParallelSource(
DEFAULT_PARALLELISM,
@@ -235,7 +259,8 @@ private void testMySqlParallelSource(
failoverPhase,
captureCustomerTables,
tableName,
- chunkColumnName);
+ chunkColumnName,
+ otherOptions);
}
private void testMySqlParallelSource(
@@ -244,7 +269,8 @@ private void testMySqlParallelSource(
FailoverPhase failoverPhase,
String[] captureCustomerTables,
String tableName,
- String chunkColumnName)
+ String chunkColumnName,
+ Map<String, String> otherOptions)
throws Exception {
testMySqlParallelSource(
parallelism,
@@ -254,7 +280,8 @@ private void testMySqlParallelSource(
captureCustomerTables,
RestartStrategies.fixedDelayRestart(1, 0),
tableName,
- chunkColumnName);
+ chunkColumnName,
+ otherOptions);
}
private void testMySqlParallelSource(
@@ -265,7 +292,8 @@ private void testMySqlParallelSource(
String[] captureCustomerTables,
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
String tableName,
- String chunkColumnName)
+ String chunkColumnName,
+ Map<String, String> otherOptions)
throws Exception {
testMySqlParallelSource(
parallelism,
@@ -276,7 +304,8 @@ private void testMySqlParallelSource(
restartStrategyConfiguration,
false,
tableName,
- chunkColumnName);
+ chunkColumnName,
+ otherOptions);
}
private void testMySqlParallelSource(
@@ -288,11 +317,10 @@ private void testMySqlParallelSource(
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
boolean skipSnapshotBackfill,
String tableName,
- String chunkColumnName)
+ String chunkColumnName,
+ Map<String, String> otherOptions)
throws Exception {
- captureCustomerTables = new String[] {tableName};
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
@@ -309,7 +337,7 @@ private void testMySqlParallelSource(
? ""
: ", primary key (id) not enforced")
+ ") WITH ("
- + " 'connector' = 'oceanbase-cdc',"
+ + " 'connector' = 'mysql-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
@@ -323,12 +351,13 @@ private void testMySqlParallelSource(
+ " 'server-time-zone' = 'Asia/Shanghai',"
+ " 'server-id' = '%s'"
+ " %s"
+ + " %s"
+ ")",
getHost(),
getPort(),
getUserName(),
getPassword(),
- DEFAULT_TEST_DATABASE,
+ testDatabase,
getTableNameRegex(captureCustomerTables),
scanStartupMode,
skipSnapshotBackfill,
@@ -337,7 +366,17 @@ private void testMySqlParallelSource(
? ""
: String.format(
", 'scan.incremental.snapshot.chunk.key-column' = '%s'",
- chunkColumnName));
+ chunkColumnName),
+ otherOptions.isEmpty()
+ ? ""
+ : ","
+ + otherOptions.entrySet().stream()
+ .map(
+ e ->
+ String.format(
+ "'%s'='%s'",
+ e.getKey(), e.getValue()))
+ .collect(Collectors.joining(",")));
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
@@ -349,7 +388,7 @@ private void testMySqlParallelSource(
// second step: check the binlog data
checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables);
- // sleepMs(3000);
+ sleepMs(3000);
tableResult.getJobClient().get().cancel().get();
}
@@ -416,7 +455,7 @@ private void checkBinlogData(
JobID jobId = tableResult.getJobClient().get().getJobID();
for (String tableId : captureCustomerTables) {
- makeFirstPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + '.' + tableId);
+ makeFirstPartBinlogEvents(getConnection(), testDatabase + '.' + tableId);
}
// wait for the binlog reading
@@ -431,7 +470,7 @@ private void checkBinlogData(
waitUntilJobRunning(tableResult);
}
for (String tableId : captureCustomerTables) {
- makeSecondPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + '.' + tableId);
+ makeSecondPartBinlogEvents(getConnection(), testDatabase + '.' + tableId);
}
List<String> expectedBinlogData = new ArrayList<>();
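
For readability, here is a small self-contained sketch (not part of the patch) of how the new otherOptions map is folded into the generated WITH clause in testMySqlParallelSource; the class name WithClauseOptionsSketch is illustrative only, and the option key is the one passed by the parameterized tests above.

    import java.util.Collections;
    import java.util.Map;
    import java.util.stream.Collectors;

    class WithClauseOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> otherOptions =
                    Collections.singletonMap(
                            "scan.incremental.snapshot.unbounded-chunk-first.enabled", "true");
            // An empty map contributes nothing; otherwise each entry is rendered as
            // 'key'='value' and appended to the source DDL behind a leading comma.
            String extraOptions =
                    otherOptions.isEmpty()
                            ? ""
                            : ","
                                    + otherOptions.entrySet().stream()
                                            .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
                                            .collect(Collectors.joining(","));
            // Prints: ,'scan.incremental.snapshot.unbounded-chunk-first.enabled'='true'
            System.out.println(extraOptions);
        }
    }
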
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
index 75b6726ff09..702a9da4dbc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
@@ -27,7 +27,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.sql.Connection;
@@ -39,8 +38,6 @@
import static java.lang.String.format;
/** OceanBase CDC source connector integration test. */
-@Disabled(
- "Temporarily disabled for GitHub CI due to unavailability of OceanBase Binlog Service docker image. These tests are currently only supported for local execution.")
public class OceanBaseSourceITCase extends OceanBaseSourceTestBase {
private static final String DDL_FILE = "oceanbase_ddl_test";
private static final String DATABASE_NAME = "cdc_s_" + getRandomSuffix();
@@ -89,7 +86,7 @@ public void testSingleKey() throws Exception {
+ " 'server-id' = '%s'"
+ ")",
getHost(),
- PORT,
+ getPort(),
USER_NAME,
PASSWORD,
DATABASE_NAME,
@@ -241,7 +238,7 @@ public void testFullTypesDdl() throws Exception {
+ " 'server-id' = '%s'"
+ ")",
getHost(),
- PORT,
+ getPort(),
USER_NAME,
PASSWORD,
DATABASE_NAME,
@@ -295,7 +292,7 @@ public void testMultiKeys() throws Exception {
+ " 'server-id' = '%s'"
+ ")",
getHost(),
- PORT,
+ getPort(),
USER_NAME,
PASSWORD,
DATABASE_NAME,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
index 0ea3cb9662d..703e0a5234d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
@@ -30,6 +30,12 @@
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import java.net.URL;
import java.nio.file.Files;
@@ -38,6 +44,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -56,22 +63,40 @@
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
-/** Basic class for testing Database OceanBase which supported the mysql protocol. */
+/** Basic class for testing Database OceanBase. */
+@Testcontainers
public abstract class OceanBaseSourceTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(OceanBaseSourceTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
- protected static final Integer PORT = 3306;
- protected static final String USER_NAME = System.getenv("OCEANBASE_USERNAME");
- protected static final String PASSWORD = System.getenv("OCEANBASE_PASSWORD");
- protected static final String HOSTNAME = System.getenv("OCEANBASE_HOSTNAME");
+
+ protected static final Integer INNER_PORT = 2883;
+ protected static final String USER_NAME = "root@test";
+ protected static final String PASSWORD = "123456";
+ protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(5);
+
+ public static final Network NETWORK = Network.newNetwork();
+
+ @SuppressWarnings("resource")
+ @Container
+ public static final GenericContainer<?> OB_BINLOG_CONTAINER =
+ new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
+ .withNetwork(NETWORK)
+ .withStartupTimeout(WAITING_TIMEOUT)
+ .withExposedPorts(2881, 2883)
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .waitingFor(
+ new LogMessageWaitStrategy()
+ .withRegEx(".*OBBinlog is ready!.*")
+ .withTimes(1)
+ .withStartupTimeout(Duration.ofMinutes(6)));
protected static String getHost() {
- return HOSTNAME;
+ return OB_BINLOG_CONTAINER.getHost();
}
- protected static Integer getPort() {
- return PORT;
+ protected static int getPort() {
+ return OB_BINLOG_CONTAINER.getMappedPort(INNER_PORT);
}
protected static String getUserName() {
@@ -83,7 +108,7 @@ protected static String getPassword() {
}
protected static String getJdbcUrl() {
- return String.format("jdbc:mysql://%s:%s", HOSTNAME, PORT);
+ return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
}
protected static Connection getJdbcConnection() throws SQLException {
@@ -253,7 +278,7 @@ protected MySqlConnection getConnection() {
properties.put("database.port", String.valueOf(getPort()));
properties.put("database.user", getUserName());
properties.put("database.password", getPassword());
- properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
+ properties.put("database.serverTimezone", ZoneId.of("Asia/Shanghai").toString());
io.debezium.config.Configuration configuration =
io.debezium.config.Configuration.from(properties);
return DebeziumUtils.createMySqlConnection(configuration, new Properties());
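
As a reference, a minimal self-contained sketch (not part of the patch) of the JUnit 5 Testcontainers lifecycle this test base now relies on: a static @Container field is started once per test class by the @Testcontainers extension, and the mapped host port is resolved at runtime. The image tag, exposed ports and wait regex mirror the values above; the class and field names are illustrative.

    import java.time.Duration;

    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.testcontainers.containers.GenericContainer;
    import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;

    @Testcontainers
    class ObBinlogContainerSmokeSketch {

        // A static @Container field is started once before all tests in the class
        // and stopped afterwards by the Testcontainers JUnit 5 extension.
        @Container
        static final GenericContainer<?> OB_BINLOG =
                new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
                        .withExposedPorts(2881, 2883)
                        .waitingFor(
                                new LogMessageWaitStrategy()
                                        .withRegEx(".*OBBinlog is ready!.*")
                                        .withStartupTimeout(Duration.ofMinutes(6)));

        @Test
        void mysqlProtocolPortIsMapped() {
            // getMappedPort resolves the random host port bound to the container's 2883.
            Assertions.assertTrue(OB_BINLOG.getMappedPort(2883) > 0);
        }
    }
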
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index f97841016d1..57c7d385be9 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -340,6 +340,16 @@ limitations under the License.
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-connector-oceanbase-cdc</artifactId>
+ <version>${project.version}</version>
+ <destFileName>oceanbase-cdc-connector.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+
+
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
new file mode 100644
index 00000000000..bb0cb26f237
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tests;
+
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished;
+
+/** End-to-end tests for oceanbase-cdc connector uber jar. */
+@Testcontainers
+class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OceanBaseE2eITCase.class);
+
+ private static final Path oceanbaseCdcJar =
+ TestUtils.getResource("oceanbase-cdc-connector.jar");
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+ private static final String[] CREATE_DATABASE_DDL =
+ new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"};
+ private static final Path jdbcDriver = TestUtils.getResource("mysql-driver.jar");
+ private static final String DATABASE_NAME = "oceanbase_inventory";
+
+ private static final int OB_PROXY_PORT = 2883;
+ protected static final String DEFAULT_USERNAME = "root@test";
+ protected static final String DEFAULT_PASSWORD = "123456";
+ protected static final String INTER_CONTAINER_OCEANBASE_ALIAS = "oceanbase";
+
+ @Container
+ @SuppressWarnings("resource")
+ public static final GenericContainer<?> OB_BINLOG_CONTAINER =
+ new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_OCEANBASE_ALIAS)
+ .withStartupTimeout(Duration.ofMinutes(5))
+ .withExposedPorts(2881, 2883)
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .waitingFor(
+ new LogMessageWaitStrategy()
+ .withRegEx(".*OBBinlog is ready!.*")
+ .withTimes(1)
+ .withStartupTimeout(Duration.ofMinutes(6)));
+
+ @BeforeEach
+ public void before() {
+ super.before();
+ createAndInitializeOBTable("oceanbase_inventory");
+ }
+
+ @AfterEach
+ public void after() {
+ super.after();
+ }
+
+ @Test
+ void testOBBinlogCDC() throws Exception {
+ List<String> sqlLines =
+ Arrays.asList(
+ "SET 'execution.checkpointing.interval' = '3s';",
+ "SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';",
+ "CREATE TABLE products_source (",
+ " `id` INT NOT NULL,",
+ " name STRING,",
+ " description STRING,",
+ " weight DECIMAL(10,3),",
+ " enum_c STRING,",
+ " json_c STRING,",
+ " point_c STRING,",
+ " primary key (`id`) not enforced",
+ ") WITH (",
+ " 'connector' = 'oceanbase-cdc',",
+ " 'hostname' = '" + INTER_CONTAINER_OCEANBASE_ALIAS + "',",
+ " 'port' = '2883',",
+ " 'username' = '" + DEFAULT_USERNAME + "',",
+ " 'password' = '" + DEFAULT_PASSWORD + "',",
+ " 'database-name' = '" + DATABASE_NAME + "',",
+ " 'table-name' = 'products_source',",
+ " 'server-time-zone' = 'Asia/Shanghai',",
+ " 'server-id' = '5800-5900',",
+ " 'scan.incremental.snapshot.chunk.size' = '4',",
+ " 'scan.incremental.close-idle-reader.enabled' = '"
+ + supportCheckpointsAfterTasksFinished()
+ + "'",
+ ");",
+ "CREATE TABLE products_sink (",
+ " `id` INT NOT NULL,",
+ " name STRING,",
+ " description STRING,",
+ " weight DECIMAL(10,3),",
+ " enum_c STRING,",
+ " json_c STRING,",
+ " point_c STRING,",
+ " primary key (`id`) not enforced",
+ ") WITH (",
+ " 'connector' = 'jdbc',",
+ String.format(
+ " 'url' = 'jdbc:mysql://%s:2883/%s',",
+ INTER_CONTAINER_OCEANBASE_ALIAS, DATABASE_NAME),
+ " 'table-name' = 'products_sink',",
+ " 'username' = '" + DEFAULT_USERNAME + "',",
+ " 'password' = '" + DEFAULT_PASSWORD + "'",
+ ");",
+ "INSERT INTO products_sink",
+ "SELECT * FROM products_source;");
+
+ submitSQLJob(sqlLines, oceanbaseCdcJar, jdbcJar, jdbcDriver);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+
+ // generate binlogs
+ String jdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ OB_BINLOG_CONTAINER.getHost(),
+ OB_BINLOG_CONTAINER.getMappedPort(OB_PROXY_PORT),
+ DATABASE_NAME);
+ try (Connection conn =
+ DriverManager.getConnection(jdbcUrl, DEFAULT_USERNAME, DEFAULT_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute(
+ "UPDATE products_source SET description='18oz carpenter hammer' WHERE id=106;");
+ stat.execute("UPDATE products_source SET weight='5.1' WHERE id=107;");
+ stat.execute(
+ "INSERT INTO products_source VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110
+ stat.execute(
+ "INSERT INTO products_source VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
+ stat.execute(
+ "UPDATE products_source SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+ stat.execute("UPDATE products_source SET weight='5.17' WHERE id=111;");
+ stat.execute("DELETE FROM products_source WHERE id=111;");
+ // add a schema change event at the end.
+ stat.execute("CREATE TABLE new_table (id int, age int);");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ // assert final results
+ JdbcProxy proxy =
+ new JdbcProxy(jdbcUrl, DEFAULT_USERNAME, DEFAULT_PASSWORD, MYSQL_DRIVER_CLASS);
+ List<String> expectResult =
+ Arrays.asList(
+ "101,scooter,Small 2-wheel scooter,3.14,red,{\"key1\": \"value1\"},{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+ "102,car battery,12V car battery,8.1,white,{\"key2\": \"value2\"},{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+ "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8,red,{\"key3\": \"value3\"},{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+ "104,hammer,12oz carpenter's hammer,0.75,white,{\"key4\": \"value4\"},{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+ "105,hammer,14oz carpenter's hammer,0.875,red,{\"k1\": \"v1\", \"k2\": \"v2\"},{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+ "106,hammer,18oz carpenter hammer,1.0,null,null,null",
+ "107,rocks,box of assorted rocks,5.1,null,null,null",
+ "108,jacket,water resistent black wind breaker,0.1,null,null,null",
+ "109,spare tire,24 inch spare tire,22.2,null,null,null",
+ "110,jacket,new water resistent white wind breaker,0.5,null,null,null");
+ proxy.checkResultWithTimeout(
+ expectResult,
+ "products_sink",
+ new String[] {"id", "name", "description", "weight", "enum_c", "json_c", "point_c"},
+ 300000L);
+ }
+
+ /** Creates the database and populates it with the initialization SQL script. */
+ public void createAndInitializeOBTable(String sqlFile) {
+ final String ddlFile = String.format("ddl/%s.sql", sqlFile);
+ final URL ddlTestFile = OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile);
+ Assertions.assertThat(ddlTestFile).withFailMessage("Cannot locate " + ddlFile).isNotNull();
+ try {
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ final List<String> statements =
+ Arrays.stream(
+ Stream.concat(
+ Arrays.stream(CREATE_DATABASE_DDL),
+ Files.readAllLines(
+ Paths.get(ddlTestFile.toURI()))
+ .stream())
+ .map(String::trim)
+ .filter(x -> !x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+ COMMENT_PATTERN.matcher(x);
+ return m.matches() ? m.group(1) : x;
+ })
+ .map(sql -> sql.replace("$DBNAME$", DATABASE_NAME))
+ .collect(Collectors.joining("\n"))
+ .split(";"))
+ .map(x -> x.replace("$$", ";"))
+ .collect(Collectors.toList());
+ for (String stmt : statements) {
+ statement.execute(stmt);
+ }
+ }
+ } catch (final Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public Connection getJdbcConnection() throws SQLException {
+ String url =
+ "jdbc:mysql://"
+ + OB_BINLOG_CONTAINER.getHost()
+ + ":"
+ + OB_BINLOG_CONTAINER.getMappedPort(OB_PROXY_PORT)
+ + "?useSSL=false";
+ return DriverManager.getConnection(url, DEFAULT_USERNAME, DEFAULT_PASSWORD);
+ }
+}
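
The statement preparation inside createAndInitializeOBTable is easier to follow in isolation; below is a runnable sketch (not part of the patch) of the same splitting logic. The class and method names are illustrative only.

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    class SqlScriptSplitSketch {

        private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

        // Mirrors createAndInitializeOBTable: drop full-line comments and blank lines,
        // strip trailing "--" comments, substitute the database name, then join the
        // script and split it into individual statements on ';' ("$$" escapes a ';').
        static List<String> splitScript(List<String> lines, String dbName) {
            return Arrays.stream(
                            lines.stream()
                                    .map(String::trim)
                                    .filter(x -> !x.startsWith("--") && !x.isEmpty())
                                    .map(
                                            x -> {
                                                Matcher m = COMMENT_PATTERN.matcher(x);
                                                return m.matches() ? m.group(1) : x;
                                            })
                                    .map(sql -> sql.replace("$DBNAME$", dbName))
                                    .collect(Collectors.joining("\n"))
                                    .split(";"))
                    .map(x -> x.replace("$$", ";"))
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<String> script =
                    Arrays.asList(
                            "CREATE DATABASE `$DBNAME$`;",
                            "USE `$DBNAME$`;",
                            "-- a full-line comment is dropped",
                            "CREATE TABLE t (",
                            " id INT -- a trailing comment is stripped",
                            ");");
            splitScript(script, "oceanbase_inventory").forEach(System.out::println);
        }
    }
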
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
new file mode 100644
index 00000000000..73bb75d7f2a
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
@@ -0,0 +1,51 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: inventory
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products_source (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ description VARCHAR(512),
+ weight FLOAT,
+ enum_c enum('red', 'white') default 'red', -- test some complex types as well,
+ json_c JSON, -- because we use additional dependencies to deserialize complex types.
+ point_c POINT
+);
+ALTER TABLE products_source AUTO_INCREMENT = 101;
+
+INSERT INTO products_source
+VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')),
+ (default,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')),
+ (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')),
+ (default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')),
+ (default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')),
+ (default,"hammer","16oz carpenter's hammer",1.0, null, null, null),
+ (default,"rocks","box of assorted rocks",5.3, null, null, null),
+ (default,"jacket","water resistent black wind breaker",0.1, null, null, null),
+ (default,"spare tire","24 inch spare tire",22.2, null, null, null);
+
+CREATE TABLE products_sink (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ description VARCHAR(512),
+ weight FLOAT,
+ enum_c VARCHAR(255),
+ json_c VARCHAR(255),
+ point_c VARCHAR(255)
+);
\ No newline at end of file