diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/paimon.md b/docs/content.zh/docs/connectors/pipeline-connectors/paimon.md index 912f9674983..6657cbf544a 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/paimon.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/paimon.md @@ -110,7 +110,7 @@ Pipeline 连接器配置项 Hive metastore 的 uri,在 metastore 设置为 hive 的时候需要。 - commit.user + commit.user-prefix optional admin String diff --git a/docs/content/docs/connectors/pipeline-connectors/paimon.md b/docs/content/docs/connectors/pipeline-connectors/paimon.md index bd9aedb9d3d..19efb8501eb 100644 --- a/docs/content/docs/connectors/pipeline-connectors/paimon.md +++ b/docs/content/docs/connectors/pipeline-connectors/paimon.md @@ -110,7 +110,7 @@ Pipeline Connector Options Uri of metastore server. - commit.user + commit.user-prefix optional "admin" String diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java index 5f2712a93f6..bcd834d2f52 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkOptions.java @@ -33,9 +33,10 @@ public class PaimonDataSinkOptions { public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties."; public static final ConfigOption COMMIT_USER = - key("commit.user") + key("commit.user-prefix") .stringType() .defaultValue("admin") + .withFallbackKeys("commit.user") .withDescription("User name for committing data files."); public static final ConfigOption WAREHOUSE = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java index 86622f611c6..ca249c20c9f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java @@ -20,6 +20,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSinkWriter; +import org.apache.flink.api.connector.sink2.SupportsWriterState; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; @@ -33,30 +37,35 @@ import org.apache.paimon.options.Options; import org.apache.paimon.table.sink.CommitMessageSerializer; +import java.util.Collection; +import java.util.UUID; + 
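Note on the `commit.user-prefix` rename in the `PaimonDataSinkOptions` hunk above: `withFallbackKeys("commit.user")` keeps pipeline definitions that still set the old `commit.user` key resolving to the same option. A minimal sketch of that behaviour, written against Flink's own `ConfigOptions` API for illustration (the CDC `ConfigOption` shim used in `PaimonDataSinkOptions` is assumed to behave analogously; class and key names below are only for the example):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

/** Illustrative sketch only: a renamed option whose legacy key still resolves via fallback. */
public class CommitUserPrefixFallbackSketch {

    // New primary key plus the legacy key as fallback, mirroring the hunk above.
    static final ConfigOption<String> COMMIT_USER =
            ConfigOptions.key("commit.user-prefix")
                    .stringType()
                    .defaultValue("admin")
                    .withFallbackKeys("commit.user")
                    .withDescription("User name prefix for committing data files.");

    public static void main(String[] args) {
        Configuration oldStyle = new Configuration();
        oldStyle.setString("commit.user", "legacy-user");

        Configuration newStyle = new Configuration();
        newStyle.setString("commit.user-prefix", "new-user");

        // Old configs resolve through the fallback key; new configs use the primary key.
        System.out.println(oldStyle.get(COMMIT_USER)); // legacy-user
        System.out.println(newStyle.get(COMMIT_USER)); // new-user
        System.out.println(new Configuration().get(COMMIT_USER)); // admin (default)
    }
}
```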
/** * A {@link Sink} for Paimon. Maintain this package until Paimon has it own sinkV2 implementation. */ -public class PaimonSink implements WithPreCommitTopology { +public class PaimonSink + implements WithPreCommitTopology, + SupportsWriterState { // provided a default commit user. public static final String DEFAULT_COMMIT_USER = "admin"; protected final Options catalogOptions; + /** The commitUser should be restored in state and restore it in writer. */ protected final String commitUser; private final PaimonRecordSerializer serializer; public PaimonSink(Options catalogOptions, PaimonRecordSerializer serializer) { - this.catalogOptions = catalogOptions; - this.serializer = serializer; - commitUser = DEFAULT_COMMIT_USER; + this(catalogOptions, DEFAULT_COMMIT_USER, serializer); } public PaimonSink( Options catalogOptions, String commitUser, PaimonRecordSerializer serializer) { this.catalogOptions = catalogOptions; - this.commitUser = commitUser; + // generate a random commit user to avoid conflict. + this.commitUser = commitUser + UUID.randomUUID(); this.serializer = serializer; } @@ -69,6 +78,22 @@ public PaimonWriter createWriter(InitContext context) { catalogOptions, context.metricGroup(), commitUser, serializer, lastCheckpointId); } + @Override + public StatefulSinkWriter restoreWriter( + WriterInitContext context, Collection paimonWriterStates) { + long lastCheckpointId = + context.getRestoredCheckpointId() + .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); + Preconditions.checkNotNull(paimonWriterStates); + String storedCommitUser = paimonWriterStates.iterator().next().getCommitUser(); + return new PaimonWriter<>( + catalogOptions, + context.metricGroup(), + storedCommitUser, + serializer, + lastCheckpointId); + } + @Override public Committer createCommitter() { return new PaimonCommitter(catalogOptions, commitUser); @@ -100,4 +125,9 @@ public DataStream> addPreCommitTopolog new PreCommitOperator(catalogOptions, commitUser)) .setParallelism(committables.getParallelism()); } + + @Override + public SimpleVersionedSerializer getWriterStateSerializer() { + return PaimonWriterStateSerializer.INSTANCE; + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index bf660454e69..e5821135d5e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSinkWriter; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.metrics.MetricGroup; @@ -42,6 +43,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,7 +53,8 @@ /** A {@link Sink} to write {@link DataChangeEvent} to Paimon storage. 
*/ public class PaimonWriter - implements TwoPhaseCommittingSink.PrecommittingSinkWriter { + implements TwoPhaseCommittingSink.PrecommittingSinkWriter, + StatefulSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(PaimonWriter.class); @@ -75,6 +78,8 @@ public class PaimonWriter /** A workaround variable trace the checkpointId in {@link StreamOperator#snapshotState}. */ private long lastCheckpointId; + private final PaimonWriterState stateCache; + public PaimonWriter( Options catalogOptions, MetricGroup metricGroup, @@ -93,6 +98,11 @@ public PaimonWriter( Thread.currentThread().getName() + "-CdcMultiWrite-Compaction")); this.serializer = serializer; this.lastCheckpointId = lastCheckpointId; + this.stateCache = new PaimonWriterState(commitUser); + LOG.info( + "Created PaimonWriter with commit user {} and identifier {}", + commitUser, + lastCheckpointId); } @Override @@ -214,4 +224,9 @@ public void close() throws Exception { compactExecutor.shutdownNow(); } } + + @Override + public List snapshotState(long checkpointId) { + return Collections.singletonList(stateCache); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java new file mode 100644 index 00000000000..54468ec14fe --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +/** The state of {@link PaimonWriter}. */ +public class PaimonWriterState { + + public static final int VERSION = 0; + + /** + * The commit user of {@link PaimonWriter}. + * + *

Note: Introduced from version 0. + */ + private final String commitUser; + + private transient byte[] serializedBytesCache; + + public PaimonWriterState(String commitUser) { + this.commitUser = commitUser; + } + + public String getCommitUser() { + return commitUser; + } + + public byte[] getSerializedBytesCache() { + return serializedBytesCache; + } + + public void setSerializedBytesCache(byte[] serializedBytesCache) { + this.serializedBytesCache = serializedBytesCache; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java new file mode 100644 index 00000000000..317d7d65164 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import java.io.IOException; + +/** A {@link SimpleVersionedSerializer} for {@link PaimonWriterState}. 
*/ +public class PaimonWriterStateSerializer implements SimpleVersionedSerializer { + + public static final PaimonWriterStateSerializer INSTANCE = new PaimonWriterStateSerializer(); + + @Override + public int getVersion() { + return PaimonWriterState.VERSION; + } + + @Override + public byte[] serialize(PaimonWriterState paimonWriterState) throws IOException { + if (paimonWriterState.getSerializedBytesCache() != null) { + return paimonWriterState.getSerializedBytesCache(); + } else { + final DataOutputSerializer out = new DataOutputSerializer(64); + out.writeUTF(paimonWriterState.getCommitUser()); + byte[] serializedBytesCache = out.getCopyOfBuffer(); + paimonWriterState.setSerializedBytesCache(serializedBytesCache); + return serializedBytesCache; + } + } + + @Override + public PaimonWriterState deserialize(int version, byte[] serialized) throws IOException { + final DataInputDeserializer in = new DataInputDeserializer(serialized); + if (version == 0) { + String commitUser = in.readUTF(); + return new PaimonWriterState(commitUser); + } + throw new IOException("Unknown version: " + version); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/MySqlToPaimonMigrationITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/MySqlToPaimonMigrationITCase.java new file mode 100644 index 00000000000..bcedcf7b0e0 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/MySqlToPaimonMigrationITCase.java @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.pipeline.tests.migration; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; +import org.apache.flink.cdc.pipeline.tests.utils.TarballFetcher; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.MountableFile; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * E2e cases for stopping & restarting jobs of `MySQL source to Paimon sink` from previous state. + */ +class MySqlToPaimonMigrationITCase extends PipelineTestEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(MySqlToPaimonMigrationITCase.class); + + private static final Duration PAIMON_TESTCASE_TIMEOUT = Duration.ofMinutes(3); + + protected UniqueDatabase mysqlInventoryDatabase; + private final Function dbNameFormatter = + (s) -> String.format(s, mysqlInventoryDatabase.getDatabaseName()); + + @BeforeEach + public void before() throws Exception { + super.before(); + mysqlInventoryDatabase = + new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + mysqlInventoryDatabase.createAndInitialize(); + jobManager.copyFileToContainer( + MountableFile.forHostPath( + TestUtils.getResource(getPaimonSQLConnectorResourceName())), + sharedVolume.toString() + "/" + getPaimonSQLConnectorResourceName()); + jobManager.copyFileToContainer( + MountableFile.forHostPath(TestUtils.getResource("flink-shade-hadoop.jar")), + sharedVolume.toString() + "/flink-shade-hadoop.jar"); + } + + @AfterEach + public void after() { + super.after(); + if (mysqlInventoryDatabase != null) { + mysqlInventoryDatabase.dropDatabase(); + } + } + + @Test + void testBasicJobSubmitting() throws Exception { + String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID(); + String content = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: %d\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.products\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: paimon\n" + + " catalog.properties.warehouse: %s\n" + + " catalog.properties.metastore: filesystem\n" + + " catalog.properties.cache-enabled: false\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d\n", + INTER_CONTAINER_MYSQL_ALIAS, + MySqlContainer.MYSQL_PORT, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + warehouse, + 4); + Path paimonCdcConnector = TestUtils.getResource("paimon-cdc-pipeline-connector.jar"); + Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar"); + JobID jobID = 
submitPipelineJob(content, paimonCdcConnector, hadoopJar); + Assertions.assertThat(jobID).isNotNull(); + LOG.info("Submitted Job ID is {} ", jobID); + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106, hammer, 16oz carpenter's hammer, 1.0, null, null, null", + "107, rocks, box of assorted rocks, 5.3, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null")); + LOG.info("Snapshot phase successfully finished."); + + waitUntilJobFinished(Duration.ofSeconds(30)); + LOG.info("Job gracefully stopped."); + } + + @ParameterizedTest(name = "{0} -> SNAPSHOT") + @EnumSource(names = {"V3_2_1", "V3_3_0", "V3_4_0", "V3_5_0", "SNAPSHOT"}) + void testStartingJobFromSavepoint(TarballFetcher.CdcVersion migrateFromVersion) + throws Exception { + TarballFetcher.fetch(jobManager, migrateFromVersion); + LOG.info("Successfully fetched CDC {}.", migrateFromVersion); + + String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID(); + String content = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: %d\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.products\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: paimon\n" + + " catalog.properties.warehouse: %s\n" + + " catalog.properties.metastore: filesystem\n" + + " catalog.properties.cache-enabled: false\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d\n", + INTER_CONTAINER_MYSQL_ALIAS, + MySqlContainer.MYSQL_PORT, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + warehouse, + 4); + Path paimonCdcConnector = TestUtils.getResource("paimon-cdc-pipeline-connector.jar"); + Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar"); + JobID jobID = submitPipelineJob(content, paimonCdcConnector, hadoopJar); + Assertions.assertThat(jobID).isNotNull(); + + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, 
{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106, hammer, 16oz carpenter's hammer, 1.0, null, null, null", + "107, rocks, box of assorted rocks, 5.3, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null")); + LOG.info("Snapshot stage finished successfully."); + + generateIncrementalEventsPhaseOne(); + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106, hammer, 18oz carpenter hammer, 1.0, null, null, null", + "107, rocks, box of assorted rocks, 5.1, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null")); + LOG.info("Incremental stage 1 finished successfully."); + + generateIncrementalEventsPhaseTwo(); + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, null", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, null", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, null", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, null", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, null", + "106, hammer, 18oz carpenter hammer, 1.0, null, null, null, null", + "107, rocks, box of assorted rocks, 5.1, null, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null, null", + "110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1", + "111, scooter, Big 2-wheel scooter, 5.18, null, null, null, 1")); + LOG.info("Incremental stage 2 finished successfully."); + + String savepointPath = stopJobWithSavepoint(jobID); + LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, savepointPath); + + JobID newJobID = + submitPipelineJob(content, savepointPath, true, paimonCdcConnector, hadoopJar); + LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID); + + generateIncrementalEventsPhaseThree(); + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, 
{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, null", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, null", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, null", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, null", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, null", + "106, hammer, 18oz carpenter hammer, 1.0, null, null, null, null", + "107, rocks, box of assorted rocks, 5.1, null, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null, null", + "110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1")); + cancelJob(newJobID); + } + + @ParameterizedTest(name = "{0} -> SNAPSHOT") + @EnumSource(names = {"SNAPSHOT"}) + void testStartingJobFromSavepointWithSchemaChange(TarballFetcher.CdcVersion migrateFromVersion) + throws Exception { + TarballFetcher.fetch(jobManager, migrateFromVersion); + runInContainerAsRoot(jobManager, "chmod", "0777", "-R", "/tmp/cdc/"); + + LOG.info("Successfully fetched CDC {}.", migrateFromVersion); + + String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID(); + String content = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: %d\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.products\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: paimon\n" + + " catalog.properties.warehouse: %s\n" + + " catalog.properties.metastore: filesystem\n" + + " catalog.properties.cache-enabled: false\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d\n", + INTER_CONTAINER_MYSQL_ALIAS, + MySqlContainer.MYSQL_PORT, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + warehouse, + 4); + Path paimonCdcConnector = TestUtils.getResource("paimon-cdc-pipeline-connector.jar"); + Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar"); + JobID jobID = submitPipelineJob(migrateFromVersion, content, paimonCdcConnector, hadoopJar); + Assertions.assertThat(jobID).isNotNull(); + LOG.info("Submitted Job ID is {} ", jobID); + + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106, hammer, 16oz carpenter's hammer, 1.0, null, null, null", + "107, rocks, box of assorted rocks, 5.3, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, 
null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null")); + LOG.info("Snapshot stage finished successfully."); + + generateIncrementalEventsPhaseOne(); + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106, hammer, 18oz carpenter hammer, 1.0, null, null, null", + "107, rocks, box of assorted rocks, 5.1, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null")); + LOG.info("Incremental stage 1 finished successfully."); + + String savepointPath = stopJobWithSavepoint(jobID); + LOG.info("Stopped Job {} and created a savepoint at {}.", jobID, savepointPath); + // Modify schema and make some data changes. + generateIncrementalEventsPhaseTwo(); + JobID newJobID = + submitPipelineJob(content, savepointPath, true, paimonCdcConnector, hadoopJar); + LOG.info("Reincarnated Job {} has been submitted successfully.", newJobID); + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, null", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, null", + "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, null", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, null", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, null", + "106, hammer, 18oz carpenter hammer, 1.0, null, null, null, null", + "107, rocks, box of assorted rocks, 5.1, null, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null, null", + "110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1", + "111, scooter, Big 2-wheel scooter, 5.18, null, null, null, 1")); + LOG.info("Incremental stage 2 finished successfully."); + + generateIncrementalEventsPhaseThree(); + validateSinkResult( + warehouse, + mysqlInventoryDatabase.getDatabaseName(), + "products", + Arrays.asList( + "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, null", + "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}, null", + "103, 12-pack drill bits, 12-pack of drill 
bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}, null", + "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}, null", + "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}, null", + "106, hammer, 18oz carpenter hammer, 1.0, null, null, null, null", + "107, rocks, box of assorted rocks, 5.1, null, null, null, null", + "108, jacket, water resistent black wind breaker, 0.1, null, null, null, null", + "109, spare tire, 24 inch spare tire, 22.2, null, null, null, null", + "110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1")); + cancelJob(newJobID); + } + + private void generateIncrementalEventsPhaseOne() { + executeMySqlStatements( + mysqlInventoryDatabase, + "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;", + "UPDATE products SET weight='5.1' WHERE id=107;"); + } + + private void generateIncrementalEventsPhaseTwo() { + executeMySqlStatements( + mysqlInventoryDatabase, + "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;", + "UPDATE products SET weight='5.1' WHERE id=107;", + "ALTER TABLE products ADD COLUMN new_col INT;", + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);", + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); + } + + private void generateIncrementalEventsPhaseThree() { + executeMySqlStatements( + mysqlInventoryDatabase, + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;", + "UPDATE products SET weight='5.17' WHERE id=111;", + "DELETE FROM products WHERE id=111;"); + } + + private void executeMySqlStatements(UniqueDatabase database, String... statements) { + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), MYSQL.getDatabasePort(), database.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + for (String sql : statements) { + try { + stat.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Failed to execute SQL statement " + sql, e); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed to execute MySQL statements.", e); + } + } + + private void validateSinkResult( + String warehouse, String database, String table, List expected) + throws InterruptedException { + LOG.info("Verifying Paimon {}::{}::{} results...", warehouse, database, table); + long deadline = System.currentTimeMillis() + PAIMON_TESTCASE_TIMEOUT.toMillis(); + List results = Collections.emptyList(); + while (System.currentTimeMillis() < deadline) { + try { + results = fetchPaimonTableRows(warehouse, database, table); + Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected); + LOG.info( + "Successfully verified {} records in {} seconds.", + expected.size(), + (System.currentTimeMillis() - deadline + PAIMON_TESTCASE_TIMEOUT.toMillis()) + / 1000); + return; + } catch (Exception e) { + LOG.warn("Validate failed, waiting for the next loop...", e); + } catch (AssertionError ignored) { + // AssertionError contains way too much records and might flood the log output. 
+ LOG.warn( + "Results mismatch, expected {} records, but got {} actually. Waiting for the next loop...", + expected.size(), + results.size()); + } + Thread.sleep(1000L); + } + Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected); + } + + private List fetchPaimonTableRows(String warehouse, String database, String table) + throws Exception { + String template = + readLines("docker/peek-paimon.sql").stream() + .filter(line -> !line.startsWith("--")) + .collect(Collectors.joining("\n")); + String sql = String.format(template, warehouse, database, table); + String containerSqlPath = sharedVolume.toString() + "/peek.sql"; + jobManager.copyFileToContainer(Transferable.of(sql), containerSqlPath); + + Container.ExecResult result = + jobManager.execInContainer( + "/opt/flink/bin/sql-client.sh", + "--jar", + sharedVolume.toString() + "/" + getPaimonSQLConnectorResourceName(), + "--jar", + sharedVolume.toString() + "/flink-shade-hadoop.jar", + "-f", + containerSqlPath); + if (result.getExitCode() != 0) { + throw new RuntimeException( + "Failed to execute peek script. Stdout: " + + result.getStdout() + + "; Stderr: " + + result.getStderr()); + } + + return Arrays.stream(result.getStdout().split("\n")) + .filter(line -> line.startsWith("|")) + .skip(1) + .map(MySqlToPaimonMigrationITCase::extractRow) + .map(row -> String.format("%s", String.join(", ", row))) + .collect(Collectors.toList()); + } + + private static String[] extractRow(String row) { + return Arrays.stream(row.split("\\|")) + .map(String::trim) + .filter(col -> !col.isEmpty()) + .map(col -> col.equals("") ? "null" : col) + .toArray(String[]::new); + } + + protected String getPaimonSQLConnectorResourceName() { + return String.format("paimon-sql-connector-%s.jar", flinkVersion); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java index 6c929ee28a9..2a875d9f774 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/migration/YamlJobMigrationITCase.java @@ -99,7 +99,7 @@ void testBasicJobSubmitting() throws Exception { } @ParameterizedTest(name = "{0} -> SNAPSHOT") - @EnumSource(names = {"V3_2_1", "V3_3_0", "SNAPSHOT"}) + @EnumSource(names = {"V3_2_1", "V3_3_0", "V3_4_0", "V3_5_0", "SNAPSHOT"}) void testStartingJobFromSavepoint(TarballFetcher.CdcVersion migrateFromVersion) throws Exception { TarballFetcher.fetch(jobManager, migrateFromVersion); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java index 6e8bac9170e..914278bcbcc 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -142,7 +142,10 @@ private int getParallelism() { "restart-strategy.type: off", // Set off-heap memory explicitly to avoid "java.lang.OutOfMemoryError: 
Direct
                    // buffer memory" error.
-                    "taskmanager.memory.task.off-heap.size: 128mb");
+                    "taskmanager.memory.task.off-heap.size: 128mb",
+                    // Fix `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
+                    // has occurred` error.
+                    "taskmanager.memory.jvm-metaspace.size: 512mb");
     public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS);

     @Nullable protected RestClusterClient restClusterClient;
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
index 573a4797c87..1d578891867 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java
@@ -115,6 +115,8 @@ public enum CdcVersion {
     V3_2_0("3.2.0"),
     V3_2_1("3.2.1"),
     V3_3_0("3.3.0"),
+    V3_4_0("3.4.0"),
+    V3_5_0("3.5.0"),
     SNAPSHOT("SNAPSHOT");

     private final String version;
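Taken together, the changes above make the commit user part of the writer state: `PaimonSink` derives a unique commit user from the configured prefix, `PaimonWriter#snapshotState` persists it through `PaimonWriterStateSerializer`, and `restoreWriter` reuses the stored value after a restart so Paimon can recognize and deduplicate commits from the previous attempt. A self-contained sketch of that round trip, under the assumption that only the commit user needs to survive a restart (class and method names below are illustrative, not the connector's API):

```java
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;
import java.util.Collection;
import java.util.UUID;

/** Illustrative sketch of the commit-user lifecycle: derive, snapshot, restore. */
public class CommitUserStateSketch {

    /** Mirrors PaimonWriterState: only the commit user is carried in writer state. */
    static final class WriterState {
        final String commitUser;

        WriterState(String commitUser) {
            this.commitUser = commitUser;
        }
    }

    /** Versioned serialization, analogous to PaimonWriterStateSerializer version 0. */
    static byte[] serialize(WriterState state) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(64);
        out.writeUTF(state.commitUser);
        return out.getCopyOfBuffer();
    }

    static WriterState deserialize(int version, byte[] bytes) throws IOException {
        if (version != 0) {
            throw new IOException("Unknown version: " + version);
        }
        return new WriterState(new DataInputDeserializer(bytes).readUTF());
    }

    /** Fresh start: append a random suffix to the prefix so concurrent jobs never clash. */
    static String freshCommitUser(String prefix) {
        return prefix + UUID.randomUUID();
    }

    /** Restore: keep the commit user snapshotted before the savepoint. */
    static String restoredCommitUser(Collection<WriterState> recovered, String prefix) {
        // Falling back to a fresh user on empty state is a defensive choice made for this
        // sketch only; the actual restoreWriter above expects at least one recovered state.
        return recovered.isEmpty()
                ? freshCommitUser(prefix)
                : recovered.iterator().next().commitUser;
    }

    public static void main(String[] args) throws IOException {
        String commitUser = freshCommitUser("admin");
        byte[] snapshot = serialize(new WriterState(commitUser));
        WriterState recovered = deserialize(0, snapshot);
        // The restored writer keeps committing under the same user as before the restart.
        System.out.println(commitUser.equals(recovered.commitUser)); // true
    }
}
```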