Skip to content

Commit c89cdd9

Browse files
[FLINK-38742][cdc/postgres] Fix integration tests to expect TIMESTAMP_TZ instead of TIMESTAMP_LTZ
Update PostgresFullTypesITCase to expect TIMESTAMP_TZ for PostgreSQL TIMESTAMPTZ columns, matching the corrected schema inference behavior.
1 parent a7fa7e1 commit c89cdd9

File tree

2 files changed

+17
-6
lines changed

2 files changed

+17
-6
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresFullTypesITCase.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import org.apache.flink.cdc.common.data.ArrayData;
2424
import org.apache.flink.cdc.common.data.DateData;
2525
import org.apache.flink.cdc.common.data.DecimalData;
26-
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
2726
import org.apache.flink.cdc.common.data.RecordData;
2827
import org.apache.flink.cdc.common.data.TimeData;
2928
import org.apache.flink.cdc.common.data.TimestampData;
29+
import org.apache.flink.cdc.common.data.ZonedTimestampData;
3030
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
3131
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
3232
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -68,6 +68,7 @@
6868
import java.time.Instant;
6969
import java.time.LocalDateTime;
7070
import java.time.LocalTime;
71+
import java.time.OffsetDateTime;
7172
import java.time.ZoneId;
7273
import java.util.ArrayList;
7374
import java.util.HashMap;
@@ -298,7 +299,8 @@ public void testTimeTypesWithTemporalModeAdaptive() throws Exception {
298299
TimestampData.fromLocalDateTime(
299300
LocalDateTime.parse("2020-07-17T18:00:22.123456")),
300301
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
301-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
302+
ZonedTimestampData.fromOffsetDateTime(
303+
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
302304
};
303305

304306
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -354,7 +356,8 @@ public void testTimeTypesWithTemporalModeMicroSeconds() throws Exception {
354356
TimestampData.fromLocalDateTime(
355357
LocalDateTime.parse("2020-07-17T18:00:22.123456")),
356358
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
357-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
359+
ZonedTimestampData.fromOffsetDateTime(
360+
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
358361
};
359362

360363
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -409,7 +412,8 @@ public void testTimeTypesWithTemporalModeConnect() throws Exception {
409412
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
410413
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22.123")),
411414
TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-07-17T18:00:22")),
412-
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
415+
ZonedTimestampData.fromOffsetDateTime(
416+
OffsetDateTime.parse("2020-07-17T10:00:22Z")),
413417
};
414418

415419
List<Event> snapshotResults = fetchResultsAndCreateTableEvent(events, 1).f0;
@@ -1042,7 +1046,7 @@ private Instant toInstant(String ts) {
10421046
DataTypes.TIMESTAMP(3),
10431047
DataTypes.TIMESTAMP(6),
10441048
DataTypes.TIMESTAMP(),
1045-
DataTypes.TIMESTAMP_LTZ(0));
1049+
DataTypes.TIMESTAMP_TZ(0));
10461050

10471051
private static final RowType HSTORE_TYPES_WITH_ADAPTIVE =
10481052
RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,9 @@ protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
377377
String str = (String) dbzObj;
378378
// ZonedTimestamp type is encoded in string type with timezone offset
379379
// Format: ISO-8601 with timezone offset (e.g., "2020-07-17T18:00:22+00:00")
380+
// According to Debezium documentation, PostgreSQL TIMESTAMPTZ is ALWAYS encoded as
381+
// String
382+
// with ZonedTimestamp.SCHEMA_NAME, regardless of time.precision.mode
380383
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
381384
// Parse using Debezium's ZonedTimestamp formatter
382385
OffsetDateTime offsetDateTime = OffsetDateTime.parse(str, ZonedTimestamp.FORMATTER);
@@ -391,7 +394,11 @@ protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
391394
"Unable to convert to TIMESTAMP WITH TIME ZONE from unexpected value '"
392395
+ dbzObj
393396
+ "' of type "
394-
+ dbzObj.getClass().getName());
397+
+ dbzObj.getClass().getName()
398+
+ " with schema name '"
399+
+ (schema != null ? schema.name() : "null")
400+
+ "'. PostgreSQL TIMESTAMPTZ should always be encoded as String with "
401+
+ ZonedTimestamp.SCHEMA_NAME);
395402
}
396403

397404
protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {

0 commit comments

Comments
 (0)