Skip to content

Commit cdc044e

Browse files
authored
Add timezone support, #170 (#171)
1 parent cb80983 commit cdc044e

File tree

9 files changed

+175
-4
lines changed

9 files changed

+175
-4
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ types_mapping: # optional
243243

244244
ignore_deletes: false # optional, set to true to ignore DELETE operations
245245

246+
mysql_timezone: 'UTC' # optional, timezone for MySQL timestamp conversion (default: 'UTC')
247+
246248
```
247249
248250
#### Required settings
@@ -267,6 +269,7 @@ ignore_deletes: false # optional, set to true to ignore DELETE operations
267269
- `http_host`, `http_port` - http endpoint to control replication, use `/docs` for abailable commands
268270
- `types_mappings` - custom types mapping, eg. you can map char(36) to UUID instead of String, etc.
269271
- `ignore_deletes` - when set to `true`, DELETE operations in MySQL will be ignored during replication. This creates an append-only model where data is only added, never removed. In this mode, the replicator doesn't create a temporary database and instead replicates directly to the target database.
272+
- `mysql_timezone` - timezone to use for MySQL timestamp conversion to ClickHouse DateTime64. Default is `'UTC'`. Accepts any valid timezone name (e.g., `'America/New_York'`, `'Europe/London'`, `'Asia/Tokyo'`). This setting ensures proper timezone handling when converting MySQL timestamp fields to ClickHouse DateTime64 with timezone information.
270273

271274
Few more tables / dbs examples:
272275

mysql_ch_replicator/binlog_replicator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ def __init__(self, settings: Settings):
367367
resume_stream=True,
368368
log_pos=log_pos,
369369
log_file=log_file,
370+
mysql_timezone=settings.mysql_timezone,
370371
)
371372
self.last_state_update = 0
372373
self.last_binlog_clear_time = 0

mysql_ch_replicator/config.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import yaml
22
import fnmatch
3+
import zoneinfo
34

45
from dataclasses import dataclass
56

@@ -129,6 +130,7 @@ def __init__(self):
129130
self.target_databases = {}
130131
self.initial_replication_threads = 0
131132
self.ignore_deletes = False
133+
self.mysql_timezone = 'UTC'
132134

133135
def load(self, settings_file):
134136
data = open(settings_file, 'r').read()
@@ -155,6 +157,7 @@ def load(self, settings_file):
155157
self.target_databases = data.pop('target_databases', {})
156158
self.initial_replication_threads = data.pop('initial_replication_threads', 0)
157159
self.ignore_deletes = data.pop('ignore_deletes', False)
160+
self.mysql_timezone = data.pop('mysql_timezone', 'UTC')
158161

159162
indexes = data.pop('indexes', [])
160163
for index in indexes:
@@ -204,6 +207,16 @@ def validate_log_level(self):
204207
if self.log_level == 'debug':
205208
self.debug_log_level = True
206209

210+
def validate_mysql_timezone(self):
211+
if not isinstance(self.mysql_timezone, str):
212+
raise ValueError(f'mysql_timezone should be string and not {stype(self.mysql_timezone)}')
213+
214+
# Validate timezone by attempting to import and check if it's valid
215+
try:
216+
zoneinfo.ZoneInfo(self.mysql_timezone)
217+
except zoneinfo.ZoneInfoNotFoundError:
218+
raise ValueError(f'invalid timezone: {self.mysql_timezone}. Use IANA timezone names like "UTC", "Europe/London", "America/New_York", etc.')
219+
207220
def get_indexes(self, db_name, table_name):
208221
results = []
209222
for index in self.indexes:
@@ -235,3 +248,4 @@ def validate(self):
235248
raise ValueError(f'initial_replication_threads should be an integer, not {type(self.initial_replication_threads)}')
236249
if self.initial_replication_threads < 0:
237250
raise ValueError(f'initial_replication_threads should be non-negative')
251+
self.validate_mysql_timezone()

mysql_ch_replicator/converter.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ def strip_sql_comments(sql_statement):
214214
return sqlparse.format(sql_statement, strip_comments=True).strip()
215215

216216

217-
def convert_timestamp_to_datetime64(input_str):
217+
def convert_timestamp_to_datetime64(input_str, timezone='UTC'):
218218

219219
# Define the regex pattern
220220
pattern = r'^timestamp(?:\((\d+)\))?$'
@@ -226,9 +226,17 @@ def convert_timestamp_to_datetime64(input_str):
226226
# If a precision is provided, include it in the replacement
227227
precision = match.group(1)
228228
if precision is not None:
229-
return f'DateTime64({precision})'
229+
# Only add timezone info if it's not UTC (to preserve original behavior)
230+
if timezone == 'UTC':
231+
return f'DateTime64({precision})'
232+
else:
233+
return f'DateTime64({precision}, \'{timezone}\')'
230234
else:
231-
return 'DateTime64'
235+
# Only add timezone info if it's not UTC (to preserve original behavior)
236+
if timezone == 'UTC':
237+
return 'DateTime64'
238+
else:
239+
return f'DateTime64(3, \'{timezone}\')'
232240
else:
233241
raise ValueError(f"Invalid input string format: '{input_str}'")
234242

@@ -372,7 +380,10 @@ def convert_type(self, mysql_type, parameters):
372380
if 'real' in mysql_type:
373381
return 'Float64'
374382
if mysql_type.startswith('timestamp'):
375-
return convert_timestamp_to_datetime64(mysql_type)
383+
timezone = 'UTC'
384+
if self.db_replicator is not None:
385+
timezone = self.db_replicator.config.mysql_timezone
386+
return convert_timestamp_to_datetime64(mysql_type, timezone)
376387
if mysql_type.startswith('time'):
377388
return 'String'
378389
if 'varbinary' in mysql_type:

mysql_ch_replicator/pymysqlreplication/binlogstream.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ def __init__(
188188
ignore_decode_errors=False,
189189
verify_checksum=False,
190190
enable_logging=True,
191+
mysql_timezone="UTC",
191192
):
192193
"""
193194
Attributes:
@@ -230,6 +231,7 @@ def __init__(
230231
verify_checksum: If true, verify events read from the binary log by examining checksums.
231232
enable_logging: When set to True, logs various details helpful for debugging and monitoring
232233
When set to False, logging is disabled to enhance performance.
234+
mysql_timezone: Timezone to use for MySQL timestamp conversion (e.g., 'UTC', 'America/New_York')
233235
"""
234236

235237
self.__connection_settings = connection_settings
@@ -254,6 +256,7 @@ def __init__(
254256
self.__ignore_decode_errors = ignore_decode_errors
255257
self.__verify_checksum = verify_checksum
256258
self.__optional_meta_data = False
259+
self.__mysql_timezone = mysql_timezone
257260

258261
# We can't filter on packet level TABLE_MAP and rotate event because
259262
# we need them for handling other operations
@@ -636,6 +639,7 @@ def fetchone(self):
636639
self.__ignore_decode_errors,
637640
self.__verify_checksum,
638641
self.__optional_meta_data,
642+
self.__mysql_timezone,
639643
)
640644

641645
if binlog_event.event_type == ROTATE_EVENT:

mysql_ch_replicator/pymysqlreplication/event.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def __init__(
2828
ignore_decode_errors=False,
2929
verify_checksum=False,
3030
optional_meta_data=False,
31+
mysql_timezone="UTC",
3132
):
3233
self.packet = from_packet
3334
self.table_map = table_map
@@ -39,6 +40,7 @@ def __init__(
3940
self._ignore_decode_errors = ignore_decode_errors
4041
self._verify_checksum = verify_checksum
4142
self._is_event_valid = None
43+
self.mysql_timezone = mysql_timezone
4244
# The event have been fully processed, if processed is false
4345
# the event will be skipped
4446
self._processed = True

mysql_ch_replicator/pymysqlreplication/packet.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def __init__(
7575
ignore_decode_errors,
7676
verify_checksum,
7777
optional_meta_data,
78+
mysql_timezone="UTC",
7879
):
7980
# -1 because we ignore the ok byte
8081
self.read_bytes = 0
@@ -128,6 +129,7 @@ def __init__(
128129
ignore_decode_errors=ignore_decode_errors,
129130
verify_checksum=verify_checksum,
130131
optional_meta_data=optional_meta_data,
132+
mysql_timezone=mysql_timezone,
131133
)
132134
if not self.event._processed:
133135
self.event = None

mysql_ch_replicator/pymysqlreplication/row_event.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import struct
22
import decimal
33
import datetime
4+
import zoneinfo
45

56
from pymysql.charset import charset_by_name
67
from enum import Enum
@@ -100,6 +101,31 @@ def _is_null(null_bitmap, position):
100101
bit = ord(bit)
101102
return bit & (1 << (position % 8))
102103

104+
def _convert_timestamp_with_timezone(self, timestamp_value):
105+
"""
106+
Convert timestamp from UTC to configured timezone
107+
108+
:param timestamp_value: Unix timestamp value
109+
:return: datetime object in configured timezone
110+
"""
111+
# Create UTC datetime first
112+
utc_dt = datetime.datetime.utcfromtimestamp(timestamp_value)
113+
114+
# If timezone is UTC, return timezone-aware UTC datetime
115+
if self.mysql_timezone == "UTC":
116+
return utc_dt.replace(tzinfo=datetime.timezone.utc)
117+
118+
# Convert to configured timezone but keep timezone-aware
119+
try:
120+
# Start with UTC timezone-aware datetime
121+
utc_dt_aware = utc_dt.replace(tzinfo=datetime.timezone.utc)
122+
# Convert to target timezone
123+
target_tz = zoneinfo.ZoneInfo(self.mysql_timezone)
124+
return utc_dt_aware.astimezone(target_tz)
125+
except zoneinfo.ZoneInfoNotFoundError:
126+
# If timezone is invalid, fall back to UTC
127+
return utc_dt.replace(tzinfo=datetime.timezone.utc)
128+
103129
def _read_column_data(self, cols_bitmap, row_image_type=None):
104130
"""Use for WRITE, UPDATE and DELETE events.
105131
Return an array of column data

test_mysql_ch_replicator.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2804,3 +2804,111 @@ def test_json2():
28042804
assert json.loads(ch.select(TEST_TABLE_NAME, "name='Peter'")[0]['data'])['в'] == 'б'
28052805
db_replicator_runner.stop()
28062806
binlog_replicator_runner.stop()
2807+
2808+
def test_timezone_conversion():
2809+
"""
2810+
Test that MySQL timestamp fields are converted to ClickHouse DateTime64 with custom timezone.
2811+
This test reproduces the issue from GitHub issue #170.
2812+
"""
2813+
# Create a temporary config file with custom timezone
2814+
config_content = """
2815+
mysql:
2816+
host: 'localhost'
2817+
port: 9306
2818+
user: 'root'
2819+
password: 'admin'
2820+
2821+
clickhouse:
2822+
host: 'localhost'
2823+
port: 9123
2824+
user: 'default'
2825+
password: 'admin'
2826+
2827+
binlog_replicator:
2828+
data_dir: '/app/binlog/'
2829+
records_per_file: 100000
2830+
2831+
databases: '*test*'
2832+
log_level: 'debug'
2833+
mysql_timezone: 'America/New_York'
2834+
"""
2835+
2836+
# Create temporary config file
2837+
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
2838+
f.write(config_content)
2839+
temp_config_file = f.name
2840+
2841+
try:
2842+
cfg = config.Settings()
2843+
cfg.load(temp_config_file)
2844+
2845+
# Verify timezone is loaded correctly
2846+
assert cfg.mysql_timezone == 'America/New_York'
2847+
2848+
mysql = mysql_api.MySQLApi(
2849+
database=None,
2850+
mysql_settings=cfg.mysql,
2851+
)
2852+
2853+
ch = clickhouse_api.ClickhouseApi(
2854+
database=TEST_DB_NAME,
2855+
clickhouse_settings=cfg.clickhouse,
2856+
)
2857+
2858+
prepare_env(cfg, mysql, ch)
2859+
2860+
# Create table with timestamp fields
2861+
mysql.execute(f'''
2862+
CREATE TABLE `{TEST_TABLE_NAME}` (
2863+
id int NOT NULL AUTO_INCREMENT,
2864+
name varchar(255),
2865+
created_at timestamp NULL,
2866+
updated_at timestamp(3) NULL,
2867+
PRIMARY KEY (id)
2868+
);
2869+
''')
2870+
2871+
# Insert test data with specific timestamp
2872+
mysql.execute(
2873+
f"INSERT INTO `{TEST_TABLE_NAME}` (name, created_at, updated_at) "
2874+
f"VALUES ('test_timezone', '2023-08-15 14:30:00', '2023-08-15 14:30:00.123');",
2875+
commit=True,
2876+
)
2877+
2878+
# Run replication
2879+
run_all_runner = RunAllRunner(cfg_file=temp_config_file)
2880+
run_all_runner.run()
2881+
2882+
assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
2883+
ch.execute_command(f'USE `{TEST_DB_NAME}`')
2884+
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
2885+
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)
2886+
2887+
# Get the table structure from ClickHouse
2888+
table_info = ch.query(f'DESCRIBE `{TEST_TABLE_NAME}`')
2889+
2890+
# Check that timestamp fields are converted to DateTime64 with timezone
2891+
created_at_type = None
2892+
updated_at_type = None
2893+
for row in table_info.result_rows:
2894+
if row[0] == 'created_at':
2895+
created_at_type = row[1]
2896+
elif row[0] == 'updated_at':
2897+
updated_at_type = row[1]
2898+
2899+
# Verify the types include the timezone
2900+
assert created_at_type is not None
2901+
assert updated_at_type is not None
2902+
assert 'America/New_York' in created_at_type
2903+
assert 'America/New_York' in updated_at_type
2904+
2905+
# Verify data was inserted correctly
2906+
results = ch.select(TEST_TABLE_NAME)
2907+
assert len(results) == 1
2908+
assert results[0]['name'] == 'test_timezone'
2909+
2910+
run_all_runner.stop()
2911+
2912+
finally:
2913+
# Clean up temporary config file
2914+
os.unlink(temp_config_file)

0 commit comments

Comments
 (0)