diff --git a/docs/content.zh/docs/connectors/flink-sources/overview.md b/docs/content.zh/docs/connectors/flink-sources/overview.md index bdc1a23286a..1c6661eb2ac 100644 --- a/docs/content.zh/docs/connectors/flink-sources/overview.md +++ b/docs/content.zh/docs/connectors/flink-sources/overview.md @@ -82,7 +82,7 @@ The following table shows the current features of the connector: | [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [postgres-cdc]({{< ref "docs/connectors/flink-sources/postgres-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [sqlserver-cdc]({{< ref "docs/connectors/flink-sources/sqlserver-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | -| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | ❌ | ❌ | ❌ | ❌ | +| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [tidb-cdc]({{< ref "docs/connectors/flink-sources/tidb-cdc" >}}) | ✅ | ❌ | ✅ | ❌ | | [db2-cdc]({{< ref "docs/connectors/flink-sources/db2-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [vitess-cdc]({{< ref "docs/connectors/flink-sources/vitess-cdc" >}}) | ✅ | ❌ | ✅ | ❌ | diff --git a/docs/content/docs/connectors/flink-sources/overview.md b/docs/content/docs/connectors/flink-sources/overview.md index c275d54360c..4369e9c6391 100644 --- a/docs/content/docs/connectors/flink-sources/overview.md +++ b/docs/content/docs/connectors/flink-sources/overview.md @@ -75,14 +75,14 @@ The following table shows the version mapping between Flink® CDC Con The following table shows the current features of the connector: -| Connector | No-lock Read | Parallel Read | Exactly-once Read | Incremental Snapshot Read | -|---------------------------------------------------------------------------------------|--------------|---------------|-------------------|---------------------------| +| Connector | No-lock Read | Parallel Read | Exactly-once Read | Incremental Snapshot Read | +|----------------------------------------------------------------------------|--------------|---------------|-------------------|---------------------------| | [mongodb-cdc]({{< ref "docs/connectors/flink-sources/mongodb-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [mysql-cdc]({{< ref "docs/connectors/flink-sources/mysql-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [postgres-cdc]({{< ref "docs/connectors/flink-sources/postgres-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [sqlserver-cdc]({{< ref "docs/connectors/flink-sources/sqlserver-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | -| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | ❌ | ❌ | ❌ | ❌ | +| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [tidb-cdc]({{< ref "docs/connectors/flink-sources/tidb-cdc" >}}) | ✅ | ❌ | ✅ | ❌ | | [db2-cdc]({{< ref "docs/connectors/flink-sources/db2-cdc" >}}) | ✅ | ✅ | ✅ | ✅ | | [vitess-cdc]({{< ref "docs/connectors/flink-sources/vitess-cdc" >}}) | ✅ | ❌ | ✅ | ❌ | diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml index 96366a9af91..2453d10df02 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml @@ -169,6 +169,13 @@ limitations under the License. 
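The feature-matrix rows above now advertise no-lock, parallel, exactly-once and incremental-snapshot reads for oceanbase-cdc. The integration tests added below exercise those paths by pointing the mysql-cdc connector at an OceanBase binlog service endpoint (MySQL protocol compatible). The following is only a minimal sketch of such a source definition: the host, port, credentials, database and table names are placeholders, and the options mirror the ones used in the new ITCases rather than a definitive production setup.

```java
// Illustrative sketch only: reads an OceanBase table through the MySQL-compatible
// binlog service using the mysql-cdc connector, as the new ITCases in this change do.
// Endpoint, credentials, database and table names are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OceanBaseIncrementalSnapshotSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // incremental snapshot reads rely on checkpointing
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql(
                "CREATE TABLE orders_source ("
                        + "  id BIGINT NOT NULL,"
                        + "  seller_id STRING,"
                        + "  order_id STRING,"
                        + "  buyer_id STRING,"
                        + "  create_time TIMESTAMP,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'scan.incremental.snapshot.enabled' = 'true',"
                        + "  'hostname' = 'localhost',"      // OceanBase binlog service host (placeholder)
                        + "  'port' = '2883',"               // obproxy port exposed by the service
                        + "  'username' = 'root@test',"
                        + "  'password' = '123456',"
                        + "  'database-name' = 'test_db',"
                        + "  'table-name' = 'orders',"
                        + "  'server-id' = '5400-5404',"
                        + "  'server-time-zone' = 'Asia/Shanghai'"
                        + ")");

        // Snapshot rows are emitted first, followed by change events from the binlog service.
        tEnv.executeSql("SELECT * FROM orders_source").print();
    }
}
```

The checkpoint interval and the server-id range here are arbitrary example values; the tests below pick a random server-id range per run via getServerId().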
+ + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + org.testcontainers mysql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java new file mode 100644 index 00000000000..2e43a315c1d --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.Arrays; +import java.util.stream.Stream; + +/** Test supporting different column charsets for OceanBase. 
*/ +public class OceanBaseCharsetITCase extends OceanBaseSourceTestBase { + + private static final String DDL_FILE = "charset_test"; + private static final String DATABASE_NAME = "cdc_c_" + getRandomSuffix(); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + @BeforeAll + public static void beforeClass() throws InterruptedException { + initializeOceanBaseTables( + DDL_FILE, + DATABASE_NAME, + s -> // see: + // https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000002017544 + !StringUtils.isNullOrWhitespaceOnly(s) + && (s.contains("utf8_test") + || s.contains("latin1_test") + || s.contains("gbk_test") + || s.contains("big5_test") + || s.contains("ascii_test") + || s.contains("sjis_test"))); + } + + @AfterAll + public static void after() { + dropDatabase(DATABASE_NAME); + } + + @BeforeEach + public void before() { + TestValuesTableFactory.clearAllData(); + env.setParallelism(4); + env.enableCheckpointing(200); + } + + public static Stream parameters() { + return Stream.of( + Arguments.of( + "utf8_test", + new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}, + new String[] { + "-D[1, 测试数据]", + "-D[2, Craig Marshall]", + "-D[3, 另一个测试数据]", + "+I[11, 测试数据]", + "+I[12, Craig Marshall]", + "+I[13, 另一个测试数据]" + }), + Arguments.of( + "ascii_test", + new String[] { + "+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]" + }, + new String[] { + "-D[1, ascii test!?]", + "-D[2, Craig Marshall]", + "-D[3, {test}]", + "+I[11, ascii test!?]", + "+I[12, Craig Marshall]", + "+I[13, {test}]" + }), + Arguments.of( + "gbk_test", + new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"}, + new String[] { + "-D[1, 测试数据]", + "-D[2, Craig Marshall]", + "-D[3, 另一个测试数据]", + "+I[11, 测试数据]", + "+I[12, Craig Marshall]", + "+I[13, 另一个测试数据]" + }), + Arguments.of( + "latin1_test", + new String[] {"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"}, + new String[] { + "-D[1, ÀÆÉ]", + "-D[2, Craig Marshall]", + "-D[3, Üæû]", + "+I[11, ÀÆÉ]", + "+I[12, Craig Marshall]", + "+I[13, Üæû]" + }), + Arguments.of( + "big5_test", + new String[] {"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"}, + new String[] { + "-D[1, 大五]", + "-D[2, Craig Marshall]", + "-D[3, 丹店]", + "+I[11, 大五]", + "+I[12, Craig Marshall]", + "+I[13, 丹店]" + }), + Arguments.of( + "sjis_test", + new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"}, + new String[] { + "-D[1, ひびぴ]", + "-D[2, Craig Marshall]", + "-D[3, フブプ]", + "+I[11, ひびぴ]", + "+I[12, Craig Marshall]", + "+I[13, フブプ]" + })); + } + + @ParameterizedTest + @MethodSource("parameters") + public void testCharset(String testName, String[] snapshotExpected, String[] binlogExpected) + throws Exception { + String sourceDDL = + String.format( + "CREATE TABLE %s (\n" + + " table_id BIGINT,\n" + + " table_name STRING,\n" + + " primary key(table_id) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-id' = '%s'," + + " 'server-time-zone' = 'Asia/Shanghai'," + + " 'jdbc.properties.connectTimeout' = '6000000000'," + + " 'jdbc.properties.socketTimeout' = '6000000000'," + + " 'jdbc.properties.autoReconnect' = 
'true'," + + " 'jdbc.properties.failOverReadOnly' = 'false'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'" + + ")", + testName, + getHost(), + getPort(), + USER_NAME, + PASSWORD, + DATABASE_NAME, + testName, + true, + getServerId(), + 4); + tEnv.executeSql(sourceDDL); + // async submit job + TableResult result = + tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", testName)); + + // test snapshot phase + CloseableIterator iterator = result.collect(); + waitForSnapshotStarted(iterator); + assertEqualsInAnyOrder( + Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length)); + + // test binlog phase + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "/*TDDL:FORBID_EXECUTE_DML_ALL=FALSE*/UPDATE %s.%s SET table_id = table_id + 10;", + DATABASE_NAME, testName)); + } + assertEqualsInAnyOrder( + Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length)); + result.getJobClient().get().cancel().get(); + + // Sleep to avoid the issue: The last packet successfully received from the server was 35 + // milliseconds ago. + Thread.sleep(1_000); + } + + private static void waitForSnapshotStarted(CloseableIterator iterator) throws Exception { + while (!iterator.hasNext()) { + Thread.sleep(100); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java new file mode 100644 index 00000000000..e6e8251191e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.oceanbase; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.StringUtils; + +import io.debezium.jdbc.JdbcConnection; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static org.apache.flink.api.common.JobStatus.RUNNING; + +/** failover IT tests for oceanbase. */ +@Timeout(value = 180, unit = TimeUnit.SECONDS) +@Disabled( + "The current version of the Oceanbase binlog service causes failover test cases to fail. 
Disable the test and wait for the binlog version to be updated.") +public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase { + + private static final String DEFAULT_SCAN_STARTUP_MODE = "initial"; + private static final String DDL_FILE = "oceanbase_ddl_test"; + protected static final int DEFAULT_PARALLELISM = 4; + private String testDatabase = "customer_" + getRandomSuffix(); + + private final List firstPartBinlogEvents = + Arrays.asList( + "-U[103, user_3, Shanghai, 123567891234]", + "+U[103, user_3, Hangzhou, 123567891234]", + "-D[102, user_2, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "-U[103, user_3, Hangzhou, 123567891234]", + "+U[103, user_3, Shanghai, 123567891234]"); + + private final List secondPartBinlogEvents = + Arrays.asList( + "-U[1010, user_11, Shanghai, 123567891234]", + "+I[2001, user_22, Shanghai, 123567891234]", + "+I[2002, user_23, Shanghai, 123567891234]", + "+I[2003, user_24, Shanghai, 123567891234]", + "+U[1010, user_11, Hangzhou, 123567891234]"); + + public static Stream parameters() { + return Stream.of( + Arguments.of("customers", null, "false"), + Arguments.of("customers", "id", "true"), + Arguments.of("customers_no_pk", "id", "true")); + } + + @RegisterExtension + public final ExternalResourceProxy miniClusterResource = + new ExternalResourceProxy<>( + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) + .withHaLeadershipControl() + .build())); + + @BeforeEach + public void setup() throws InterruptedException { + testDatabase = "customer_" + getRandomSuffix(); + initializeOceanBaseTables( + DDL_FILE, + testDatabase, + s -> !StringUtils.isNullOrWhitespaceOnly(s) && (s.contains("customers"))); + } + + @AfterEach + public void clean() { + dropDatabase(testDatabase); + } + + // Failover tests + @ParameterizedTest + @MethodSource("parameters") + public void testTaskManagerFailoverInSnapshotPhase( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { + testMySqlParallelSource( + FailoverType.TM, + FailoverPhase.SNAPSHOT, + new String[] {tableName, "customers_1"}, + tableName, + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); + } + + @ParameterizedTest + @MethodSource("parameters") + public void testTaskManagerFailoverInBinlogPhase( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { + testMySqlParallelSource( + FailoverType.TM, + FailoverPhase.BINLOG, + new String[] {tableName, "customers_1"}, + tableName, + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); + } + + @ParameterizedTest + @MethodSource("parameters") + public void testTaskManagerFailoverFromLatestOffset( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { + testMySqlParallelSource( + DEFAULT_PARALLELISM, + "latest-offset", + FailoverType.TM, + FailoverPhase.BINLOG, + new String[] {tableName, "customers_1"}, + RestartStrategies.fixedDelayRestart(1, 0), + tableName, + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); + } + + @ParameterizedTest + @MethodSource("parameters") + public void testJobManagerFailoverInSnapshotPhase( + String tableName, String 
chunkColumnName, String assignEndingFirst) throws Exception { + testMySqlParallelSource( + FailoverType.JM, + FailoverPhase.SNAPSHOT, + new String[] {tableName, "customers_1"}, + tableName, + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); + } + + @ParameterizedTest + @MethodSource("parameters") + public void testJobManagerFailoverInBinlogPhase( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { + testMySqlParallelSource( + FailoverType.JM, + FailoverPhase.BINLOG, + new String[] {tableName, "customers_1"}, + tableName, + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); + } + + @ParameterizedTest + @MethodSource("parameters") + public void testJobManagerFailoverFromLatestOffset( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { + testMySqlParallelSource( + DEFAULT_PARALLELISM, + "latest-offset", + FailoverType.JM, + FailoverPhase.BINLOG, + new String[] {tableName, "customers_1"}, + RestartStrategies.fixedDelayRestart(1, 0), + tableName, + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); + } + + @ParameterizedTest + @MethodSource("parameters") + public void testTaskManagerFailoverSingleParallelism( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { + testMySqlParallelSource( + 1, + FailoverType.TM, + FailoverPhase.SNAPSHOT, + new String[] {tableName}, + tableName, + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); + } + + @ParameterizedTest + @MethodSource("parameters") + public void testJobManagerFailoverSingleParallelism( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { + testMySqlParallelSource( + 1, + FailoverType.JM, + FailoverPhase.SNAPSHOT, + new String[] {tableName}, + tableName, + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); + } + + private void testMySqlParallelSource( + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables, + String tableName, + String chunkColumnName, + Map otherOptions) + throws Exception { + testMySqlParallelSource( + DEFAULT_PARALLELISM, + failoverType, + failoverPhase, + captureCustomerTables, + tableName, + chunkColumnName, + otherOptions); + } + + private void testMySqlParallelSource( + int parallelism, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables, + String tableName, + String chunkColumnName, + Map otherOptions) + throws Exception { + testMySqlParallelSource( + parallelism, + DEFAULT_SCAN_STARTUP_MODE, + failoverType, + failoverPhase, + captureCustomerTables, + RestartStrategies.fixedDelayRestart(1, 0), + tableName, + chunkColumnName, + otherOptions); + } + + private void testMySqlParallelSource( + int parallelism, + String scanStartupMode, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables, + RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, + String tableName, + String chunkColumnName, + Map otherOptions) + throws Exception { + testMySqlParallelSource( + parallelism, + scanStartupMode, + failoverType, + failoverPhase, + captureCustomerTables, + 
restartStrategyConfiguration, + false, + tableName, + chunkColumnName, + otherOptions); + } + + private void testMySqlParallelSource( + int parallelism, + String scanStartupMode, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables, + RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, + boolean skipSnapshotBackfill, + String tableName, + String chunkColumnName, + Map otherOptions) + throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + env.setParallelism(parallelism); + env.enableCheckpointing(200L); + env.setRestartStrategy(restartStrategyConfiguration); + String sourceDDL = + format( + "CREATE TABLE customers (" + + " id BIGINT NOT NULL," + + " name STRING," + + " address STRING," + + " phone_number STRING" + + ("customers_no_pk".equals(tableName) + ? "" + : ", primary key (id) not enforced") + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.startup.mode' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '100'," + + " 'scan.incremental.snapshot.backfill.skip' = '%s'," + + " 'server-time-zone' = 'Asia/Shanghai'," + + " 'server-id' = '%s'" + + " %s" + + " %s" + + ")", + getHost(), + getPort(), + getUserName(), + getPassword(), + testDatabase, + getTableNameRegex(captureCustomerTables), + scanStartupMode, + skipSnapshotBackfill, + getServerId(), + chunkColumnName == null + ? "" + : String.format( + ", 'scan.incremental.snapshot.chunk.key-column' = '%s'", + chunkColumnName), + otherOptions.isEmpty() + ? 
"" + : "," + + otherOptions.entrySet().stream() + .map( + e -> + String.format( + "'%s'='%s'", + e.getKey(), e.getValue())) + .collect(Collectors.joining(","))); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from customers"); + + // first step: check the snapshot data + if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) { + checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables); + } + + // second step: check the binlog data + checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables); + + sleepMs(3000); + tableResult.getJobClient().get().cancel().get(); + } + + private void checkSnapshotData( + TableResult tableResult, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables) + throws Exception { + String[] snapshotForSingleTable = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + + List expectedSnapshotData = new ArrayList<>(); + for (int i = 0; i < captureCustomerTables.length; i++) { + expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); + } + + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + + // trigger failover after some snapshot splits read finished + if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) { + triggerFailover( + failoverType, + jobId, + miniClusterResource.get().getMiniCluster(), + () -> sleepMs(100)); + } + + assertEqualsInAnyOrder( + expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + } + + private void checkBinlogData( + TableResult tableResult, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables) + throws Exception { + waitUntilJobRunning(tableResult); + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + + for (String tableId : captureCustomerTables) { + makeFirstPartBinlogEvents(getConnection(), testDatabase + '.' + tableId); + } + + // wait for the binlog reading + Thread.sleep(3_000L); + + if (failoverPhase == FailoverPhase.BINLOG) { + triggerFailover( + failoverType, + jobId, + miniClusterResource.get().getMiniCluster(), + () -> sleepMs(200)); + waitUntilJobRunning(tableResult); + } + for (String tableId : captureCustomerTables) { + makeSecondPartBinlogEvents(getConnection(), testDatabase + '.' 
+ tableId); + } + + List expectedBinlogData = new ArrayList<>(); + for (int i = 0; i < captureCustomerTables.length; i++) { + expectedBinlogData.addAll(firstPartBinlogEvents); + expectedBinlogData.addAll(secondPartBinlogEvents); + } + sleepMs(3_000); + assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size())); + Assertions.assertThat(hasNextData(iterator)).isFalse(); + } + + private void waitUntilJobRunning(TableResult tableResult) + throws InterruptedException, ExecutionException { + do { + Thread.sleep(5000L); + } while (tableResult.getJobClient().get().getJobStatus().get() != RUNNING); + } + + private boolean hasNextData(final CloseableIterator iterator) + throws InterruptedException, ExecutionException { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + FutureTask future = new FutureTask(iterator::hasNext); + executor.execute(future); + return future.get(3, TimeUnit.SECONDS); + } catch (TimeoutException e) { + return false; + } finally { + executor.shutdown(); + } + } + + /** + * Make some changes on the specified customer table. Changelog in string could be accessed by + * {@link #firstPartBinlogEvents}. + */ + private void makeFirstPartBinlogEvents(JdbcConnection connection, String tableId) + throws SQLException { + try { + connection.setAutoCommit(false); + + // make binlog events for the first split + connection.execute( + "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", + "DELETE FROM " + tableId + " where id = 102", + "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')", + "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"); + connection.commit(); + } finally { + connection.close(); + } + } + + /** + * Make some other changes on the specified customer table. Changelog in string could be + * accessed by {@link #secondPartBinlogEvents}. + */ + private void makeSecondPartBinlogEvents(JdbcConnection connection, String tableId) + throws SQLException { + try { + connection.setAutoCommit(false); + + // make binlog events for split-1 + connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010"); + connection.commit(); + + // make binlog events for the last split + connection.execute( + "INSERT INTO " + + tableId + + " VALUES(2001, 'user_22','Shanghai','123567891234')," + + " (2002, 'user_23','Shanghai','123567891234')," + + "(2003, 'user_24','Shanghai','123567891234')"); + connection.commit(); + } finally { + connection.close(); + } + } + + private void sleepMs(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ignored) { + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java new file mode 100644 index 00000000000..51b4a6d69ec --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static java.lang.String.format; + +/** OceanBase CDC source connector integration test. */ +public class OceanBaseSourceITCase extends OceanBaseSourceTestBase { + private static final String DDL_FILE = "oceanbase_ddl_test"; + private static final String DATABASE_NAME = "cdc_s_" + getRandomSuffix(); + + @BeforeAll + public static void beforeClass() throws InterruptedException { + initializeOceanBaseTables(DDL_FILE, DATABASE_NAME, null); + } + + @AfterAll + public static void afterClass() { + dropDatabase(DATABASE_NAME); + } + + @Test + public void testSingleKey() throws Exception { + int parallelism = 1; + String[] captureCustomerTables = new String[] {"orders"}; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + env.setParallelism(parallelism); + env.enableCheckpointing(200L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + String sourceDDL = + format( + "CREATE TABLE orders_source (" + + " id BIGINT NOT NULL," + + " seller_id STRING," + + " order_id STRING," + + " buyer_id STRING," + + " create_time TIMESTAMP," + + " primary key (id) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '100'," + + " 'server-time-zone' = 'Asia/Shanghai'," + + " 'server-id' = '%s'" + + ")", + getHost(), + getPort(), + USER_NAME, + PASSWORD, + DATABASE_NAME, + getTableNameRegex(captureCustomerTables), + getServerId()); + + // first step: check the snapshot data + String[] snapshotForSingleTable = + new String[] { + "+I[1, 1001, 1, 102, 2022-01-16T00:00]", + "+I[2, 1002, 2, 105, 2022-01-16T00:00]", + "+I[3, 1004, 3, 109, 2022-01-16T00:00]", + "+I[4, 1002, 2, 106, 2022-01-16T00:00]", + "+I[5, 1003, 1, 107, 2022-01-16T00:00]", + }; + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from orders_source"); + CloseableIterator iterator = tableResult.collect(); + List 
expectedSnapshotData = new ArrayList<>(); + for (int i = 0; i < captureCustomerTables.length; i++) { + expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); + } + + List realSnapshotData = fetchRows(iterator, expectedSnapshotData.size()); + assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData); + + // second step: check the sink data + tEnv.executeSql( + "CREATE TABLE sink (" + + " id BIGINT NOT NULL," + + " seller_id STRING," + + " order_id STRING," + + " buyer_id STRING," + + " create_time TIMESTAMP," + + " primary key (id) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"); + tableResult = tEnv.executeSql("insert into sink select * from orders_source"); + + waitForSinkSize("sink", realSnapshotData.size()); + assertEqualsInAnyOrder( + expectedSnapshotData, TestValuesTableFactory.getRawResultsAsStrings("sink")); + + // third step: check dml events + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("use " + DATABASE_NAME); + statement.execute("INSERT INTO orders VALUES (6, 1006,1006, 1006,'2022-01-17');"); + statement.execute("INSERT INTO orders VALUES (7,1007, 1007,1007, '2022-01-17');"); + statement.execute("UPDATE orders SET seller_id= 9999, order_id=9999 WHERE id=6;"); + statement.execute("UPDATE orders SET seller_id= 9999, order_id=9999 WHERE id=7;"); + statement.execute("DELETE FROM orders WHERE id=7;"); + } + + String[] expectedBinlog = + new String[] { + "+I[6, 1006, 1006, 1006, 2022-01-17T00:00]", + "+I[7, 1007, 1007, 1007, 2022-01-17T00:00]", + "-U[6, 1006, 1006, 1006, 2022-01-17T00:00]", + "+U[6, 9999, 9999, 1006, 2022-01-17T00:00]", + "-U[7, 1007, 1007, 1007, 2022-01-17T00:00]", + "+U[7, 9999, 9999, 1007, 2022-01-17T00:00]", + "-D[7, 9999, 9999, 1007, 2022-01-17T00:00]" + }; + List expectedBinlogData = new ArrayList<>(); + for (int i = 0; i < captureCustomerTables.length; i++) { + expectedBinlogData.addAll(Arrays.asList(expectedBinlog)); + } + List realBinlog = fetchRows(iterator, expectedBinlog.length); + assertEqualsInOrder(expectedBinlogData, realBinlog); + Thread.sleep(3_000); + tableResult.getJobClient().get().cancel().get(); + } + + @Test + public void testFullTypesDdl() throws Exception { + int parallelism = 1; + String[] captureCustomerTables = new String[] {"oceanbase_full_types"}; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + env.setParallelism(parallelism); + env.enableCheckpointing(200L); + String sourceDDL = + format( + "CREATE TABLE oceanbase_full_types (\n" + + " `id` INT NOT NULL,\n" + + " tiny_c TINYINT,\n" + + " tiny_un_c SMALLINT ,\n" + + " small_c SMALLINT,\n" + + " small_un_c INT,\n" + + " medium_c INT,\n" + + " medium_un_c INT,\n" + + " int_c INT ,\n" + + " int_un_c BIGINT,\n" + + " int11_c BIGINT,\n" + + " big_c BIGINT,\n" + + " big_un_c DECIMAL(20, 0),\n" + + " varchar_c VARCHAR(255),\n" + + " char_c CHAR(3),\n" + + " real_c FLOAT,\n" + + " float_c FLOAT,\n" + + " double_c DOUBLE,\n" + + " decimal_c DECIMAL(8, 4),\n" + + " numeric_c DECIMAL(6, 0),\n" + + " big_decimal_c STRING,\n" + + " bit1_c BOOLEAN,\n" + + " tiny1_c BOOLEAN,\n" + + " boolean_c BOOLEAN,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " datetime3_c TIMESTAMP(3),\n" + + " datetime6_c TIMESTAMP(6),\n" + + " timestamp_c TIMESTAMP(0),\n" + + " file_uuid BYTES,\n" + + " bit_c BINARY(8),\n" + + " text_c STRING,\n" + + " 
tiny_blob_c BYTES,\n" + + " blob_c BYTES,\n" + + " medium_blob_c BYTES,\n" + + " long_blob_c BYTES,\n" + + " year_c INT,\n" + + " enum_c STRING,\n" + + " set_c ARRAY,\n" + + " json_c STRING,\n" + + " point_c STRING,\n" + + " geometry_c STRING,\n" + + " linestring_c STRING,\n" + + " polygon_c STRING,\n" + + " multipoint_c STRING,\n" + + " multiline_c STRING,\n" + + " multipolygon_c STRING,\n" + + " geometrycollection_c STRING,\n" + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '100'," + + " 'server-time-zone' = 'Asia/Shanghai'," + + " 'server-id' = '%s'" + + ")", + getHost(), + getPort(), + USER_NAME, + PASSWORD, + DATABASE_NAME, + getTableNameRegex(captureCustomerTables), + getServerId()); + tEnv.executeSql(sourceDDL); + + TableResult tableResult = tEnv.executeSql("select * from oceanbase_full_types"); + CloseableIterator iterator = tableResult.collect(); + List realSnapshotData = fetchRows(iterator, 1); + String[] expectedSnapshotData = + new String[] { + "+I[1, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, 9223372036854775807, 18446744073709551615, Hello World, abc, 123.102, 123.102, 404.4443, 123.4567, 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, [101, 26, -19, 8, 57, 15, 72, -109, -78, -15, 54, -110, 62, 123, 116, 0], [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, b], {\"key1\": \"value1\", \"key2\": \"value2\", \"num1\": 16708304.0, \"num2\": 16708305}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}]" + }; + assertEqualsInAnyOrder(Arrays.asList(expectedSnapshotData), realSnapshotData); + Thread.sleep(3_000); + tableResult.getJobClient().get().cancel().get(); + } + + @Test + public void testMultiKeys() throws Exception { + int parallelism = 1; + String[] captureCustomerTables = new String[] {"orders_with_multi_pks"}; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + env.setParallelism(parallelism); + env.enableCheckpointing(200L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + String sourceDDL = + format( + "CREATE TABLE orders_with_multi_pks (" + + " id BIGINT NOT NULL," + + " seller_id STRING," + + " order_id STRING," + + " buyer_id STRING," + + " create_time TIMESTAMP," + + " primary key (id,order_id) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 
'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '100'," + + " 'server-time-zone' = 'Asia/Shanghai'," + + " 'server-id' = '%s'" + + ")", + getHost(), + getPort(), + USER_NAME, + PASSWORD, + DATABASE_NAME, + getTableNameRegex(captureCustomerTables), + getServerId()); + + // first step: check the snapshot data + String[] snapshotForSingleTable = + new String[] { + "+I[1, 1001, 1, 102, 2022-01-16T00:00]", + "+I[2, 1002, 2, 105, 2022-01-16T00:00]", + "+I[3, 1004, 3, 109, 2022-01-16T00:00]", + "+I[4, 1002, 2, 106, 2022-01-16T00:00]", + "+I[5, 1003, 1, 107, 2022-01-16T00:00]", + }; + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from orders_with_multi_pks"); + CloseableIterator iterator = tableResult.collect(); + List expectedSnapshotData = new ArrayList<>(); + for (int i = 0; i < captureCustomerTables.length; i++) { + expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); + } + + List realSnapshotData = fetchRows(iterator, expectedSnapshotData.size()); + assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData); + + // second step: check the sink data + tEnv.executeSql( + "CREATE TABLE multi_key_sink (" + + " id BIGINT NOT NULL," + + " seller_id STRING," + + " order_id STRING," + + " buyer_id STRING," + + " create_time TIMESTAMP," + + " primary key (id,order_id) not enforced" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"); + + tEnv.executeSql("insert into multi_key_sink select * from orders_with_multi_pks"); + + waitForSinkSize("multi_key_sink", realSnapshotData.size()); + assertEqualsInAnyOrder( + expectedSnapshotData, + TestValuesTableFactory.getRawResultsAsStrings("multi_key_sink")); + + // third step: check dml events + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("use " + DATABASE_NAME); + statement.execute( + "INSERT INTO orders_with_multi_pks VALUES (6, 1006,1006, 1006,'2022-01-17');"); + statement.execute( + "INSERT INTO orders_with_multi_pks VALUES (7,1007, 1007,1007, '2022-01-17');"); + statement.execute( + "UPDATE orders_with_multi_pks SET seller_id= 9999, order_id=9999 WHERE id=6;"); + statement.execute( + "UPDATE orders_with_multi_pks SET seller_id= 9999, order_id=9999 WHERE id=7;"); + statement.execute("DELETE FROM orders_with_multi_pks WHERE id=7;"); + } + + String[] expectedBinlog = + new String[] { + "+I[6, 1006, 1006, 1006, 2022-01-17T00:00]", + "+I[7, 1007, 1007, 1007, 2022-01-17T00:00]", + "-D[6, 1006, 1006, 1006, 2022-01-17T00:00]", + "+I[6, 9999, 9999, 1006, 2022-01-17T00:00]", + "-D[7, 1007, 1007, 1007, 2022-01-17T00:00]", + "+I[7, 9999, 9999, 1007, 2022-01-17T00:00]", + "-D[7, 9999, 9999, 1007, 2022-01-17T00:00]" + }; + List realBinlog = fetchRows(iterator, expectedBinlog.length); + assertEqualsInAnyOrder(Arrays.asList(expectedBinlog), realBinlog); + Thread.sleep(3_000); + tableResult.getJobClient().get().cancel().get(); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java new file mode 100644 index 
00000000000..4a93da7e1ec --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oceanbase; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; + +import io.debezium.connector.mysql.MySqlConnection; +import org.apache.commons.lang3.StringUtils; +import org.assertj.core.api.Assertions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.apache.flink.util.Preconditions.checkState; + +/** Basic class for testing Database OceanBase. 
*/ +@Testcontainers +public abstract class OceanBaseSourceTestBase extends AbstractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseSourceTestBase.class); + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + + protected static final Integer INNER_PORT = 2883; + protected static final String USER_NAME = "root@test"; + protected static final String PASSWORD = "123456"; + protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(5); + + public static final Network NETWORK = Network.newNetwork(); + + @SuppressWarnings("resource") + @Container + public static final GenericContainer OB_BINLOG_CONTAINER = + new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test") + .withNetwork(NETWORK) + .withStartupTimeout(WAITING_TIMEOUT) + .withExposedPorts(2881, 2883) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor( + new LogMessageWaitStrategy() + .withRegEx(".*OBBinlog is ready!.*") + .withTimes(1) + .withStartupTimeout(Duration.ofMinutes(6))); + + protected static String getHost() { + return OB_BINLOG_CONTAINER.getHost(); + } + + protected static int getPort() { + return OB_BINLOG_CONTAINER.getMappedPort(INNER_PORT); + } + + protected static String getUserName() { + return USER_NAME; + } + + protected static String getPassword() { + return PASSWORD; + } + + protected static String getJdbcUrl() { + return String.format("jdbc:mysql://%s:%s", getHost(), getPort()); + } + + protected static Connection getJdbcConnection() throws SQLException { + String jdbcUrl = getJdbcUrl(); + LOG.info("jdbcUrl is :" + jdbcUrl); + return DriverManager.getConnection(jdbcUrl, USER_NAME, PASSWORD); + } + + /** initialize database and tables with ${databaseName}.sql for testing. */ + protected static void initializeOceanBaseTables( + String ddlName, String dbName, Function filter) + throws InterruptedException { + final String ddlFile = String.format("ddl/%s.sql", ddlName); + final URL ddlTestFile = OceanBaseSourceTestBase.class.getClassLoader().getResource(ddlFile); + Assertions.assertThat(ddlTestFile).withFailMessage("Cannot locate " + ddlFile).isNotNull(); + // need to sleep 1s, make sure the jdbc connection can be created + Thread.sleep(1000); + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("drop database if exists " + dbName); + statement.execute("create database if not exists " + dbName); + statement.execute("use " + dbName + ";"); + final List statements = + Arrays.stream( + Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? 
m.group(1) : x; + }) + .collect(Collectors.joining("\n")) + .split(";")) + .filter(sql -> filter == null || filter.apply(sql)) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected static void dropDatabase(String dbName) { + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("drop database if exists " + dbName); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + Row row = iter.next(); + rows.add(row.toString()); + } + return rows; + } + + protected String getTableNameRegex(String[] captureCustomerTables) { + checkState(captureCustomerTables.length > 0); + if (captureCustomerTables.length == 1) { + return captureCustomerTables[0]; + } else { + // pattern that matches multiple tables + return format("(%s)", StringUtils.join(captureCustomerTables, "|")); + } + } + + protected String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + return serverId + "-" + (serverId + 4); + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + + protected static void waitForSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (sinkSize(sinkName) < expectedSize) { + Thread.sleep(100); + } + } + + protected static int sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } + + protected static void assertEqualsInAnyOrder(List expected, List actual) { + Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + } + + protected static void assertEqualsInOrder(List expected, List actual) { + Assertions.assertThat(actual).containsExactlyElementsOf(expected); + } + + protected static String getRandomSuffix() { + String base = UUID.randomUUID().toString().replaceAll("-", ""); + if (base.length() > 10) { + return base.substring(0, 11); + } + return base; + } + + /** The type of failover. */ + protected enum FailoverType { + TM, + JM, + NONE + } + + /** The phase of failover. 
*/ + protected enum FailoverPhase { + SNAPSHOT, + BINLOG, + NEVER + } + + protected static void triggerFailover( + FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) + throws Exception { + switch (type) { + case TM: + restartTaskManager(miniCluster, afterFailAction); + break; + case JM: + triggerJobManagerFailover(jobId, miniCluster, afterFailAction); + break; + case NONE: + break; + default: + throw new IllegalStateException("Unexpected value: " + type); + } + } + + protected static void triggerJobManagerFailover( + JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { + final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); + haLeadershipControl.revokeJobMasterLeadership(jobId).get(); + afterFailAction.run(); + haLeadershipControl.grantJobMasterLeadership(jobId).get(); + } + + protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction) + throws Exception { + miniCluster.terminateTaskManager(0).get(); + afterFailAction.run(); + miniCluster.startTaskManager(); + } + + protected MySqlConnection getConnection() { + Map properties = new HashMap<>(); + properties.put("database.hostname", getHost()); + properties.put("database.port", String.valueOf(getPort())); + properties.put("database.user", getUserName()); + properties.put("database.password", getPassword()); + properties.put("database.serverTimezone", ZoneId.of("Asia/Shanghai").toString()); + io.debezium.config.Configuration configuration = + io.debezium.config.Configuration.from(properties); + return DebeziumUtils.createMySqlConnection(configuration, new Properties()); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/oceanbase_ddl_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/oceanbase_ddl_test.sql new file mode 100644 index 00000000000..cc5c30b2fda --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/oceanbase_ddl_test.sql @@ -0,0 +1,232 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: oceanbase_ddl_test +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create orders table with single primary key +create table orders ( + id bigint not null auto_increment, + seller_id varchar(30) DEFAULT NULL, + order_id varchar(30) DEFAULT NULL, + buyer_id varchar(30) DEFAULT NULL, + create_time datetime DEFAULT NULL, + primary key(id), + INDEX `i_seller`(`seller_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 partition by key(id) partitions 3; + +-- insert some orders for testing +INSERT INTO orders +VALUES (1, 1001, 1, 102, '2022-01-16'), + (2, 1002, 2, 105, '2022-01-16'), + (3, 1004, 3, 109, '2022-01-16'), + (4, 1002, 2, 106, '2022-01-16'), + (5, 1003, 1, 107, '2022-01-16'); + +-- Create orders with multi primary keys +create table orders_with_multi_pks ( + id bigint not null auto_increment, + seller_id varchar(30) DEFAULT NULL, + order_id varchar(30) NOT NULL, + buyer_id varchar(30) DEFAULT NULL, + create_time datetime DEFAULT NULL, + primary key(id, order_id), + INDEX `g_mi_seller`(`seller_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 partition by key(id, order_id) partitions 3; + +-- insert some orders for testing +INSERT INTO orders_with_multi_pks +VALUES (1, 1001, 1, 102, '2022-01-16'), + (2, 1002, 2, 105, '2022-01-16'), + (3, 1004, 3, 109, '2022-01-16'), + (4, 1002, 2, 106, '2022-01-16'), + (5, 1003, 1, 107, '2022-01-16'); + + +-- create table with full types +CREATE TABLE oceanbase_full_types ( + id INT AUTO_INCREMENT, + tiny_c TINYINT, + tiny_un_c TINYINT UNSIGNED, + small_c SMALLINT, + small_un_c SMALLINT UNSIGNED, + medium_c MEDIUMINT, + medium_un_c MEDIUMINT UNSIGNED, + int_c INTEGER , + int_un_c INTEGER UNSIGNED, + int11_c INT(11) DEFAULT 0, + big_c BIGINT, + big_un_c BIGINT UNSIGNED, + varchar_c VARCHAR(255) DEFAULT '1', + char_c CHAR(3) DEFAULT '', + real_c REAL, + float_c FLOAT, + double_c DOUBLE, + decimal_c DECIMAL(8, 4), + numeric_c NUMERIC(6, 0), + big_decimal_c DECIMAL(65, 1), + bit1_c BIT, + tiny1_c TINYINT(1), + boolean_c BOOLEAN, + date_c DATE, + time_c TIME(0), + datetime3_c DATETIME(3), + datetime6_c DATETIME(6), + timestamp_c TIMESTAMP, + file_uuid BINARY(16), + bit_c BIT(64), + text_c TEXT, + tiny_blob_c TINYBLOB, + blob_c BLOB, + medium_blob_c MEDIUMBLOB, + long_blob_c LONGBLOB, + year_c YEAR, + enum_c enum('red', 'white') default 'red', + set_c SET('a', 'b'), + json_c JSON, + point_c POINT, + geometry_c GEOMETRY, + linestring_c LINESTRING, + polygon_c POLYGON, + multipoint_c MULTIPOINT, + multiline_c MULTILINESTRING, + multipolygon_c MULTIPOLYGON, + geometrycollection_c GEOMETRYCOLLECTION, + PRIMARY KEY (id), + INDEX `g_mit_seller`(`int_c`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 partition by Hash(id) partitions 3; + +INSERT INTO oceanbase_full_types VALUES ( + DEFAULT, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, 9223372036854775807, + 18446744073709551615, + 'Hello World', 'abc', 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, 0, 1, true, + '2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', + unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400','-','')), b'0000010000000100000001000000010000000100000001000000010000000100', + 'text',UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)), 2021, + 'red', 'a,b,a', '{"key1": "value1", "key2": "value2", "num1": 
1.6708304E7, "num2": 16708305}', + ST_GeomFromText('POINT(1 1)'), + ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'), + ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'), + ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'), + ST_GeomFromText('MULTIPOINT((1 1),(2 2))'), + ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'), + ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'), + ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))') +); + +-- Create orders_sink for testing the sink of flink-jdbc-connector +create table orders_sink ( + id bigint not null auto_increment, + seller_id varchar(30) DEFAULT NULL, + order_id varchar(30) DEFAULT NULL, + buyer_id varchar(30) DEFAULT NULL, + create_time datetime DEFAULT NULL, + primary key(id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 partition by hash(id) partitions 3; + + +-- Create and populate our users using a single insert with many rows +CREATE TABLE customers ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); + +-- table has same name prefix with 'customers.*' +CREATE TABLE customers_1 ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers_1 +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); + + +CREATE TABLE customers_no_pk ( + id INTEGER, + name VARCHAR(255) DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers_no_pk +VALUES (101,"user_1","Shanghai","123567891234"), + 
(102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java index 5e1e979891e..9474a225ff8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java @@ -29,7 +29,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -40,8 +39,6 @@ import java.util.stream.Stream; /** Test supporting different column charsets for OceanBase. */ -@Disabled( - "Temporarily disabled for GitHub CI due to unavailability of OceanBase Binlog Service docker image. 
These tests are currently only supported for local execution.") public class OceanBaseCharsetITCase extends OceanBaseSourceTestBase { private static final String DDL_FILE = "charset_test"; @@ -183,7 +180,7 @@ public void testCharset(String testName, String[] snapshotExpected, String[] bin + ")", testName, getHost(), - PORT, + getPort(), USER_NAME, PASSWORD, DATABASE_NAME, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java index aaad93e1ca9..f722813e78d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -35,7 +34,6 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -45,28 +43,29 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.lang.String.format; import static org.apache.flink.api.common.JobStatus.RUNNING; /** failover IT tests for oceanbase. */ -@Disabled( - "Temporarily disabled for GitHub CI due to unavailability of OceanBase Binlog Service docker image. 
These tests are currently only supported for local execution.") @Timeout(value = 180, unit = TimeUnit.SECONDS) public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase { private static final String DEFAULT_SCAN_STARTUP_MODE = "initial"; private static final String DDL_FILE = "oceanbase_ddl_test"; - private static final String DEFAULT_TEST_DATABASE = "customer_" + getRandomSuffix(); protected static final int DEFAULT_PARALLELISM = 4; + private String testDatabase = "customer_" + getRandomSuffix(); private final List<String> firstPartBinlogEvents = Arrays.asList( @@ -87,9 +86,9 @@ public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase { public static Stream<Arguments> parameters() { return Stream.of( - Arguments.of("customers", null), - Arguments.of("customers", "id"), - Arguments.of("customers_no_pk", "id")); + Arguments.of("customers", null, "false"), + Arguments.of("customers", "id", "true"), + Arguments.of("customers_no_pk", "id", "true")); } @RegisterExtension @@ -105,47 +104,53 @@ public static Stream<Arguments> parameters() { @BeforeEach public void setup() throws InterruptedException { + testDatabase = "customer_" + getRandomSuffix(); initializeOceanBaseTables( DDL_FILE, - DEFAULT_TEST_DATABASE, + testDatabase, s -> !StringUtils.isNullOrWhitespaceOnly(s) && (s.contains("customers"))); } @AfterEach public void clean() { - dropDatabase(DEFAULT_TEST_DATABASE); + dropDatabase(testDatabase); } // Failover tests @ParameterizedTest @MethodSource("parameters") - @Timeout(value = 120, unit = TimeUnit.SECONDS) - public void testTaskManagerFailoverInSnapshotPhase(String tableName, String chunkColumnName) - throws Exception { + public void testTaskManagerFailoverInSnapshotPhase( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { testMySqlParallelSource( FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {tableName, "customers_1"}, tableName, - chunkColumnName); + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); } @ParameterizedTest @MethodSource("parameters") - public void testTaskManagerFailoverInBinlogPhase(String tableName, String chunkColumnName) - throws Exception { + public void testTaskManagerFailoverInBinlogPhase( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { testMySqlParallelSource( FailoverType.TM, FailoverPhase.BINLOG, new String[] {tableName, "customers_1"}, tableName, - chunkColumnName); + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); } @ParameterizedTest @MethodSource("parameters") - public void testTaskManagerFailoverFromLatestOffset(String tableName, String chunkColumnName) - throws Exception { + public void testTaskManagerFailoverFromLatestOffset( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { testMySqlParallelSource( DEFAULT_PARALLELISM, "latest-offset", @@ -154,37 +159,46 @@ public void testTaskManagerFailoverFromLatestOffset(String tableName, String chu new String[] {tableName, "customers_1"}, RestartStrategies.fixedDelayRestart(1, 0), tableName, - chunkColumnName); + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); } @ParameterizedTest @MethodSource("parameters") - public void testJobManagerFailoverInSnapshotPhase(String tableName, String chunkColumnName) - throws Exception { + public void 
testJobManagerFailoverInSnapshotPhase( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { testMySqlParallelSource( FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {tableName, "customers_1"}, tableName, - chunkColumnName); + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); } @ParameterizedTest @MethodSource("parameters") - public void testJobManagerFailoverInBinlogPhase(String tableName, String chunkColumnName) - throws Exception { + public void testJobManagerFailoverInBinlogPhase( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { testMySqlParallelSource( FailoverType.JM, FailoverPhase.BINLOG, new String[] {tableName, "customers_1"}, tableName, - chunkColumnName); + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); } @ParameterizedTest @MethodSource("parameters") - public void testJobManagerFailoverFromLatestOffset(String tableName, String chunkColumnName) - throws Exception { + public void testJobManagerFailoverFromLatestOffset( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { testMySqlParallelSource( DEFAULT_PARALLELISM, "latest-offset", @@ -193,33 +207,42 @@ public void testJobManagerFailoverFromLatestOffset(String tableName, String chun new String[] {tableName, "customers_1"}, RestartStrategies.fixedDelayRestart(1, 0), tableName, - chunkColumnName); + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); } @ParameterizedTest @MethodSource("parameters") - public void testTaskManagerFailoverSingleParallelism(String tableName, String chunkColumnName) - throws Exception { + public void testTaskManagerFailoverSingleParallelism( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { testMySqlParallelSource( 1, FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {tableName}, tableName, - chunkColumnName); + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); } @ParameterizedTest @MethodSource("parameters") - public void testJobManagerFailoverSingleParallelism(String tableName, String chunkColumnName) - throws Exception { + public void testJobManagerFailoverSingleParallelism( + String tableName, String chunkColumnName, String assignEndingFirst) throws Exception { testMySqlParallelSource( 1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {tableName}, tableName, - chunkColumnName); + chunkColumnName, + Collections.singletonMap( + "scan.incremental.snapshot.unbounded-chunk-first.enabled", + assignEndingFirst)); } private void testMySqlParallelSource( @@ -227,7 +250,8 @@ private void testMySqlParallelSource( FailoverPhase failoverPhase, String[] captureCustomerTables, String tableName, - String chunkColumnName) + String chunkColumnName, + Map otherOptions) throws Exception { testMySqlParallelSource( DEFAULT_PARALLELISM, @@ -235,7 +259,8 @@ private void testMySqlParallelSource( failoverPhase, captureCustomerTables, tableName, - chunkColumnName); + chunkColumnName, + otherOptions); } private void testMySqlParallelSource( @@ -244,7 +269,8 @@ private void testMySqlParallelSource( FailoverPhase failoverPhase, String[] captureCustomerTables, String tableName, - String chunkColumnName) + String chunkColumnName, + Map otherOptions) throws 
Exception { testMySqlParallelSource( parallelism, @@ -254,7 +280,8 @@ private void testMySqlParallelSource( captureCustomerTables, RestartStrategies.fixedDelayRestart(1, 0), tableName, - chunkColumnName); + chunkColumnName, + otherOptions); } private void testMySqlParallelSource( @@ -265,7 +292,8 @@ private void testMySqlParallelSource( String[] captureCustomerTables, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, String tableName, - String chunkColumnName) + String chunkColumnName, + Map otherOptions) throws Exception { testMySqlParallelSource( parallelism, @@ -276,7 +304,8 @@ private void testMySqlParallelSource( restartStrategyConfiguration, false, tableName, - chunkColumnName); + chunkColumnName, + otherOptions); } private void testMySqlParallelSource( @@ -288,11 +317,10 @@ private void testMySqlParallelSource( RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, boolean skipSnapshotBackfill, String tableName, - String chunkColumnName) + String chunkColumnName, + Map otherOptions) throws Exception { - captureCustomerTables = new String[] {tableName}; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); env.setParallelism(parallelism); @@ -309,7 +337,7 @@ private void testMySqlParallelSource( ? "" : ", primary key (id) not enforced") + ") WITH (" - + " 'connector' = 'oceanbase-cdc'," + + " 'connector' = 'mysql-cdc'," + " 'scan.incremental.snapshot.enabled' = 'true'," + " 'hostname' = '%s'," + " 'port' = '%s'," @@ -323,12 +351,13 @@ private void testMySqlParallelSource( + " 'server-time-zone' = 'Asia/Shanghai'," + " 'server-id' = '%s'" + " %s" + + " %s" + ")", getHost(), getPort(), getUserName(), getPassword(), - DEFAULT_TEST_DATABASE, + testDatabase, getTableNameRegex(captureCustomerTables), scanStartupMode, skipSnapshotBackfill, @@ -337,7 +366,17 @@ private void testMySqlParallelSource( ? "" : String.format( ", 'scan.incremental.snapshot.chunk.key-column' = '%s'", - chunkColumnName)); + chunkColumnName), + otherOptions.isEmpty() + ? "" + : "," + + otherOptions.entrySet().stream() + .map( + e -> + String.format( + "'%s'='%s'", + e.getKey(), e.getValue())) + .collect(Collectors.joining(","))); tEnv.executeSql(sourceDDL); TableResult tableResult = tEnv.executeSql("select * from customers"); @@ -349,7 +388,7 @@ private void testMySqlParallelSource( // second step: check the binlog data checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables); - // sleepMs(3000); + sleepMs(3000); tableResult.getJobClient().get().cancel().get(); } @@ -416,7 +455,7 @@ private void checkBinlogData( JobID jobId = tableResult.getJobClient().get().getJobID(); for (String tableId : captureCustomerTables) { - makeFirstPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + '.' + tableId); + makeFirstPartBinlogEvents(getConnection(), testDatabase + '.' + tableId); } // wait for the binlog reading @@ -431,7 +470,7 @@ private void checkBinlogData( waitUntilJobRunning(tableResult); } for (String tableId : captureCustomerTables) { - makeSecondPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + '.' + tableId); + makeSecondPartBinlogEvents(getConnection(), testDatabase + '.' 
+ tableId); } List expectedBinlogData = new ArrayList<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java index 75b6726ff09..702a9da4dbc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.sql.Connection; @@ -39,8 +38,6 @@ import static java.lang.String.format; /** OceanBase CDC source connector integration test. */ -@Disabled( - "Temporarily disabled for GitHub CI due to unavailability of OceanBase Binlog Service docker image. These tests are currently only supported for local execution.") public class OceanBaseSourceITCase extends OceanBaseSourceTestBase { private static final String DDL_FILE = "oceanbase_ddl_test"; private static final String DATABASE_NAME = "cdc_s_" + getRandomSuffix(); @@ -89,7 +86,7 @@ public void testSingleKey() throws Exception { + " 'server-id' = '%s'" + ")", getHost(), - PORT, + getPort(), USER_NAME, PASSWORD, DATABASE_NAME, @@ -241,7 +238,7 @@ public void testFullTypesDdl() throws Exception { + " 'server-id' = '%s'" + ")", getHost(), - PORT, + getPort(), USER_NAME, PASSWORD, DATABASE_NAME, @@ -295,7 +292,7 @@ public void testMultiKeys() throws Exception { + " 'server-id' = '%s'" + ")", getHost(), - PORT, + getPort(), USER_NAME, PASSWORD, DATABASE_NAME, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java index 0ea3cb9662d..703e0a5234d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java @@ -30,6 +30,12 @@ import org.assertj.core.api.Assertions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import java.net.URL; import java.nio.file.Files; @@ -38,6 +44,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import java.time.Duration; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; @@ -56,22 +63,40 @@ import static java.lang.String.format; import static org.apache.flink.util.Preconditions.checkState; -/** Basic class for testing Database 
OceanBase which supported the mysql protocol. */ +/** Basic class for testing Database OceanBase. */ +@Testcontainers public abstract class OceanBaseSourceTestBase extends AbstractTestBase { private static final Logger LOG = LoggerFactory.getLogger(OceanBaseSourceTestBase.class); private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); - protected static final Integer PORT = 3306; - protected static final String USER_NAME = System.getenv("OCEANBASE_USERNAME"); - protected static final String PASSWORD = System.getenv("OCEANBASE_PASSWORD"); - protected static final String HOSTNAME = System.getenv("OCEANBASE_HOSTNAME"); + + protected static final Integer INNER_PORT = 2883; + protected static final String USER_NAME = "root@test"; + protected static final String PASSWORD = "123456"; + protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(5); + + public static final Network NETWORK = Network.newNetwork(); + + @SuppressWarnings("resource") + @Container + public static final GenericContainer OB_BINLOG_CONTAINER = + new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test") + .withNetwork(NETWORK) + .withStartupTimeout(WAITING_TIMEOUT) + .withExposedPorts(2881, 2883) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor( + new LogMessageWaitStrategy() + .withRegEx(".*OBBinlog is ready!.*") + .withTimes(1) + .withStartupTimeout(Duration.ofMinutes(6))); protected static String getHost() { - return HOSTNAME; + return OB_BINLOG_CONTAINER.getHost(); } - protected static Integer getPort() { - return PORT; + protected static int getPort() { + return OB_BINLOG_CONTAINER.getMappedPort(INNER_PORT); } protected static String getUserName() { @@ -83,7 +108,7 @@ protected static String getPassword() { } protected static String getJdbcUrl() { - return String.format("jdbc:mysql://%s:%s", HOSTNAME, PORT); + return String.format("jdbc:mysql://%s:%s", getHost(), getPort()); } protected static Connection getJdbcConnection() throws SQLException { @@ -253,7 +278,7 @@ protected MySqlConnection getConnection() { properties.put("database.port", String.valueOf(getPort())); properties.put("database.user", getUserName()); properties.put("database.password", getPassword()); - properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); + properties.put("database.serverTimezone", ZoneId.of("Asia/Shanghai").toString()); io.debezium.config.Configuration configuration = io.debezium.config.Configuration.from(properties); return DebeziumUtils.createMySqlConnection(configuration, new Properties()); diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml index f97841016d1..57c7d385be9 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml @@ -340,6 +340,16 @@ limitations under the License. 
+ + org.apache.flink + flink-sql-connector-oceanbase-cdc + ${project.version} + oceanbase-cdc-connector.jar + jar + ${project.build.directory}/dependencies + + + com.ibm.db2.jcc db2jcc diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java new file mode 100644 index 00000000000..bb0cb26f237 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.tests; + +import org.apache.flink.cdc.common.test.utils.JdbcProxy; +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished; + +/** End-to-end tests for oceanbase-cdc connector uber jar. 
*/ +@Testcontainers +class OceanBaseE2eITCase extends FlinkContainerTestEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseE2eITCase.class); + + private static final Path oceanbaseCdcJar = + TestUtils.getResource("oceanbase-cdc-connector.jar"); + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + private static final String[] CREATE_DATABASE_DDL = + new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"}; + private static final Path jdbcDriver = TestUtils.getResource("mysql-driver.jar"); + private static final String DATABASE_NAME = "oceanbase_inventory"; + + private static final int OB_PROXY_PORT = 2883; + protected static final String DEFAULT_USERNAME = "root@test"; + protected static final String DEFAULT_PASSWORD = "123456"; + protected static final String INTER_CONTAINER_OCEANBASE_ALIAS = "oceanbase"; + + @Container + @SuppressWarnings("resource") + public static final GenericContainer OB_BINLOG_CONTAINER = + new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_OCEANBASE_ALIAS) + .withStartupTimeout(Duration.ofMinutes(5)) + .withExposedPorts(2881, 2883) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor( + new LogMessageWaitStrategy() + .withRegEx(".*OBBinlog is ready!.*") + .withTimes(1) + .withStartupTimeout(Duration.ofMinutes(6))); + + @BeforeEach + public void before() { + super.before(); + createAndInitializeOBTable("oceanbase_inventory"); + } + + @AfterEach + public void after() { + super.after(); + } + + @Test + void testOBBinlogCDC() throws Exception { + List sqlLines = + Arrays.asList( + "SET 'execution.checkpointing.interval' = '3s';", + "SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';", + "CREATE TABLE products_source (", + " `id` INT NOT NULL,", + " name STRING,", + " description STRING,", + " weight DECIMAL(10,3),", + " enum_c STRING,", + " json_c STRING,", + " point_c STRING,", + " primary key (`id`) not enforced", + ") WITH (", + " 'connector' = 'oceanbase-cdc',", + " 'hostname' = '" + INTER_CONTAINER_OCEANBASE_ALIAS + "',", + " 'port' = '2883',", + " 'username' = '" + DEFAULT_USERNAME + "',", + " 'password' = '" + DEFAULT_PASSWORD + "',", + " 'database-name' = '" + DATABASE_NAME + "',", + " 'table-name' = 'products_source',", + " 'server-time-zone' = 'Asia/Shanghai',", + " 'server-id' = '5800-5900',", + " 'scan.incremental.snapshot.chunk.size' = '4',", + " 'scan.incremental.close-idle-reader.enabled' = '" + + supportCheckpointsAfterTasksFinished() + + "'", + ");", + "CREATE TABLE products_sink (", + " `id` INT NOT NULL,", + " name STRING,", + " description STRING,", + " weight DECIMAL(10,3),", + " enum_c STRING,", + " json_c STRING,", + " point_c STRING,", + " primary key (`id`) not enforced", + ") WITH (", + " 'connector' = 'jdbc',", + String.format( + " 'url' = 'jdbc:mysql://%s:2883/%s',", + INTER_CONTAINER_OCEANBASE_ALIAS, DATABASE_NAME), + " 'table-name' = 'products_sink',", + " 'username' = '" + DEFAULT_USERNAME + "',", + " 'password' = '" + DEFAULT_PASSWORD + "'", + ");", + "INSERT INTO products_sink", + "SELECT * FROM products_source;"); + + submitSQLJob(sqlLines, oceanbaseCdcJar, jdbcJar, jdbcDriver); + waitUntilJobRunning(Duration.ofSeconds(30)); + + // generate binlogs + String jdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + OB_BINLOG_CONTAINER.getHost(), + OB_BINLOG_CONTAINER.getMappedPort(OB_PROXY_PORT), + DATABASE_NAME); + try (Connection conn = + 
DriverManager.getConnection(jdbcUrl, DEFAULT_USERNAME, DEFAULT_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute( + "UPDATE products_source SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products_source SET weight='5.1' WHERE id=107;"); + stat.execute( + "INSERT INTO products_source VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110 + stat.execute( + "INSERT INTO products_source VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null);"); + stat.execute( + "UPDATE products_source SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products_source SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products_source WHERE id=111;"); + // add schema change event in the last. + stat.execute("CREATE TABLE new_table (id int, age int);"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + // assert final results + JdbcProxy proxy = + new JdbcProxy(jdbcUrl, DEFAULT_USERNAME, DEFAULT_PASSWORD, MYSQL_DRIVER_CLASS); + List expectResult = + Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.14,red,{\"key1\": \"value1\"},{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102,car battery,12V car battery,8.1,white,{\"key2\": \"value2\"},{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8,red,{\"key3\": \"value3\"},{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104,hammer,12oz carpenter's hammer,0.75,white,{\"key4\": \"value4\"},{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105,hammer,14oz carpenter's hammer,0.875,red,{\"k1\": \"v1\", \"k2\": \"v2\"},{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106,hammer,18oz carpenter hammer,1.0,null,null,null", + "107,rocks,box of assorted rocks,5.1,null,null,null", + "108,jacket,water resistent black wind breaker,0.1,null,null,null", + "109,spare tire,24 inch spare tire,22.2,null,null,null", + "110,jacket,new water resistent white wind breaker,0.5,null,null,null"); + proxy.checkResultWithTimeout( + expectResult, + "products_sink", + new String[] {"id", "name", "description", "weight", "enum_c", "json_c", "point_c"}, + 300000L); + } + + /** Creates the database and populates it with initialization SQL script. */ + public void createAndInitializeOBTable(String sqlFile) { + final String ddlFile = String.format("ddl/%s.sql", sqlFile); + final URL ddlTestFile = OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile); + Assertions.assertThat(ddlTestFile).withFailMessage("Cannot locate " + ddlFile).isNotNull(); + try { + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + final List statements = + Arrays.stream( + Stream.concat( + Arrays.stream(CREATE_DATABASE_DDL), + Files.readAllLines( + Paths.get(ddlTestFile.toURI())) + .stream()) + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? 
m.group(1) : x; + }) + .map(sql -> sql.replace("$DBNAME$", DATABASE_NAME)) + .collect(Collectors.joining("\n")) + .split(";")) + .map(x -> x.replace("$$", ";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } + } catch (final Exception e) { + throw new IllegalStateException(e); + } + } + + public Connection getJdbcConnection() throws SQLException { + String url = + "jdbc:mysql://" + + OB_BINLOG_CONTAINER.getHost() + + ":" + + OB_BINLOG_CONTAINER.getMappedPort(OB_PROXY_PORT) + + "?useSSL=false"; + return DriverManager.getConnection(url, DEFAULT_USERNAME, DEFAULT_PASSWORD); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql new file mode 100644 index 00000000000..73bb75d7f2a --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql @@ -0,0 +1,51 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: inventory +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our products using a single insert with many rows +CREATE TABLE products_source ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + enum_c enum('red', 'white') default 'red', -- test some complex types as well, + json_c JSON, -- because we use additional dependencies to deserialize complex types. 
+ point_c POINT +); +ALTER TABLE products_source AUTO_INCREMENT = 101; + +INSERT INTO products_source +VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')), + (default,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')), + (default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')), + (default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')), + (default,"hammer","16oz carpenter's hammer",1.0, null, null, null), + (default,"rocks","box of assorted rocks",5.3, null, null, null), + (default,"jacket","water resistent black wind breaker",0.1, null, null, null), + (default,"spare tire","24 inch spare tire",22.2, null, null, null); + +CREATE TABLE products_sink ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + enum_c VARCHAR(255), + json_c VARCHAR(255), + point_c VARCHAR(255) +); \ No newline at end of file
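
Note: the failover tests above thread an extra options map (here only 'scan.incremental.snapshot.unbounded-chunk-first.enabled') through testMySqlParallelSource and splice it into the Flink SQL WITH clause as quoted key/value pairs. A minimal sketch of that rendering step, pulled out into a hypothetical helper; the class and method names are illustrative and not part of the patch:

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

final class SourceDdlOptions {

    // Renders a map as ",'k'='v','k2'='v2'", or an empty string when no extra options are given,
    // mirroring how the test appends otherOptions to the generated CREATE TABLE ... WITH (...) DDL.
    static String render(Map<String, String> options) {
        if (options.isEmpty()) {
            return "";
        }
        return ","
                + options.entrySet().stream()
                        .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
                        .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // The parameterized tests pass exactly one flag this way.
        System.out.println(
                render(
                        Collections.singletonMap(
                                "scan.incremental.snapshot.unbounded-chunk-first.enabled",
                                "true")));
        // Prints: ,'scan.incremental.snapshot.unbounded-chunk-first.enabled'='true'
    }
}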
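
Note: both OceanBaseSourceTestBase and OceanBaseE2eITCase gate test startup on the container log line "OBBinlog is ready!" and then resolve the mapped proxy port rather than hard-coding 2883 on the host. A self-contained sketch of that Testcontainers pattern, assuming the same image tag and readiness message as in the patch; the class name and standalone main method are illustrative only:

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;

import java.time.Duration;

public class ObBinlogContainerSketch {

    public static void main(String[] args) {
        // Same image and readiness marker as the test base; 2881 is the SQL port, 2883 the proxy port.
        try (GenericContainer<?> container =
                new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
                        .withExposedPorts(2881, 2883)
                        .waitingFor(
                                new LogMessageWaitStrategy()
                                        .withRegEx(".*OBBinlog is ready!.*")
                                        .withTimes(1)
                                        .withStartupTimeout(Duration.ofMinutes(6)))) {
            container.start();
            // Tests never assume a fixed host port; they always ask Testcontainers for the mapped one.
            String jdbcUrl =
                    String.format(
                            "jdbc:mysql://%s:%d",
                            container.getHost(), container.getMappedPort(2883));
            System.out.println(jdbcUrl);
        }
    }
}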