diff --git a/python/sources/mysql_cdc/README.md b/python/sources/mysql_cdc/README.md new file mode 100644 index 00000000..9debe7f3 --- /dev/null +++ b/python/sources/mysql_cdc/README.md @@ -0,0 +1,162 @@ +# MySQL CDC + +This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication. It's built using **Quix Streams StatefulSource** to ensure exactly-once processing and automatic recovery after restarts. + +## Key Features + +- **Quix Streams StatefulSource**: Built on Quix Streams' robust stateful source framework +- **Automatic State Management**: Integrated state store for binlog position and snapshot tracking +- **Exactly-Once Processing**: No data loss during application restarts or failures +- **Initial Snapshot**: Optionally capture existing data before starting CDC +- **Automatic Recovery**: Seamlessly resume processing after interruptions +- **Change Buffering**: Batches changes for efficient Kafka publishing + +## How to run + +1. Set up your MySQL database with binary logging enabled +2. Configure environment variables for MySQL connection +3. Install dependencies: `pip install -r requirements.txt` +4. Run: `python main.py` + +## Environment variables + +### Required MySQL Connection +- **output**: Name of the output topic to write into. +- **MYSQL_HOST**: The IP address or fully qualified domain name of your MySQL server. +- **MYSQL_PORT**: The Port number to use for communication with the server (default: 3306). +- **MYSQL_DATABASE**: The name of the database for CDC. +- **MYSQL_USER**: The username that should be used to interact with the database. +- **MYSQL_PASSWORD**: The password for the user configured above. +- **MYSQL_TABLE**: The name of the table for CDC. + +### Optional Configuration +- **MYSQL_SNAPSHOT_HOST**: MySQL host for initial snapshot (defaults to MYSQL_HOST if not set). 
Use this if you want to perform initial snapshot from a different MySQL instance (e.g., read replica). +- **MYSQL_INITIAL_SNAPSHOT**: Set to "true" to perform initial snapshot of existing data (default: false). +- **MYSQL_SNAPSHOT_BATCH_SIZE**: Number of rows to process in each snapshot batch (default: 1000). +- **MYSQL_FORCE_SNAPSHOT**: Set to "true" to force snapshot even if already completed (default: false). + +## Quix Streams StatefulSource + +The connector uses Quix Streams' `StatefulSource` class which provides: + +- **Automatic State Persistence**: Binlog positions and snapshot status are automatically saved to the state store +- **Exactly-Once Guarantees**: Built-in mechanisms ensure no data loss or duplication +- **Fault Tolerance**: Automatic recovery from failures with consistent state +- **Production-Ready**: Built on Quix Streams' proven architecture + +### State Management: +- **Binlog Position**: Automatically tracked as `binlog_position_{schema}_{table}` +- **Snapshot Completion**: Tracked as `snapshot_completed_{schema}_{table}` +- **Transactional Processing**: State changes are committed atomically with message production + +Example state data: +```json +{ + "log_file": "mysql-bin.000123", + "log_pos": 45678, + "timestamp": 1704067200.0 +} +``` + +## Initial Snapshot + +Enable initial snapshot to capture existing table data before starting CDC: + +```env +MYSQL_INITIAL_SNAPSHOT=true +MYSQL_SNAPSHOT_BATCH_SIZE=1000 +MYSQL_SNAPSHOT_HOST=replica.mysql.example.com # Optional: use read replica +``` + +The initial snapshot: +- Processes data in configurable batches to avoid memory issues +- Sends snapshot records with `"kind": "snapshot_insert"` to distinguish from real inserts +- Marks completion in the StatefulSource state store to avoid re-processing on restart +- Can be forced to re-run with `MYSQL_FORCE_SNAPSHOT=true` + +## Requirements / Prerequisites + +- A MySQL Database with binary logging enabled. 
+- Set `log-bin=mysql-bin` and `binlog-format=ROW` in MySQL configuration. +- MySQL user with `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges. +- For initial snapshot: `SELECT` privilege on the target table. + +### MySQL Configuration Example +```ini +[mysqld] +server-id = 1 +log_bin = /var/log/mysql/mysql-bin.log +binlog_expire_logs_seconds = 864000 +max_binlog_size = 100M +binlog-format = ROW +binlog_row_metadata = FULL +binlog_row_image = FULL +``` + +### MySQL User Permissions +```sql +-- Create replication user +CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password'; + +-- Grant replication privileges +GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%'; + +-- Grant select for initial snapshot +GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%'; + +FLUSH PRIVILEGES; +``` + +## Change Event Format + +### INSERT/Snapshot Insert +```json +{ + "kind": "insert", // or "snapshot_insert" for initial snapshot + "schema": "database_name", + "table": "table_name", + "columnnames": ["id", "name"], + "columnvalues": [123, "John Doe"], + "oldkeys": {} +} +``` + +### UPDATE +```json +{ + "kind": "update", + "schema": "database_name", + "table": "table_name", + "columnnames": ["id", "name"], + "columnvalues": [123, "Jane Doe"], + "oldkeys": { + "keynames": ["id", "name"], + "keyvalues": [123, "John Doe"] + } +} +``` + +### DELETE +```json +{ + "kind": "delete", + "schema": "database_name", + "table": "table_name", + "columnnames": [], + "columnvalues": [], + "oldkeys": { + "keynames": ["id", "name"], + "keyvalues": [123, "Jane Doe"] + } +} +``` + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. 
+ +Please star us and mention us on social to show your appreciation. \ No newline at end of file diff --git a/python/sources/mysql_cdc/dockerfile b/python/sources/mysql_cdc/dockerfile new file mode 100644 index 00000000..94050c86 --- /dev/null +++ b/python/sources/mysql_cdc/dockerfile @@ -0,0 +1,35 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# TODO: remove this RUN block when done doing "@ git+" install in requirements.txt +# This should be done BEFORE merging PR +RUN apt-get update && \ + apt-get install -y git && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Build argument for setting the main app path +ARG MAINAPPPATH=. + +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . 
+ +# Set working directory to main app path +WORKDIR "/app/${MAINAPPPATH}" + +# Define the container's startup command +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/python/sources/mysql_cdc/helper_functions.py b/python/sources/mysql_cdc/helper_functions.py new file mode 100644 index 00000000..779146c6 --- /dev/null +++ b/python/sources/mysql_cdc/helper_functions.py @@ -0,0 +1,101 @@ +from datetime import timedelta +import os +import json + + +def load_config(): + driver = os.environ["driver"] + server = os.environ["server"] + user_id = os.environ["userid"] + password = os.environ["password"] + database = os.environ["database"] + table_name = os.environ["table_name"] + last_modified_column = os.environ["last_modified_column"] + time_delta_config = os.environ["time_delta"] + + try: + use_utc_for_offset = bool(os.environ["offset_is_utc"]) + except Exception as e: + raise Exception("Use UTC For Offset must be True or False", e) + + drop_cols = os.getenv("columns_to_drop") + rename_cols = None + passed_rename_cols = os.getenv("columns_to_rename") + + try: + poll_interval = int(os.environ["poll_interval_seconds"]) + except Exception as e: + raise Exception("Poll Interval must be an integer", e) + + if poll_interval < 1: + poll_interval = 1 + + try: + if passed_rename_cols != None and passed_rename_cols != "": + rename_cols = json.loads(passed_rename_cols) + except Exception as e: + raise Exception("Invalid JSON supplied for column renames", e) + + return { + "driver": driver, + "server": server, + "user_id": user_id, + "password": password, + "database": database, + "table_name": table_name, + "last_modified_column": last_modified_column, + "time_delta": make_time_delta_from_config(time_delta_config), + "drop_cols": drop_cols, + "rename_cols": rename_cols, + "use_utc": use_utc_for_offset, + "poll_interval": poll_interval + } + + +def make_time_delta_from_config(time_delta_config) -> timedelta: + time_delta_values = 
time_delta_config.split(",") + + if len(time_delta_values) != 5: + raise Exception( + "time_delta_config must contain 5 values, one for each of seconds, minutes, hours, days and weeks") + + try: + seconds = int(time_delta_values[0]) + minutes = int(time_delta_values[1]) + hours = int(time_delta_values[2]) + days = int(time_delta_values[3]) + weeks = int(time_delta_values[4]) + return timedelta(seconds = seconds, minutes = minutes, hours = hours, days = days, weeks = weeks) + except TypeError as te: + raise Exception("Unable to cast one of the supplied values to int", te) + except Exception as e: + raise Exception("Something went wrong configuring the time delta", e) + + +def check_table_exists(conn, table) -> bool: + if not conn.cursor().tables(table).fetchone(): + print("Table does not exist") + return False + return True + + +def check_column_exists(conn, table, column_name) -> bool: + for c in conn.cursor().columns(table = table): + if column_name == c.column_name: + return True + print("Key column [{}] not found in table [{}]".format(column_name, table)) + return False + + +def drop_columns(conn, cols_to_drop, table_data, table_name) -> any: + for col in cols_to_drop: + if check_column_exists(conn, table_name, col): + table_data = table_data.drop(col, 1) + return table_data + + +def rename_columns(conn, cols_to_rename, table_data, table_name) -> any: + for col in cols_to_rename: + if check_column_exists(conn, table_name, col): + table_data = table_data.rename(columns={col: cols_to_rename[col]}) + return table_data \ No newline at end of file diff --git a/python/sources/mysql_cdc/icon.png b/python/sources/mysql_cdc/icon.png new file mode 100644 index 00000000..725e0be4 Binary files /dev/null and b/python/sources/mysql_cdc/icon.png differ diff --git a/python/sources/mysql_cdc/library.json b/python/sources/mysql_cdc/library.json new file mode 100644 index 00000000..fb81f5bf --- /dev/null +++ b/python/sources/mysql_cdc/library.json @@ -0,0 +1,115 @@ +{ + 
"libraryItemId": "mysql-cdc-source", + "name": "MySQL CDC Source", + "language": "Python", + "IconFile": "icon.png", + "tags": { + "Type": ["Connectors"], + "Pipeline Stage": ["Source"], + "Category": ["SQL DB"] + }, + "shortDescription": "Capture changes to a MySQL database table and publish the change events to a Kafka topic using binary log replication.", + "EntryPoint": "dockerfile", + "RunEntryPoint": "main.py", + "DefaultFile": "main.py", + "Variables": [ + { + "Name": "output", + "Type": "EnvironmentVariable", + "InputType": "OutputTopic", + "Description": "This is the output topic", + "DefaultValue": "mysql-cdc-source", + "Required": true + }, + { + "Name": "MYSQL_HOST", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Host name of MySQL", + "DefaultValue": "", + "Required": true + }, + { + "Name": "MYSQL_PORT", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Port of MySQL", + "DefaultValue": "3306", + "Required": true + }, + { + "Name": "MYSQL_USER", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Username of MySQL", + "Required": true + }, + { + "Name": "MYSQL_PASSWORD", + "Type": "EnvironmentVariable", + "InputType": "HiddenText", + "Description": "Password of MySQL", + "Required": true + }, + { + "Name": "MYSQL_DATABASE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Database name of MySQL", + "Required": true + }, + { + "Name": "MYSQL_TABLE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Name of table for CDC", + "Required": true + }, + { + "Name": "MYSQL_INITIAL_SNAPSHOT", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Set to 'true' to perform initial snapshot of existing data", + "DefaultValue": "false", + "Required": false + }, + { + "Name": "MYSQL_FORCE_SNAPSHOT", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Name of table for CDC", + 
"DefaultValue": "false", + "Required": false + }, + { + "Name": "MYSQL_SNAPSHOT_BATCH_SIZE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Number of rows to process in each snapshot batch.", + "DefaultValue": "1000", + "Required": false + }, + { + "Name": "MYSQL_SNAPSHOT_HOST", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "MySQL host for initial snapshot (defaults to MYSQL_HOST if not set). Use this if you want to perform initial snapshot from a different MySQL instance (e.g., read replica).", + "Required": false + }, + { + "Name": "CONSUMER_GROUP", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Desired consumer group", + "Required": false + } + ], + "DeploySettings": { + "DeploymentType": "Service", + "CpuMillicores": 200, + "MemoryInMb": 200, + "Replicas": 1, + "PublicAccess": false, + "ValidateConnection": true + } +} \ No newline at end of file diff --git a/python/sources/mysql_cdc/main.py b/python/sources/mysql_cdc/main.py new file mode 100644 index 00000000..93556410 --- /dev/null +++ b/python/sources/mysql_cdc/main.py @@ -0,0 +1,36 @@ +import os +import logging +from dotenv import load_dotenv + +from quixstreams import Application +from quixstreams.sources.community.mysql_cdc import MySqlCdcSource + +load_dotenv() + +logger = logging.getLogger(__name__) + +app = Application( + consumer_group=os.getenv("CONSUMER_GROUP", "mysql_cdc"), +) +output_topic = app.topic(os.environ["output"]) +mysql_source = MySqlCdcSource( + host=os.environ["MYSQL_HOST"], + port=os.environ["MYSQL_PORT"], + user=os.environ["MYSQL_USER"], + password=os.environ["MYSQL_PASSWORD"], + database=os.environ["MYSQL_DATABASE"], + table=os.environ["MYSQL_TABLE"], + initial_snapshot=os.getenv("MYSQL_INITIAL_SNAPSHOT", "false").lower() == "true", + snapshot_host=os.getenv("MYSQL_SNAPSHOT_HOST") or None, + snapshot_batch_size=int(os.getenv("MYSQL_SNAPSHOT_BATCH_SIZE", 1000)), + 
force_snapshot=os.getenv("MYSQL_FORCE_SNAPSHOT", "false").lower() == "true", +) +app.add_source(mysql_source, output_topic) + + +def main(): + app.run() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/python/sources/mysql_cdc/requirements.txt b/python/sources/mysql_cdc/requirements.txt new file mode 100644 index 00000000..6d0b1dfc --- /dev/null +++ b/python/sources/mysql_cdc/requirements.txt @@ -0,0 +1,3 @@ +# TODO: remove "@ git+" version of install before merging PR +quixstreams[mysql] @ git+https://github.com/quixio/quix-streams.git@source/mysql-cdc +python-dotenv