From 33f7ae2939ab1ba6c5648eafc5a490d61bbc36ec Mon Sep 17 00:00:00 2001 From: jennychen Date: Tue, 2 Sep 2025 22:49:17 +0800 Subject: [PATCH 01/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../pgoutput/PgOutputMessageDecoder.java | 812 ++++++++++++++++++ .../table/PostgreSQLConnectorITCase.java | 51 ++ .../src/test/resources/ddl/cn_column_test.sql | 37 + 3 files changed, 900 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java new file mode 100644 index 00000000000..1f57a673e20 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -0,0 +1,812 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.connection.pgoutput; + +import static java.util.stream.Collectors.toMap; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; +import io.debezium.connector.postgresql.PostgresType; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.UnchangedToastedReplicationMessageColumn; +import io.debezium.connector.postgresql.connection.AbstractMessageDecoder; +import io.debezium.connector.postgresql.connection.AbstractReplicationMessageColumn; +import io.debezium.connector.postgresql.connection.LogicalDecodingMessage; +import io.debezium.connector.postgresql.connection.Lsn; +import io.debezium.connector.postgresql.connection.MessageDecoderContext; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.connection.ReplicationMessage.Column; +import io.debezium.connector.postgresql.connection.ReplicationMessage.NoopMessage; +import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation; +import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor; +import io.debezium.connector.postgresql.connection.TransactionMessage; +import 
io.debezium.connector.postgresql.connection.WalPositionLocator; +import io.debezium.data.Envelope; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.util.HexConverter; +import io.debezium.util.Strings; + +/** + * copied from Debezium 1.9.8-Final with new method readUTF8String + * Decodes messages from the PG logical replication plug-in ("pgoutput"). + * See https://www.postgresql.org/docs/10/protocol-logicalrep-message-formats.html for the protocol specification. + * + * @author Gunnar Morling + * @author Chris Cranford + * + */ +public class PgOutputMessageDecoder extends AbstractMessageDecoder { + + private static final Logger LOGGER = LoggerFactory.getLogger(PgOutputMessageDecoder.class); + private static final Instant PG_EPOCH = LocalDate.of(2000, 1, 1).atStartOfDay().toInstant(ZoneOffset.UTC); + private static final byte SPACE = 32; + + private final MessageDecoderContext decoderContext; + private final PostgresConnection connection; + + private Instant commitTimestamp; + + /** + * Will be null for a non-transactional decoding message + */ + private Long transactionId; + + public enum MessageType { + RELATION, + BEGIN, + COMMIT, + INSERT, + UPDATE, + DELETE, + TYPE, + ORIGIN, + TRUNCATE, + LOGICAL_DECODING_MESSAGE; + + public static MessageType forType(char type) { + switch (type) { + case 'R': + return RELATION; + case 'B': + return BEGIN; + case 'C': + return COMMIT; + case 'I': + return INSERT; + case 'U': + return UPDATE; + case 'D': + return DELETE; + case 'Y': + return TYPE; + case 'O': + return ORIGIN; + case 'T': + return TRUNCATE; + case 'M': + return LOGICAL_DECODING_MESSAGE; + default: + throw new IllegalArgumentException("Unsupported message type: " + type); + } + } + } + + public PgOutputMessageDecoder(MessageDecoderContext decoderContext, PostgresConnection connection) { + this.decoderContext = decoderContext; + this.connection = connection; + } + + @Override + public boolean shouldMessageBeSkipped(ByteBuffer buffer, Lsn lastReceivedLsn, Lsn startLsn, WalPositionLocator walPosition) { + // Cache position as we're going to peak at the first byte to determine message type + // We need to reprocess all BEGIN/COMMIT messages regardless. + int position = buffer.position(); + try { + MessageType type = MessageType.forType((char) buffer.get()); + LOGGER.trace("Message Type: {}", type); + final boolean candidateForSkipping = super.shouldMessageBeSkipped(buffer, lastReceivedLsn, startLsn, walPosition); + switch (type) { + case COMMIT: + case BEGIN: + case RELATION: + // BEGIN + // These types should always be processed due to the nature that they provide + // the stream with pertinent per-transaction boundary state we will need to + // always cache as we potentially might reprocess the stream from an earlier + // LSN point. + // + // RELATION + // These messages are always sent with a lastReceivedLSN=0; and we need to + // always accept these to keep per-stream table state cached properly. 
+ LOGGER.trace("{} messages are always reprocessed", type); + return false; + default: + // INSERT/UPDATE/DELETE/TRUNCATE/TYPE/ORIGIN/LOGICAL_DECODING_MESSAGE + // These should be excluded based on the normal behavior, delegating to default method + return candidateForSkipping; + } + } + finally { + // Reset buffer position + buffer.position(position); + } + } + + @Override + public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { + if (LOGGER.isTraceEnabled()) { + if (!buffer.hasArray()) { + throw new IllegalStateException("Invalid buffer received from PG server during streaming replication"); + } + final byte[] source = buffer.array(); + // Extend the array by two as we might need to append two chars and set them to space by default + final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length + 2); + final int lastPos = content.length - 1; + content[lastPos - 1] = SPACE; + content[lastPos] = SPACE; + LOGGER.trace("Message arrived from database {}", HexConverter.convertToHexString(content)); + } + + final MessageType messageType = MessageType.forType((char) buffer.get()); + switch (messageType) { + case BEGIN: + handleBeginMessage(buffer, processor); + break; + case COMMIT: + handleCommitMessage(buffer, processor); + break; + case RELATION: + handleRelationMessage(buffer, typeRegistry); + break; + case LOGICAL_DECODING_MESSAGE: + handleLogicalDecodingMessage(buffer, processor); + break; + case INSERT: + decodeInsert(buffer, typeRegistry, processor); + break; + case UPDATE: + decodeUpdate(buffer, typeRegistry, processor); + break; + case DELETE: + decodeDelete(buffer, typeRegistry, processor); + break; + case TRUNCATE: + if (isTruncateEventsIncluded()) { + decodeTruncate(buffer, typeRegistry, processor); + } + else { + LOGGER.trace("Message Type {} skipped, not processed.", messageType); + } + break; + default: + LOGGER.trace("Message Type {} skipped, not processed.", messageType); + break; + } + } + + @Override + public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder, Function hasMinimumServerVersion) { + builder = builder.withSlotOption("proto_version", 1) + .withSlotOption("publication_names", decoderContext.getConfig().publicationName()); + + // DBZ-4374 Use enum once the driver got updated + if (hasMinimumServerVersion.apply(140000)) { + builder = builder.withSlotOption("messages", true); + } + + return builder; + } + + @Override + public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder, Function hasMinimumServerVersion) { + return builder; + } + + private boolean isTruncateEventsIncluded() { + return !decoderContext.getConfig().getSkippedOperations().contains(Envelope.Operation.TRUNCATE); + } + + /** + * Callback handler for the 'B' begin replication message. 
+ * + * @param buffer The replication stream buffer + * @param processor The replication message processor + */ + private void handleBeginMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + final Lsn lsn = Lsn.valueOf(buffer.getLong()); // LSN + this.commitTimestamp = PG_EPOCH.plus(buffer.getLong(), ChronoUnit.MICROS); + this.transactionId = Integer.toUnsignedLong(buffer.getInt()); + LOGGER.trace("Event: {}", MessageType.BEGIN); + LOGGER.trace("Final LSN of transaction: {}", lsn); + LOGGER.trace("Commit timestamp of transaction: {}", commitTimestamp); + LOGGER.trace("XID of transaction: {}", transactionId); + processor.process(new TransactionMessage(Operation.BEGIN, transactionId, commitTimestamp)); + } + + /** + * Callback handler for the 'C' commit replication message. + * + * @param buffer The replication stream buffer + * @param processor The replication message processor + */ + private void handleCommitMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + int flags = buffer.get(); // flags, currently unused + final Lsn lsn = Lsn.valueOf(buffer.getLong()); // LSN of the commit + final Lsn endLsn = Lsn.valueOf(buffer.getLong()); // End LSN of the transaction + Instant commitTimestamp = PG_EPOCH.plus(buffer.getLong(), ChronoUnit.MICROS); + LOGGER.trace("Event: {}", MessageType.COMMIT); + LOGGER.trace("Flags: {} (currently unused and most likely 0)", flags); + LOGGER.trace("Commit LSN: {}", lsn); + LOGGER.trace("End LSN of transaction: {}", endLsn); + LOGGER.trace("Commit timestamp of transaction: {}", commitTimestamp); + processor.process(new TransactionMessage(Operation.COMMIT, transactionId, commitTimestamp)); + } + + /** + * Callback handler for the 'R' relation replication message. 
+ * + * @param buffer The replication stream buffer + * @param typeRegistry The postgres type registry + */ + private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) throws SQLException { + int relationId = buffer.getInt(); + String schemaName = readString(buffer); + String tableName = readString(buffer); + int replicaIdentityId = buffer.get(); + short columnCount = buffer.getShort(); + + LOGGER.trace("Event: {}, RelationId: {}, Replica Identity: {}, Columns: {}", MessageType.RELATION, relationId, replicaIdentityId, columnCount); + LOGGER.trace("Schema: '{}', Table: '{}'", schemaName, tableName); + + // Perform several out-of-bands database metadata queries + Map> columnDefaults; + Map columnOptionality; + List primaryKeyColumns; + + final DatabaseMetaData databaseMetadata = connection.connection().getMetaData(); + final TableId tableId = new TableId(null, schemaName, tableName); + + final List readColumns = getTableColumnsFromDatabase(connection, databaseMetadata, tableId); + columnDefaults = readColumns.stream() + .filter(io.debezium.relational.Column::hasDefaultValue) + .collect(toMap(io.debezium.relational.Column::name, io.debezium.relational.Column::defaultValueExpression)); + + columnOptionality = readColumns.stream().collect(toMap(io.debezium.relational.Column::name, io.debezium.relational.Column::isOptional)); + primaryKeyColumns = connection.readPrimaryKeyNames(databaseMetadata, tableId); + if (primaryKeyColumns == null || primaryKeyColumns.isEmpty()) { + LOGGER.warn("Primary keys are not defined for table '{}', defaulting to unique indices", tableName); + primaryKeyColumns = connection.readTableUniqueIndices(databaseMetadata, tableId); + } + + List columns = new ArrayList<>(); + Set columnNames = new HashSet<>(); + for (short i = 0; i < columnCount; ++i) { + byte flags = buffer.get(); + String columnName = Strings.unquoteIdentifierPart(readUTF8String(buffer)); + int columnType = buffer.getInt(); + int attypmod = buffer.getInt(); + + final PostgresType postgresType = typeRegistry.get(columnType); + boolean key = isColumnInPrimaryKey(schemaName, tableName, columnName, primaryKeyColumns); + + Boolean optional = columnOptionality.get(columnName); + if (optional == null) { + LOGGER.warn("Column '{}' optionality could not be determined, defaulting to true", columnName); + optional = true; + } + + final boolean hasDefault = columnDefaults.containsKey(columnName); + final String defaultValueExpression = columnDefaults.getOrDefault(columnName, Optional.empty()).orElse(null); + + columns.add(new ColumnMetaData(columnName, postgresType, key, optional, hasDefault, defaultValueExpression, attypmod)); + columnNames.add(columnName); + } + + // Remove any PKs that do not exist as part of this this relation message. This can occur when issuing + // multiple schema changes in sequence since the lookup of primary keys is an out-of-band procedure, without + // any logical linkage or context to the point in time the relation message was emitted. + // + // Example DDL: + // ALTER TABLE changepk.test_table DROP COLUMN pk2; -- <- relation message #1 + // ALTER TABLE changepk.test_table ADD COLUMN pk3 SERIAL; -- <- relation message #2 + // ALTER TABLE changepk.test_table ADD PRIMARY KEY(newpk,pk3); -- <- relation message #3 + // + // Consider the above schema changes. There's a possible temporal ordering where the messages arrive + // in the replication slot data stream at time `t0`, `t1`, and `t2`. It then takes until `t10` for _this_ method + // to start processing message #1. 
At `t10` invoking `connection.readPrimaryKeyNames()` returns the new + // primary key column, 'pk3', defined by message #3. We must remove this primary key column that came + // "from the future" (with temporal respect to the current relate message #1) as a best effort attempt + // to reflect the actual primary key state at time `t0`. + primaryKeyColumns.retainAll(columnNames); + + Table table = resolveRelationFromMetadata(new PgOutputRelationMetaData(relationId, schemaName, tableName, columns, primaryKeyColumns)); + decoderContext.getSchema().applySchemaChangesForTable(relationId, table); + } + + private List getTableColumnsFromDatabase(PostgresConnection connection, DatabaseMetaData databaseMetadata, TableId tableId) + throws SQLException { + List readColumns = new ArrayList<>(); + try { + try (ResultSet columnMetadata = databaseMetadata.getColumns(null, tableId.schema(), tableId.table(), null)) { + while (columnMetadata.next()) { + connection.readColumnForDecoder(columnMetadata, tableId, decoderContext.getConfig().getColumnFilter()) + .ifPresent(readColumns::add); + } + } + } + catch (SQLException e) { + LOGGER.error("Failed to read column metadata for '{}.{}'", tableId.schema(), tableId.table()); + throw e; + } + + return readColumns; + } + + private boolean isColumnInPrimaryKey(String schemaName, String tableName, String columnName, List primaryKeyColumns) { + // todo (DBZ-766) - Discuss this logic with team as there may be a better way to handle this + // Personally I think its sufficient enough to resolve the PK based on the out-of-bands call + // and should any test fail due to this it should be rewritten or excluded from the pgoutput + // scope. + // + // In RecordsStreamProducerIT#shouldReceiveChangesForInsertsIndependentOfReplicaIdentity, we have + // a situation where the table is replica identity full, the primary key is dropped but the replica + // identity is kept and later the replica identity is changed to default. In order to support this + // use case, the following abides by these rules: + // + if (!primaryKeyColumns.isEmpty() && primaryKeyColumns.contains(columnName)) { + return true; + } + else if (primaryKeyColumns.isEmpty()) { + // The table's metadata was either not fetched or table no longer has a primary key + // Lets attempt to use the known schema primary key configuration as a fallback + Table existingTable = decoderContext.getSchema().tableFor(new TableId(null, schemaName, tableName)); + if (existingTable != null && existingTable.primaryKeyColumnNames().contains(columnName)) { + return true; + } + } + return false; + } + + /** + * Callback handler for the 'I' insert replication stream message. 
+ * + * @param buffer The replication stream buffer + * @param typeRegistry The postgres type registry + * @param processor The replication message processor + */ + private void decodeInsert(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + int relationId = buffer.getInt(); + char tupleType = (char) buffer.get(); // Always 'N" for inserts + + LOGGER.trace("Event: {}, Relation Id: {}, Tuple Type: {}", MessageType.INSERT, relationId, tupleType); + + Optional resolvedTable = resolveRelation(relationId); + + // non-captured table + if (!resolvedTable.isPresent()) { + processor.process(new NoopMessage(transactionId, commitTimestamp)); + } + else { + Table table = resolvedTable.get(); + List columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table); + processor.process(new PgOutputReplicationMessage( + Operation.INSERT, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + null, + columns)); + } + } + + /** + * Callback handler for the 'U' update replication stream message. + * + * @param buffer The replication stream buffer + * @param typeRegistry The postgres type registry + * @param processor The replication message processor + */ + private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + int relationId = buffer.getInt(); + + LOGGER.trace("Event: {}, RelationId: {}", MessageType.UPDATE, relationId); + + Optional
resolvedTable = resolveRelation(relationId); + + // non-captured table + if (!resolvedTable.isPresent()) { + processor.process(new NoopMessage(transactionId, commitTimestamp)); + } + else { + Table table = resolvedTable.get(); + + // When reading the tuple-type, we could get 3 different values, 'O', 'K', or 'N'. + // 'O' (Optional) - States the following tuple-data is the key, only for replica identity index configs. + // 'K' (Optional) - States the following tuple-data is the old tuple, only for replica identity full configs. + // + // 'N' (Not-Optional) - States the following tuple-data is the new tuple. + // This is always present. + List oldColumns = null; + char tupleType = (char) buffer.get(); + if ('O' == tupleType || 'K' == tupleType) { + oldColumns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table); + // Read the 'N' tuple type + // This is necessary so the stream position is accurate for resolving the column tuple data + tupleType = (char) buffer.get(); + } + + List columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table); + processor.process(new PgOutputReplicationMessage( + Operation.UPDATE, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + oldColumns, + columns)); + } + } + + /** + * Callback handler for the 'D' delete replication stream message. + * + * @param buffer The replication stream buffer + * @param typeRegistry The postgres type registry + * @param processor The replication message processor + */ + private void decodeDelete(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + int relationId = buffer.getInt(); + + char tupleType = (char) buffer.get(); + + LOGGER.trace("Event: {}, RelationId: {}, Tuple Type: {}", MessageType.DELETE, relationId, tupleType); + + Optional
resolvedTable = resolveRelation(relationId); + + // non-captured table + if (!resolvedTable.isPresent()) { + processor.process(new NoopMessage(transactionId, commitTimestamp)); + } + else { + Table table = resolvedTable.get(); + List columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table); + processor.process(new PgOutputReplicationMessage( + Operation.DELETE, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + columns, + null)); + } + } + + /** + * Callback handler for the 'T' truncate replication stream message. + * + * @param buffer The replication stream buffer + * @param typeRegistry The postgres type registry + * @param processor The replication message processor + */ + private void decodeTruncate(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + // As of PG11, the Truncate message format is as described: + // Byte Message Type (Always 'T') + // Int32 number of relations described by the truncate message + // Int8 flags for truncate; 1=CASCADE, 2=RESTART IDENTITY + // Int32[] Array of number of relation ids + // + // In short this message tells us how many relations are impacted by the truncate + // call, whether its cascaded or not and then all table relation ids involved. + // It seems the protocol guarantees to send the most up-to-date `R` relation + // messages for the tables prior to the `T` truncation message, even if in the + // same session a `R` message was followed by an insert/update/delete message. + + int numberOfRelations = buffer.getInt(); + int optionBits = buffer.get(); + // ignored / unused + List truncateOptions = getTruncateOptions(optionBits); + int[] relationIds = new int[numberOfRelations]; + for (int i = 0; i < numberOfRelations; i++) { + relationIds[i] = buffer.getInt(); + } + + List
tables = new ArrayList<>(); + for (int relationId : relationIds) { + Optional
resolvedTable = resolveRelation(relationId); + resolvedTable.ifPresent(tables::add); + } + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Event: {}, RelationIds: {}, OptionBits: {}", MessageType.TRUNCATE, Arrays.toString(relationIds), optionBits); + } + + int noOfResolvedTables = tables.size(); + for (int i = 0; i < noOfResolvedTables; i++) { + Table table = tables.get(i); + boolean lastTableInTruncate = (i + 1) == noOfResolvedTables; + processor.process(new PgOutputTruncateReplicationMessage( + Operation.TRUNCATE, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + lastTableInTruncate)); + } + } + + /** + * Convert truncate option bits to postgres syntax truncate options + * + * @param flag truncate option bits + * @return truncate flags + */ + private List getTruncateOptions(int flag) { + switch (flag) { + case 1: + return Collections.singletonList("CASCADE"); + case 2: + return Collections.singletonList("RESTART IDENTITY"); + case 3: + return Arrays.asList("RESTART IDENTITY", "CASCADE"); + default: + return null; + } + } + + /** + * Callback handler for the 'M' logical decoding message + * + * @param buffer The replication stream buffer + * @param processor The replication message processor + */ + private void handleLogicalDecodingMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { + // As of PG14, the MESSAGE message format is as described: + // Byte1 Always 'M' + // Int32 Xid of the transaction (only present for streamed transactions in protocol version 2). + // Int8 flags; Either 0 for no flags or 1 if the logical decoding message is transactional. + // Int64 The LSN of the logical decoding message + // String The prefix of the logical decoding message. + // Int32 Length of the content. + // Byten The content of the logical decoding message. + + boolean isTransactional = buffer.get() == 1; + final Lsn lsn = Lsn.valueOf(buffer.getLong()); + String prefix = readString(buffer); + int contentLength = buffer.getInt(); + byte[] content = new byte[contentLength]; + buffer.get(content); + + // non-transactional messages do not have xids or commitTimestamps + if (!isTransactional) { + transactionId = null; + commitTimestamp = null; + } + + LOGGER.trace("Event: {}", MessageType.LOGICAL_DECODING_MESSAGE); + LOGGER.trace("Commit LSN: {}", lsn); + LOGGER.trace("Commit timestamp of transaction: {}", commitTimestamp); + LOGGER.trace("XID of transaction: {}", transactionId); + LOGGER.trace("Transactional: {}", isTransactional); + LOGGER.trace("Prefix: {}", prefix); + + processor.process(new LogicalDecodingMessage( + Operation.MESSAGE, + commitTimestamp, + transactionId, + isTransactional, + prefix, + content)); + } + + /** + * Resolves a given replication message relation identifier to a {@link Table}. + * + * @param relationId The replication message stream's relation identifier + * @return table resolved from a prior relation message or direct lookup from the schema + * or empty when the table is filtered + */ + private Optional
<Table> resolveRelation(int relationId) {
+        return Optional.ofNullable(decoderContext.getSchema().tableFor(relationId));
+    }
+
+    /**
+     * Constructs a {@link Table} based on the supplied {@link PgOutputRelationMetaData}.
+     *
+     * @param metadata The relation metadata collected from previous 'R' replication stream messages
+     * @return table based on a prior replication relation message
+     */
+    private Table resolveRelationFromMetadata(PgOutputRelationMetaData metadata) {
+        List<io.debezium.relational.Column> columns = new ArrayList<>();
+        for (ColumnMetaData columnMetadata : metadata.getColumns()) {
+            ColumnEditor editor = io.debezium.relational.Column.editor()
+                    .name(columnMetadata.getColumnName())
+                    .jdbcType(columnMetadata.getPostgresType().getRootType().getJdbcId())
+                    .nativeType(columnMetadata.getPostgresType().getRootType().getOid())
+                    .optional(columnMetadata.isOptional())
+                    .type(columnMetadata.getPostgresType().getName(), columnMetadata.getTypeName())
+                    .length(columnMetadata.getLength())
+                    .scale(columnMetadata.getScale());
+
+            if (columnMetadata.hasDefaultValue()) {
+                editor.defaultValueExpression(columnMetadata.getDefaultValueExpression());
+            }
+
+            columns.add(editor.create());
+        }
+
+        Table table = Table.editor()
+                .addColumns(columns)
+                .setPrimaryKeyNames(metadata.getPrimaryKeyNames())
+                .tableId(metadata.getTableId())
+                .create();
+
+        LOGGER.trace("Resolved '{}' as '{}'", table.id(), table);
+
+        return table;
+    }
+
+    /**
+     * Reads the replication stream up to the next null-terminator byte and returns the contents as a string.
+     *
+     * @param buffer The replication stream buffer
+     * @return string read from the replication stream
+     */
+    private static String readString(ByteBuffer buffer) {
+        StringBuilder sb = new StringBuilder();
+        byte b = 0;
+        while ((b = buffer.get()) != 0) {
+            sb.append((char) b);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Reads the replication stream up to the next null-terminator byte and returns the contents as a UTF-8 string.
+     *
+     * @param buffer The replication stream buffer
+     * @return string read from the replication stream
+     */
+    private static String readUTF8String(ByteBuffer buffer) {
+        buffer.mark();
+
+        int start = buffer.position();
+        int end = start;
+        boolean nullFound = false;
+
+        // Iterate from current position up to the limit
+        while (buffer.hasRemaining()) {
+            if (buffer.get() == 0) {
+                nullFound = true;
+                end = buffer.position() - 1; // The null byte itself
+                break;
+            }
+            end = buffer.position(); // update end if not null yet
+        }
+
+        int length = end - start;
+
+        buffer.reset();
+
+        byte[] stringBytes = new byte[length];
+
+        buffer.get(stringBytes);
+        // if a null terminator was found, advance the buffer's position past it
+        if (nullFound) {
+            buffer.get();
+        }
+
+        return new String(stringBytes, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Reads the replication stream where the column stream specifies a length followed by the value.
+     *
+     * @param buffer The replication stream buffer
+     * @return the column value as a string read from the replication stream
+     */
+    private static String readColumnValueAsString(ByteBuffer buffer) {
+        int length = buffer.getInt();
+        byte[] value = new byte[length];
+        buffer.get(value, 0, length);
+        return new String(value, Charset.forName("UTF-8"));
+    }
+
+    /**
+     * Resolve the replication stream's tuple data to a list of replication message columns.
+ * + * @param buffer The replication stream buffer + * @param typeRegistry The database type registry + * @param table The database table + * @return list of replication message columns + */ + private static List resolveColumnsFromStreamTupleData(ByteBuffer buffer, TypeRegistry typeRegistry, Table table) { + // Read number of the columns + short numberOfColumns = buffer.getShort(); + + List columns = new ArrayList<>(numberOfColumns); + for (short i = 0; i < numberOfColumns; ++i) { + + final io.debezium.relational.Column column = table.columns().get(i); + final String columnName = column.name(); + final String typeName = column.typeName(); + final PostgresType columnType = typeRegistry.get(typeName); + final String typeExpression = column.typeExpression(); + final boolean optional = column.isOptional(); + + // Read the sub-message type + // 't' : Value is represented as text + // 'u' : An unchanged TOAST-ed value, actual value is not sent. + // 'n' : Value is null. + char type = (char) buffer.get(); + if (type == 't') { + final String valueStr = readColumnValueAsString(buffer); + columns.add( + new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, optional, true) { + @Override + public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { + return PgOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, includeUnknownDatatypes, + typeRegistry); + } + + @Override + public String toString() { + return columnName + "(" + typeExpression + ")=" + valueStr; + } + }); + } + else if (type == 'u') { + columns.add( + new UnchangedToastedReplicationMessageColumn(columnName, columnType, typeExpression, optional, true) { + @Override + public String toString() { + return columnName + "(" + typeExpression + ") - Unchanged toasted column"; + } + }); + } + else if (type == 'n') { + columns.add( + new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, true, true) { + @Override + public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { + return null; + } + }); + } + } + + columns.forEach(c -> LOGGER.trace("Column: {}", c)); + return columns; + } + + @Override + public void close() { + if (connection != null) { + connection.close(); + } + } +} \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 44b59312058..3c213f09dc2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -917,4 +917,55 @@ void testUniqueIndexIncludingFunction(boolean parallelismSnapshot) throws Except tableResult.getJobClient().get().cancel().get(); RowUtils.USE_LEGACY_TO_STRING = true; } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCNColumns(boolean parallelismSnapshot) throws Throwable { + setup(parallelismSnapshot); + initializePostgresTable(POSTGRES_CONTAINER, "cn_column_test"); + String sourceDDL = + String.format( + "CREATE TABLE 
cn_column_table (" + + " 测试id INTEGER NOT NULL," + + " 测试name STRING" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + // In the snapshot phase of increment snapshot mode, table without + // primary key is not allowed now.Thus, when + // scan.incremental.snapshot.enabled = true, use 'latest-offset' + // startup mode. + + (parallelismSnapshot + ? " 'scan.startup.mode' = 'latest-offset'," + : "") + + " 'decoding.plugin.name' = 'pgoutput', " + + " 'slot.name' = '%s'" + + ")", + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), + "inventory", + "cn_column_test", + parallelismSnapshot, + getSlotName()); + tEnv.executeSql(sourceDDL); + // async submit job + TableResult tableResult = tEnv.executeSql("SELECT * FROM cn_column_table"); + List expected = new ArrayList<>(); + if (!parallelismSnapshot) { + expected.add("+I[1, testName]"); + } + CloseableIterator iterator = tableResult.collect(); + assertEqualsInAnyOrder(expected, fetchRows(iterator,expected.size())); + } + } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql new file mode 100644 index 00000000000..2c9784b3f14 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql @@ -0,0 +1,37 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: column_type_test +-- ---------------------------------------------------------------------------------------------------------------- +-- Generate a number of tables to cover as many of the PG types as possible +DROP SCHEMA IF EXISTS inventory CASCADE; +CREATE SCHEMA inventory; +-- postgis is installed into public schema +SET search_path TO inventory, public; + + +CREATE TABLE cn_column_test +( + "测试id" INTEGER NOT NULL, + "测试name" VARCHAR NOT NULL + PRIMARY KEY (测试Id) +); + +ALTER TABLE inventory.cn_column_test + REPLICA IDENTITY FULL; + +INSERT INTO inventory.cn_column_test +VALUES (1, 'testName'); \ No newline at end of file From 7789883bf1355d74aaaa2f6161722d5b20ace9fa Mon Sep 17 00:00:00 2001 From: jennychen Date: Wed, 3 Sep 2025 00:12:02 +0800 Subject: [PATCH 02/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../pgoutput/PgOutputMessageDecoder.java | 479 +++++++++++------- .../table/PostgreSQLConnectorITCase.java | 7 +- 2 files changed, 304 insertions(+), 182 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index 1f57a673e20..46be0d6f011 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -5,32 +5,6 @@ */ package io.debezium.connector.postgresql.connection.pgoutput; -import static java.util.stream.Collectors.toMap; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.time.Instant; -import java.time.LocalDate; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; - -import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; import io.debezium.connector.postgresql.PostgresType; import io.debezium.connector.postgresql.TypeRegistry; @@ -53,20 +27,46 @@ import io.debezium.relational.TableId; import io.debezium.util.HexConverter; import io.debezium.util.Strings; +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import 
java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import static java.util.stream.Collectors.toMap; /** - * copied from Debezium 1.9.8-Final with new method readUTF8String - * Decodes messages from the PG logical replication plug-in ("pgoutput"). - * See https://www.postgresql.org/docs/10/protocol-logicalrep-message-formats.html for the protocol specification. + * copied from Debezium 1.9.8-Final with new method readUTF8String Decodes messages from the PG + * logical replication plug-in ("pgoutput"). See + * https://www.postgresql.org/docs/10/protocol-logicalrep-message-formats.html for the protocol + * specification. * * @author Gunnar Morling * @author Chris Cranford - * */ public class PgOutputMessageDecoder extends AbstractMessageDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(PgOutputMessageDecoder.class); - private static final Instant PG_EPOCH = LocalDate.of(2000, 1, 1).atStartOfDay().toInstant(ZoneOffset.UTC); + private static final Instant PG_EPOCH = + LocalDate.of(2000, 1, 1).atStartOfDay().toInstant(ZoneOffset.UTC); private static final byte SPACE = 32; private final MessageDecoderContext decoderContext; @@ -74,9 +74,7 @@ public class PgOutputMessageDecoder extends AbstractMessageDecoder { private Instant commitTimestamp; - /** - * Will be null for a non-transactional decoding message - */ + /** Will be null for a non-transactional decoding message */ private Long transactionId; public enum MessageType { @@ -119,20 +117,23 @@ public static MessageType forType(char type) { } } - public PgOutputMessageDecoder(MessageDecoderContext decoderContext, PostgresConnection connection) { + public PgOutputMessageDecoder( + MessageDecoderContext decoderContext, PostgresConnection connection) { this.decoderContext = decoderContext; this.connection = connection; } @Override - public boolean shouldMessageBeSkipped(ByteBuffer buffer, Lsn lastReceivedLsn, Lsn startLsn, WalPositionLocator walPosition) { + public boolean shouldMessageBeSkipped( + ByteBuffer buffer, Lsn lastReceivedLsn, Lsn startLsn, WalPositionLocator walPosition) { // Cache position as we're going to peak at the first byte to determine message type // We need to reprocess all BEGIN/COMMIT messages regardless. 
int position = buffer.position(); try { MessageType type = MessageType.forType((char) buffer.get()); LOGGER.trace("Message Type: {}", type); - final boolean candidateForSkipping = super.shouldMessageBeSkipped(buffer, lastReceivedLsn, startLsn, walPosition); + final boolean candidateForSkipping = + super.shouldMessageBeSkipped(buffer, lastReceivedLsn, startLsn, walPosition); switch (type) { case COMMIT: case BEGIN: @@ -150,29 +151,35 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Lsn lastReceivedLsn, Ls return false; default: // INSERT/UPDATE/DELETE/TRUNCATE/TYPE/ORIGIN/LOGICAL_DECODING_MESSAGE - // These should be excluded based on the normal behavior, delegating to default method + // These should be excluded based on the normal behavior, delegating to default + // method return candidateForSkipping; } - } - finally { + } finally { // Reset buffer position buffer.position(position); } } @Override - public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { + public void processNotEmptyMessage( + ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) + throws SQLException, InterruptedException { if (LOGGER.isTraceEnabled()) { if (!buffer.hasArray()) { - throw new IllegalStateException("Invalid buffer received from PG server during streaming replication"); + throw new IllegalStateException( + "Invalid buffer received from PG server during streaming replication"); } final byte[] source = buffer.array(); - // Extend the array by two as we might need to append two chars and set them to space by default - final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length + 2); + // Extend the array by two as we might need to append two chars and set them to space by + // default + final byte[] content = + Arrays.copyOfRange(source, buffer.arrayOffset(), source.length + 2); final int lastPos = content.length - 1; content[lastPos - 1] = SPACE; content[lastPos] = SPACE; - LOGGER.trace("Message arrived from database {}", HexConverter.convertToHexString(content)); + LOGGER.trace( + "Message arrived from database {}", HexConverter.convertToHexString(content)); } final MessageType messageType = MessageType.forType((char) buffer.get()); @@ -201,8 +208,7 @@ public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcesso case TRUNCATE: if (isTruncateEventsIncluded()) { decodeTruncate(buffer, typeRegistry, processor); - } - else { + } else { LOGGER.trace("Message Type {} skipped, not processed.", messageType); } break; @@ -213,9 +219,13 @@ public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcesso } @Override - public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder, Function hasMinimumServerVersion) { - builder = builder.withSlotOption("proto_version", 1) - .withSlotOption("publication_names", decoderContext.getConfig().publicationName()); + public ChainedLogicalStreamBuilder optionsWithMetadata( + ChainedLogicalStreamBuilder builder, + Function hasMinimumServerVersion) { + builder = + builder.withSlotOption("proto_version", 1) + .withSlotOption( + "publication_names", decoderContext.getConfig().publicationName()); // DBZ-4374 Use enum once the driver got updated if (hasMinimumServerVersion.apply(140000)) { @@ -226,12 +236,17 @@ public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuild } @Override - public ChainedLogicalStreamBuilder 
optionsWithoutMetadata(ChainedLogicalStreamBuilder builder, Function hasMinimumServerVersion) { + public ChainedLogicalStreamBuilder optionsWithoutMetadata( + ChainedLogicalStreamBuilder builder, + Function hasMinimumServerVersion) { return builder; } private boolean isTruncateEventsIncluded() { - return !decoderContext.getConfig().getSkippedOperations().contains(Envelope.Operation.TRUNCATE); + return !decoderContext + .getConfig() + .getSkippedOperations() + .contains(Envelope.Operation.TRUNCATE); } /** @@ -240,7 +255,8 @@ private boolean isTruncateEventsIncluded() { * @param buffer The replication stream buffer * @param processor The replication message processor */ - private void handleBeginMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + private void handleBeginMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { final Lsn lsn = Lsn.valueOf(buffer.getLong()); // LSN this.commitTimestamp = PG_EPOCH.plus(buffer.getLong(), ChronoUnit.MICROS); this.transactionId = Integer.toUnsignedLong(buffer.getInt()); @@ -257,7 +273,8 @@ private void handleBeginMessage(ByteBuffer buffer, ReplicationMessageProcessor p * @param buffer The replication stream buffer * @param processor The replication message processor */ - private void handleCommitMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + private void handleCommitMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { int flags = buffer.get(); // flags, currently unused final Lsn lsn = Lsn.valueOf(buffer.getLong()); // LSN of the commit final Lsn endLsn = Lsn.valueOf(buffer.getLong()); // End LSN of the transaction @@ -276,14 +293,20 @@ private void handleCommitMessage(ByteBuffer buffer, ReplicationMessageProcessor * @param buffer The replication stream buffer * @param typeRegistry The postgres type registry */ - private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) throws SQLException { + private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) + throws SQLException { int relationId = buffer.getInt(); String schemaName = readString(buffer); String tableName = readString(buffer); int replicaIdentityId = buffer.get(); short columnCount = buffer.getShort(); - LOGGER.trace("Event: {}, RelationId: {}, Replica Identity: {}, Columns: {}", MessageType.RELATION, relationId, replicaIdentityId, columnCount); + LOGGER.trace( + "Event: {}, RelationId: {}, Replica Identity: {}, Columns: {}", + MessageType.RELATION, + relationId, + replicaIdentityId, + columnCount); LOGGER.trace("Schema: '{}', Table: '{}'", schemaName, tableName); // Perform several out-of-bands database metadata queries @@ -294,15 +317,27 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) final DatabaseMetaData databaseMetadata = connection.connection().getMetaData(); final TableId tableId = new TableId(null, schemaName, tableName); - final List readColumns = getTableColumnsFromDatabase(connection, databaseMetadata, tableId); - columnDefaults = readColumns.stream() - .filter(io.debezium.relational.Column::hasDefaultValue) - .collect(toMap(io.debezium.relational.Column::name, io.debezium.relational.Column::defaultValueExpression)); - - columnOptionality = readColumns.stream().collect(toMap(io.debezium.relational.Column::name, io.debezium.relational.Column::isOptional)); + final 
List readColumns = + getTableColumnsFromDatabase(connection, databaseMetadata, tableId); + columnDefaults = + readColumns.stream() + .filter(io.debezium.relational.Column::hasDefaultValue) + .collect( + toMap( + io.debezium.relational.Column::name, + io.debezium.relational.Column::defaultValueExpression)); + + columnOptionality = + readColumns.stream() + .collect( + toMap( + io.debezium.relational.Column::name, + io.debezium.relational.Column::isOptional)); primaryKeyColumns = connection.readPrimaryKeyNames(databaseMetadata, tableId); if (primaryKeyColumns == null || primaryKeyColumns.isEmpty()) { - LOGGER.warn("Primary keys are not defined for table '{}', defaulting to unique indices", tableName); + LOGGER.warn( + "Primary keys are not defined for table '{}', defaulting to unique indices", + tableName); primaryKeyColumns = connection.readTableUniqueIndices(databaseMetadata, tableId); } @@ -315,23 +350,37 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) int attypmod = buffer.getInt(); final PostgresType postgresType = typeRegistry.get(columnType); - boolean key = isColumnInPrimaryKey(schemaName, tableName, columnName, primaryKeyColumns); + boolean key = + isColumnInPrimaryKey(schemaName, tableName, columnName, primaryKeyColumns); Boolean optional = columnOptionality.get(columnName); if (optional == null) { - LOGGER.warn("Column '{}' optionality could not be determined, defaulting to true", columnName); + LOGGER.warn( + "Column '{}' optionality could not be determined, defaulting to true", + columnName); optional = true; } final boolean hasDefault = columnDefaults.containsKey(columnName); - final String defaultValueExpression = columnDefaults.getOrDefault(columnName, Optional.empty()).orElse(null); - - columns.add(new ColumnMetaData(columnName, postgresType, key, optional, hasDefault, defaultValueExpression, attypmod)); + final String defaultValueExpression = + columnDefaults.getOrDefault(columnName, Optional.empty()).orElse(null); + + columns.add( + new ColumnMetaData( + columnName, + postgresType, + key, + optional, + hasDefault, + defaultValueExpression, + attypmod)); columnNames.add(columnName); } - // Remove any PKs that do not exist as part of this this relation message. This can occur when issuing - // multiple schema changes in sequence since the lookup of primary keys is an out-of-band procedure, without + // Remove any PKs that do not exist as part of this this relation message. This can occur + // when issuing + // multiple schema changes in sequence since the lookup of primary keys is an out-of-band + // procedure, without // any logical linkage or context to the point in time the relation message was emitted. // // Example DDL: @@ -339,56 +388,80 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) // ALTER TABLE changepk.test_table ADD COLUMN pk3 SERIAL; -- <- relation message #2 // ALTER TABLE changepk.test_table ADD PRIMARY KEY(newpk,pk3); -- <- relation message #3 // - // Consider the above schema changes. There's a possible temporal ordering where the messages arrive - // in the replication slot data stream at time `t0`, `t1`, and `t2`. It then takes until `t10` for _this_ method - // to start processing message #1. At `t10` invoking `connection.readPrimaryKeyNames()` returns the new - // primary key column, 'pk3', defined by message #3. 
We must remove this primary key column that came - // "from the future" (with temporal respect to the current relate message #1) as a best effort attempt + // Consider the above schema changes. There's a possible temporal ordering where the + // messages arrive + // in the replication slot data stream at time `t0`, `t1`, and `t2`. It then takes until + // `t10` for _this_ method + // to start processing message #1. At `t10` invoking `connection.readPrimaryKeyNames()` + // returns the new + // primary key column, 'pk3', defined by message #3. We must remove this primary key column + // that came + // "from the future" (with temporal respect to the current relate message #1) as a best + // effort attempt // to reflect the actual primary key state at time `t0`. primaryKeyColumns.retainAll(columnNames); - Table table = resolveRelationFromMetadata(new PgOutputRelationMetaData(relationId, schemaName, tableName, columns, primaryKeyColumns)); + Table table = + resolveRelationFromMetadata( + new PgOutputRelationMetaData( + relationId, schemaName, tableName, columns, primaryKeyColumns)); decoderContext.getSchema().applySchemaChangesForTable(relationId, table); } - private List getTableColumnsFromDatabase(PostgresConnection connection, DatabaseMetaData databaseMetadata, TableId tableId) + private List getTableColumnsFromDatabase( + PostgresConnection connection, DatabaseMetaData databaseMetadata, TableId tableId) throws SQLException { List readColumns = new ArrayList<>(); try { - try (ResultSet columnMetadata = databaseMetadata.getColumns(null, tableId.schema(), tableId.table(), null)) { + try (ResultSet columnMetadata = + databaseMetadata.getColumns(null, tableId.schema(), tableId.table(), null)) { while (columnMetadata.next()) { - connection.readColumnForDecoder(columnMetadata, tableId, decoderContext.getConfig().getColumnFilter()) + connection + .readColumnForDecoder( + columnMetadata, + tableId, + decoderContext.getConfig().getColumnFilter()) .ifPresent(readColumns::add); } } - } - catch (SQLException e) { - LOGGER.error("Failed to read column metadata for '{}.{}'", tableId.schema(), tableId.table()); + } catch (SQLException e) { + LOGGER.error( + "Failed to read column metadata for '{}.{}'", + tableId.schema(), + tableId.table()); throw e; } return readColumns; } - private boolean isColumnInPrimaryKey(String schemaName, String tableName, String columnName, List primaryKeyColumns) { + private boolean isColumnInPrimaryKey( + String schemaName, + String tableName, + String columnName, + List primaryKeyColumns) { // todo (DBZ-766) - Discuss this logic with team as there may be a better way to handle this // Personally I think its sufficient enough to resolve the PK based on the out-of-bands call // and should any test fail due to this it should be rewritten or excluded from the pgoutput // scope. // - // In RecordsStreamProducerIT#shouldReceiveChangesForInsertsIndependentOfReplicaIdentity, we have - // a situation where the table is replica identity full, the primary key is dropped but the replica - // identity is kept and later the replica identity is changed to default. In order to support this + // In RecordsStreamProducerIT#shouldReceiveChangesForInsertsIndependentOfReplicaIdentity, we + // have + // a situation where the table is replica identity full, the primary key is dropped but the + // replica + // identity is kept and later the replica identity is changed to default. 
In order to + // support this // use case, the following abides by these rules: // if (!primaryKeyColumns.isEmpty() && primaryKeyColumns.contains(columnName)) { return true; - } - else if (primaryKeyColumns.isEmpty()) { + } else if (primaryKeyColumns.isEmpty()) { // The table's metadata was either not fetched or table no longer has a primary key // Lets attempt to use the known schema primary key configuration as a fallback - Table existingTable = decoderContext.getSchema().tableFor(new TableId(null, schemaName, tableName)); - if (existingTable != null && existingTable.primaryKeyColumnNames().contains(columnName)) { + Table existingTable = + decoderContext.getSchema().tableFor(new TableId(null, schemaName, tableName)); + if (existingTable != null + && existingTable.primaryKeyColumnNames().contains(columnName)) { return true; } } @@ -402,28 +475,34 @@ else if (primaryKeyColumns.isEmpty()) { * @param typeRegistry The postgres type registry * @param processor The replication message processor */ - private void decodeInsert(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + private void decodeInsert( + ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { int relationId = buffer.getInt(); char tupleType = (char) buffer.get(); // Always 'N" for inserts - LOGGER.trace("Event: {}, Relation Id: {}, Tuple Type: {}", MessageType.INSERT, relationId, tupleType); + LOGGER.trace( + "Event: {}, Relation Id: {}, Tuple Type: {}", + MessageType.INSERT, + relationId, + tupleType); Optional
resolvedTable = resolveRelation(relationId); // non-captured table if (!resolvedTable.isPresent()) { processor.process(new NoopMessage(transactionId, commitTimestamp)); - } - else { + } else { Table table = resolvedTable.get(); List columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table); - processor.process(new PgOutputReplicationMessage( - Operation.INSERT, - table.id().toDoubleQuotedString(), - commitTimestamp, - transactionId, - null, - columns)); + processor.process( + new PgOutputReplicationMessage( + Operation.INSERT, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + null, + columns)); } } @@ -434,7 +513,9 @@ private void decodeInsert(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat * @param typeRegistry The postgres type registry * @param processor The replication message processor */ - private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + private void decodeUpdate( + ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { int relationId = buffer.getInt(); LOGGER.trace("Event: {}, RelationId: {}", MessageType.UPDATE, relationId); @@ -444,13 +525,14 @@ private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat // non-captured table if (!resolvedTable.isPresent()) { processor.process(new NoopMessage(transactionId, commitTimestamp)); - } - else { + } else { Table table = resolvedTable.get(); // When reading the tuple-type, we could get 3 different values, 'O', 'K', or 'N'. - // 'O' (Optional) - States the following tuple-data is the key, only for replica identity index configs. - // 'K' (Optional) - States the following tuple-data is the old tuple, only for replica identity full configs. + // 'O' (Optional) - States the following tuple-data is the key, only for replica + // identity index configs. + // 'K' (Optional) - States the following tuple-data is the old tuple, only for replica + // identity full configs. // // 'N' (Not-Optional) - States the following tuple-data is the new tuple. // This is always present. 
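For reference, the tuple-type handling that follows in decodeUpdate can be condensed to the sketch below. This is an illustration only, not part of the patch; it reuses the method's own locals (buffer, typeRegistry, table) and the existing helper resolveColumnsFromStreamTupleData. The wire layout is the pgoutput Update message: an Int32 relation OID, an optional old-tuple/key section flagged 'O' or 'K', then the mandatory 'N' new-tuple section.

    // Sketch of the Update ('U') message layout consumed by decodeUpdate
    int relationId = buffer.getInt();            // Int32: OID of the relation
    char tupleType = (char) buffer.get();        // 'K', 'O' or 'N'
    List<Column> oldColumns = null;
    if (tupleType == 'O' || tupleType == 'K') {
        // optional old-tuple/key section, handled identically by the decoder
        oldColumns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
        tupleType = (char) buffer.get();         // advance to the mandatory 'N' section
    }
    List<Column> newColumns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);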
@@ -459,18 +541,20 @@ private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat if ('O' == tupleType || 'K' == tupleType) { oldColumns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table); // Read the 'N' tuple type - // This is necessary so the stream position is accurate for resolving the column tuple data + // This is necessary so the stream position is accurate for resolving the column + // tuple data tupleType = (char) buffer.get(); } List columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table); - processor.process(new PgOutputReplicationMessage( - Operation.UPDATE, - table.id().toDoubleQuotedString(), - commitTimestamp, - transactionId, - oldColumns, - columns)); + processor.process( + new PgOutputReplicationMessage( + Operation.UPDATE, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + oldColumns, + columns)); } } @@ -481,40 +565,48 @@ private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat * @param typeRegistry The postgres type registry * @param processor The replication message processor */ - private void decodeDelete(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + private void decodeDelete( + ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { int relationId = buffer.getInt(); char tupleType = (char) buffer.get(); - LOGGER.trace("Event: {}, RelationId: {}, Tuple Type: {}", MessageType.DELETE, relationId, tupleType); + LOGGER.trace( + "Event: {}, RelationId: {}, Tuple Type: {}", + MessageType.DELETE, + relationId, + tupleType); Optional
resolvedTable = resolveRelation(relationId); // non-captured table if (!resolvedTable.isPresent()) { processor.process(new NoopMessage(transactionId, commitTimestamp)); - } - else { + } else { Table table = resolvedTable.get(); List columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table); - processor.process(new PgOutputReplicationMessage( - Operation.DELETE, - table.id().toDoubleQuotedString(), - commitTimestamp, - transactionId, - columns, - null)); + processor.process( + new PgOutputReplicationMessage( + Operation.DELETE, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + columns, + null)); } } /** * Callback handler for the 'T' truncate replication stream message. * - * @param buffer The replication stream buffer + * @param buffer The replication stream buffer * @param typeRegistry The postgres type registry - * @param processor The replication message processor + * @param processor The replication message processor */ - private void decodeTruncate(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + private void decodeTruncate( + ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { // As of PG11, the Truncate message format is as described: // Byte Message Type (Always 'T') // Int32 number of relations described by the truncate message @@ -543,19 +635,24 @@ private void decodeTruncate(ByteBuffer buffer, TypeRegistry typeRegistry, Replic } if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Event: {}, RelationIds: {}, OptionBits: {}", MessageType.TRUNCATE, Arrays.toString(relationIds), optionBits); + LOGGER.trace( + "Event: {}, RelationIds: {}, OptionBits: {}", + MessageType.TRUNCATE, + Arrays.toString(relationIds), + optionBits); } int noOfResolvedTables = tables.size(); for (int i = 0; i < noOfResolvedTables; i++) { Table table = tables.get(i); boolean lastTableInTruncate = (i + 1) == noOfResolvedTables; - processor.process(new PgOutputTruncateReplicationMessage( - Operation.TRUNCATE, - table.id().toDoubleQuotedString(), - commitTimestamp, - transactionId, - lastTableInTruncate)); + processor.process( + new PgOutputTruncateReplicationMessage( + Operation.TRUNCATE, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + lastTableInTruncate)); } } @@ -581,14 +678,16 @@ private List getTruncateOptions(int flag) { /** * Callback handler for the 'M' logical decoding message * - * @param buffer The replication stream buffer - * @param processor The replication message processor + * @param buffer The replication stream buffer + * @param processor The replication message processor */ - private void handleLogicalDecodingMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) + private void handleLogicalDecodingMessage( + ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { // As of PG14, the MESSAGE message format is as described: // Byte1 Always 'M' - // Int32 Xid of the transaction (only present for streamed transactions in protocol version 2). + // Int32 Xid of the transaction (only present for streamed transactions in protocol version + // 2). // Int8 flags; Either 0 for no flags or 1 if the logical decoding message is transactional. // Int64 The LSN of the logical decoding message // String The prefix of the logical decoding message. 
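The hunk above only reflows the field-list comment for the 'M' (logical decoding message) payload; its handler body is elided. For orientation, the remaining fields are consumed roughly as sketched below. This is reconstructed from the comment and the PG14 protocol documentation, not the verbatim handler: the names messageLsn and contentLength are illustrative, while isTransactional, prefix, content and readString do appear in the surrounding code.

    // Sketch of the remaining 'M' message fields (non-streamed path, so no Xid field)
    boolean isTransactional = buffer.get() == 1;   // Int8: 1 if the message is transactional
    long messageLsn = buffer.getLong();            // Int64: LSN of the logical decoding message
    String prefix = readString(buffer);            // null-terminated prefix chosen by the emitter
    int contentLength = buffer.getInt();           // Int32: length of the payload that follows
    byte[] content = new byte[contentLength];
    buffer.get(content);                           // Byte[n]: the message payload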
@@ -615,21 +714,22 @@ private void handleLogicalDecodingMessage(ByteBuffer buffer, ReplicationMessageP LOGGER.trace("Transactional: {}", isTransactional); LOGGER.trace("Prefix: {}", prefix); - processor.process(new LogicalDecodingMessage( - Operation.MESSAGE, - commitTimestamp, - transactionId, - isTransactional, - prefix, - content)); + processor.process( + new LogicalDecodingMessage( + Operation.MESSAGE, + commitTimestamp, + transactionId, + isTransactional, + prefix, + content)); } /** * Resolves a given replication message relation identifier to a {@link Table}. * * @param relationId The replication message stream's relation identifier - * @return table resolved from a prior relation message or direct lookup from the schema - * or empty when the table is filtered + * @return table resolved from a prior relation message or direct lookup from the schema or + * empty when the table is filtered */ private Optional
<Table> resolveRelation(int relationId) { return Optional.ofNullable(decoderContext.getSchema().tableFor(relationId)); @@ -644,14 +744,17 @@ private Optional<Table>
resolveRelation(int relationId) { private Table resolveRelationFromMetadata(PgOutputRelationMetaData metadata) { List columns = new ArrayList<>(); for (ColumnMetaData columnMetadata : metadata.getColumns()) { - ColumnEditor editor = io.debezium.relational.Column.editor() - .name(columnMetadata.getColumnName()) - .jdbcType(columnMetadata.getPostgresType().getRootType().getJdbcId()) - .nativeType(columnMetadata.getPostgresType().getRootType().getOid()) - .optional(columnMetadata.isOptional()) - .type(columnMetadata.getPostgresType().getName(), columnMetadata.getTypeName()) - .length(columnMetadata.getLength()) - .scale(columnMetadata.getScale()); + ColumnEditor editor = + io.debezium.relational.Column.editor() + .name(columnMetadata.getColumnName()) + .jdbcType(columnMetadata.getPostgresType().getRootType().getJdbcId()) + .nativeType(columnMetadata.getPostgresType().getRootType().getOid()) + .optional(columnMetadata.isOptional()) + .type( + columnMetadata.getPostgresType().getName(), + columnMetadata.getTypeName()) + .length(columnMetadata.getLength()) + .scale(columnMetadata.getScale()); if (columnMetadata.hasDefaultValue()) { editor.defaultValueExpression(columnMetadata.getDefaultValueExpression()); @@ -660,11 +763,12 @@ private Table resolveRelationFromMetadata(PgOutputRelationMetaData metadata) { columns.add(editor.create()); } - Table table = Table.editor() - .addColumns(columns) - .setPrimaryKeyNames(metadata.getPrimaryKeyNames()) - .tableId(metadata.getTableId()) - .create(); + Table table = + Table.editor() + .addColumns(columns) + .setPrimaryKeyNames(metadata.getPrimaryKeyNames()) + .tableId(metadata.getTableId()) + .create(); LOGGER.trace("Resolved '{}' as '{}'", table.id(), table); @@ -672,7 +776,8 @@ private Table resolveRelationFromMetadata(PgOutputRelationMetaData metadata) { } /** - * Reads the replication stream up to the next null-terminator byte and returns the contents as a string. + * Reads the replication stream up to the next null-terminator byte and returns the contents as + * a string. * * @param buffer The replication stream buffer * @return string read from the replication stream @@ -687,7 +792,9 @@ private static String readString(ByteBuffer buffer) { } /** - * Reads the replication stream up to the next null-terminator byte and return the contents as a utf-8 string + * Reads the replication stream up to the next null-terminator byte and return the contents as a + * utf-8 string + * * @param buffer The replication stream buffer * @return string read from the replication stream */ @@ -716,7 +823,7 @@ private static String readUTF8String(ByteBuffer buffer) { buffer.get(stringBytes); // if a null terminator was found, advance the buffer's position past it - if(nullFound) { + if (nullFound) { buffer.get(); } @@ -724,7 +831,8 @@ private static String readUTF8String(ByteBuffer buffer) { } /** - * Reads the replication stream where the column stream specifies a length followed by the value. + * Reads the replication stream where the column stream specifies a length followed by the + * value. 
* * @param buffer The replication stream buffer * @return the column value as a string read from the replication stream @@ -744,7 +852,8 @@ private static String readColumnValueAsString(ByteBuffer buffer) { * @param table The database table * @return list of replication message columns */ - private static List resolveColumnsFromStreamTupleData(ByteBuffer buffer, TypeRegistry typeRegistry, Table table) { + private static List resolveColumnsFromStreamTupleData( + ByteBuffer buffer, TypeRegistry typeRegistry, Table table) { // Read number of the columns short numberOfColumns = buffer.getShort(); @@ -766,10 +875,19 @@ private static List resolveColumnsFromStreamTupleData(ByteBuffer buffer, if (type == 't') { final String valueStr = readColumnValueAsString(buffer); columns.add( - new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, optional, true) { + new AbstractReplicationMessageColumn( + columnName, columnType, typeExpression, optional, true) { @Override - public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { - return PgOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, includeUnknownDatatypes, + public Object getValue( + PgConnectionSupplier connection, + boolean includeUnknownDatatypes) { + return PgOutputReplicationMessage.getValue( + columnName, + columnType, + typeExpression, + valueStr, + connection, + includeUnknownDatatypes, typeRegistry); } @@ -778,21 +896,26 @@ public String toString() { return columnName + "(" + typeExpression + ")=" + valueStr; } }); - } - else if (type == 'u') { + } else if (type == 'u') { columns.add( - new UnchangedToastedReplicationMessageColumn(columnName, columnType, typeExpression, optional, true) { + new UnchangedToastedReplicationMessageColumn( + columnName, columnType, typeExpression, optional, true) { @Override public String toString() { - return columnName + "(" + typeExpression + ") - Unchanged toasted column"; + return columnName + + "(" + + typeExpression + + ") - Unchanged toasted column"; } }); - } - else if (type == 'n') { + } else if (type == 'n') { columns.add( - new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, true, true) { + new AbstractReplicationMessageColumn( + columnName, columnType, typeExpression, true, true) { @Override - public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { + public Object getValue( + PgConnectionSupplier connection, + boolean includeUnknownDatatypes) { return null; } }); @@ -809,4 +932,4 @@ public void close() { connection.close(); } } -} \ No newline at end of file +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 3c213f09dc2..fa6714eb6d5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -943,8 +943,8 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { // scan.incremental.snapshot.enabled = true, use 'latest-offset' // startup 
mode. + (parallelismSnapshot - ? " 'scan.startup.mode' = 'latest-offset'," - : "") + ? " 'scan.startup.mode' = 'latest-offset'," + : "") + " 'decoding.plugin.name' = 'pgoutput', " + " 'slot.name' = '%s'" + ")", @@ -965,7 +965,6 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { expected.add("+I[1, testName]"); } CloseableIterator iterator = tableResult.collect(); - assertEqualsInAnyOrder(expected, fetchRows(iterator,expected.size())); + assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size())); } - } From f683a4d0402cd7573927b03ef1141fc90549166c Mon Sep 17 00:00:00 2001 From: jennychen Date: Wed, 3 Sep 2025 06:54:06 +0800 Subject: [PATCH 03/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../src/test/resources/ddl/cn_column_test.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql index 2c9784b3f14..56e79003227 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql @@ -26,8 +26,8 @@ SET search_path TO inventory, public; CREATE TABLE cn_column_test ( "测试id" INTEGER NOT NULL, - "测试name" VARCHAR NOT NULL - PRIMARY KEY (测试Id) + "测试name" VARCHAR NOT NULL, + PRIMARY KEY ("测试Id") ); ALTER TABLE inventory.cn_column_test From bb4a8b0d4eaf307b285021e810a5d0700686c5a6 Mon Sep 17 00:00:00 2001 From: jennychen Date: Wed, 3 Sep 2025 07:59:24 +0800 Subject: [PATCH 04/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../src/test/resources/ddl/cn_column_test.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql index 56e79003227..89d5877e61d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/cn_column_test.sql @@ -27,7 +27,7 @@ CREATE TABLE cn_column_test ( "测试id" INTEGER NOT NULL, "测试name" VARCHAR NOT NULL, - PRIMARY KEY ("测试Id") + PRIMARY KEY ("测试id") ); ALTER TABLE inventory.cn_column_test From 3ed2f3cfdd63bd508601b095df8a09f53f16270f Mon Sep 17 00:00:00 2001 From: jennychen Date: Wed, 3 Sep 2025 22:06:26 +0800 Subject: [PATCH 05/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../connectors/postgres/table/PostgreSQLConnectorITCase.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index fa6714eb6d5..153c630525f 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -961,9 +961,8 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { // async submit job TableResult tableResult = tEnv.executeSql("SELECT * FROM cn_column_table"); List expected = new ArrayList<>(); - if (!parallelismSnapshot) { - expected.add("+I[1, testName]"); - } + expected.add("+I[1, testName]"); + Thread.sleep(5000L); CloseableIterator iterator = tableResult.collect(); assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size())); } From 9eeb7cace35e2e13e06f7cc75f1c59f000a37661 Mon Sep 17 00:00:00 2001 From: jennychen Date: Tue, 16 Sep 2025 22:21:00 +0800 Subject: [PATCH 06/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../postgresql/connection/pgoutput/PgOutputMessageDecoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index 46be0d6f011..49ea9c491fc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -345,7 +345,7 @@ private void handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) Set columnNames = new HashSet<>(); for (short i = 0; i < columnCount; ++i) { byte flags = buffer.get(); - String columnName = Strings.unquoteIdentifierPart(readUTF8String(buffer)); + String columnName = Strings.unquoteIdentifierPart(readString(buffer)); int columnType = buffer.getInt(); int attypmod = buffer.getInt(); From 093be4b73d458780483105d08ea5b7fe552cec74 Mon Sep 17 00:00:00 2001 From: jennychen Date: Wed, 17 Sep 2025 21:30:21 +0800 Subject: [PATCH 07/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../postgresql/connection/pgoutput/PgOutputMessageDecoder.java | 2 +- .../connectors/postgres/table/PostgreSQLConnectorITCase.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index 49ea9c491fc..46be0d6f011 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -345,7 +345,7 @@ private void 
handleRelationMessage(ByteBuffer buffer, TypeRegistry typeRegistry) Set columnNames = new HashSet<>(); for (short i = 0; i < columnCount; ++i) { byte flags = buffer.get(); - String columnName = Strings.unquoteIdentifierPart(readString(buffer)); + String columnName = Strings.unquoteIdentifierPart(readUTF8String(buffer)); int columnType = buffer.getInt(); int attypmod = buffer.getInt(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 153c630525f..1a8949ba613 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -965,5 +965,7 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { Thread.sleep(5000L); CloseableIterator iterator = tableResult.collect(); assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size())); + tableResult.getJobClient().get().cancel().get(); + RowUtils.USE_LEGACY_TO_STRING = true; } } From 0ccfadf6f581bc178517a513c2b746730a094db9 Mon Sep 17 00:00:00 2001 From: jennychen Date: Wed, 17 Sep 2025 22:48:39 +0800 Subject: [PATCH 08/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../postgres/table/PostgreSQLConnectorITCase.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 1a8949ba613..c3c796823ef 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -922,6 +922,7 @@ void testUniqueIndexIncludingFunction(boolean parallelismSnapshot) throws Except @ValueSource(booleans = {true, false}) void testCNColumns(boolean parallelismSnapshot) throws Throwable { setup(parallelismSnapshot); + RowUtils.USE_LEGACY_TO_STRING = false; initializePostgresTable(POSTGRES_CONTAINER, "cn_column_test"); String sourceDDL = String.format( @@ -960,8 +961,13 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { tEnv.executeSql(sourceDDL); // async submit job TableResult tableResult = tEnv.executeSql("SELECT * FROM cn_column_table"); + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); + Statement statement = connection.createStatement()) { + statement.execute("INSERT INTO inventory.cn_column_test VALUES (2, 'testAnotherName');"); + } List expected = new ArrayList<>(); - expected.add("+I[1, testName]"); + expected.add("+I[2, testAnotherName]"); Thread.sleep(5000L); CloseableIterator iterator = tableResult.collect(); 
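Patches 06 and 07 flip the column-name read between readString and readUTF8String before the series settles on readUTF8String. The standalone snippet below illustrates why the distinction matters for identifiers such as "测试id": decoding byte by byte garbles multi-byte UTF-8, while decoding the whole null-terminated byte run as UTF-8 does not. It is a self-contained demonstration of the two strategies and does not claim to reproduce either Debezium method verbatim.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class Utf8ColumnNameDemo {
        public static void main(String[] args) {
            // pgoutput sends identifiers as UTF-8 bytes followed by a 0x00 terminator
            byte[] utf8 = "测试id".getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(utf8.length + 1).put(utf8).put((byte) 0);
            buffer.flip();

            // Per-byte decoding: every byte is widened to a char individually, so each
            // 3-byte UTF-8 sequence of a CJK character becomes three garbage chars.
            StringBuilder perByte = new StringBuilder();
            byte b;
            while ((b = buffer.get()) != 0) {
                perByte.append((char) b);
            }
            System.out.println(perByte);                 // garbled

            // Whole-run decoding: collect the bytes up to the terminator, decode once as UTF-8.
            buffer.rewind();
            int start = buffer.position();
            while (buffer.get() != 0) {
                // scan forward to the null terminator
            }
            int length = buffer.position() - start - 1;  // bytes before the terminator
            byte[] raw = new byte[length];
            buffer.position(start);
            buffer.get(raw);
            buffer.get();                                // step past the terminator
            System.out.println(new String(raw, StandardCharsets.UTF_8));  // prints 测试id
        }
    }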
assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size())); From 34ca38fa159c57d9e43a5d796049e2a5892034ea Mon Sep 17 00:00:00 2001 From: jennychen Date: Wed, 17 Sep 2025 23:07:46 +0800 Subject: [PATCH 09/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../connectors/postgres/table/PostgreSQLConnectorITCase.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index c3c796823ef..21004240f53 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -963,8 +963,9 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { TableResult tableResult = tEnv.executeSql("SELECT * FROM cn_column_table"); // generate WAL try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); - Statement statement = connection.createStatement()) { - statement.execute("INSERT INTO inventory.cn_column_test VALUES (2, 'testAnotherName');"); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO inventory.cn_column_test VALUES (2, 'testAnotherName');"); } List expected = new ArrayList<>(); expected.add("+I[2, testAnotherName]"); From 29c7f16481ce0fbbe3eb33f70bce0822c8572a99 Mon Sep 17 00:00:00 2001 From: jennychen Date: Thu, 18 Sep 2025 09:09:31 +0800 Subject: [PATCH 10/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../postgres/table/PostgreSQLConnectorITCase.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 21004240f53..6d20bd31dee 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -959,17 +959,23 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { parallelismSnapshot, getSlotName()); tEnv.executeSql(sourceDDL); + LOG.info("test cn column, sourceDDL:{}", sourceDDL); + // async submit job TableResult tableResult = tEnv.executeSql("SELECT * FROM cn_column_table"); + + Thread.sleep(5000L); // generate WAL try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( "INSERT INTO inventory.cn_column_test VALUES (2, 'testAnotherName');"); + statement.executeQuery("select * from inventory.cn_column_test"); } + List expected = new ArrayList<>(); expected.add("+I[2, 
testAnotherName]"); - Thread.sleep(5000L); + CloseableIterator iterator = tableResult.collect(); assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size())); tableResult.getJobClient().get().cancel().get(); From afa038fb5c9c9edbeb6c65495caef0a4bbf3383d Mon Sep 17 00:00:00 2001 From: jennychen Date: Thu, 18 Sep 2025 20:51:34 +0800 Subject: [PATCH 11/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../connectors/postgres/table/PostgreSQLConnectorITCase.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 6d20bd31dee..954ab1de6b6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -974,6 +974,9 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { } List expected = new ArrayList<>(); + if (parallelismSnapshot) { + expected.add("+I[1, a]"); + } expected.add("+I[2, testAnotherName]"); CloseableIterator iterator = tableResult.collect(); From 84d834d4c576b0d3e00c5b9c2742b3eced3196d5 Mon Sep 17 00:00:00 2001 From: jennychen Date: Thu, 18 Sep 2025 22:48:29 +0800 Subject: [PATCH 12/12] [FLINK-38110] fix PostgreSQL connector reads Chinese columns with garbled characters --- .../connectors/postgres/table/PostgreSQLConnectorITCase.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 954ab1de6b6..a7bdf5d98c3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -970,12 +970,11 @@ void testCNColumns(boolean parallelismSnapshot) throws Throwable { Statement statement = connection.createStatement()) { statement.execute( "INSERT INTO inventory.cn_column_test VALUES (2, 'testAnotherName');"); - statement.executeQuery("select * from inventory.cn_column_test"); } List expected = new ArrayList<>(); - if (parallelismSnapshot) { - expected.add("+I[1, a]"); + if (!parallelismSnapshot) { + expected.add("+I[1, testName]"); } expected.add("+I[2, testAnotherName]");