diff --git a/python/destinations/quixlake-timeseries/README.md b/python/destinations/quixlake-timeseries/README.md new file mode 100644 index 00000000..b657d7b6 --- /dev/null +++ b/python/destinations/quixlake-timeseries/README.md @@ -0,0 +1,197 @@ +# Quix TS Datalake Sink + +This connector consumes time-series data from a Kafka topic and writes it to S3 as Hive-partitioned Parquet files, with optional Quix catalog registration for data lake query API. + +## Features + +- **Hive Partitioning**: Automatically partition data by any columns (e.g., location, sensor type, year/month/day/hour) +- **Time-based Partitioning**: Extract year/month/day/hour from timestamp columns for efficient time-based queries +- **Quix Catalog Integration**: Optional table registration in a REST Catalog for seamless integration with analytics tools +- **Efficient Batching**: Configurable batch sizes and parallel S3 uploads for high throughput +- **Schema Evolution**: Automatic schema detection from data +- **Partition Validation**: Prevents data corruption by validating partition strategies against existing tables + +## How to run + +Create a [Quix](https://portal.cloud.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector. + +Clicking `Set up connector` allows you to enter your connection details and runtime parameters. + +Then either: +* Click `Test connection & deploy` to deploy the pre-built and configured container into Quix +* Or click `Customise connector` to inspect or alter the code before deployment + +## Environment Variables + +### Required + +- **`input`**: Name of the Kafka input topic to consume from + *Default*: `sensor-data` + +- **`S3_BUCKET`**: S3 bucket name for storing Parquet files + +### S3 Configuration + +- **`S3_PREFIX`**: S3 prefix/path for data files + *Default*: `data` + +- **`AWS_ACCESS_KEY_ID`**: AWS Access Key ID for S3 access + *Default*: `""` (uses IAM role if empty) + +- **`AWS_SECRET_ACCESS_KEY`**: AWS Secret Access Key for S3 access + *Default*: `""` (uses IAM role if empty) + +- **`AWS_REGION`**: AWS region for S3 bucket + *Default*: `us-east-1` + +- **`AWS_ENDPOINT_URL`**: Custom S3 endpoint URL for non-AWS S3-compatible storage + *Examples*: + - MinIO: `http://minio.example.com:9000` + - Wasabi: `https://s3.wasabisys.com` + - DigitalOcean Spaces: `https://nyc3.digitaloceanspaces.com` + - Backblaze B2: `https://s3.us-west-004.backblazeb2.com` + *Default*: None (uses AWS S3) + +### Data Organization + +- **`TABLE_NAME`**: Table name for data organization and registration + *Default*: Uses the topic name if not specified + +- **`HIVE_COLUMNS`**: Comma-separated list of columns for Hive partitioning. 
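+  Columns are turned into Hive-style `column=value` path segments, joined in the order listed. A minimal sketch of that layout logic (plain Python; the column names and values are illustrative, not defaults):
+  ```python
+  # Hypothetical example: how listed partition columns map onto an S3 key suffix
+  partition_columns = ["location", "year", "month"]
+  row = {"location": "NYC", "year": "2024", "month": "01"}
+  suffix = "/".join(f"{col}={row[col]}" for col in partition_columns)
+  print(suffix)  # location=NYC/year=2024/month=01
+  ```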
Include `year`, `month`, `day`, `hour` to extract from `TIMESTAMP_COLUMN` + *Example*: `location,year,month,day,sensor_type` + *Default*: `""` (no partitioning) + +- **`TIMESTAMP_COLUMN`**: Column containing timestamp values to extract year/month/day/hour from + *Default*: `ts_ms` + +### Catalog Integration (Optional) + +- **`CATALOG_URL`**: REST Catalog URL for optional table registration (leave empty to skip) + *Example*: `https://catalog.example.com/api/v1` + +- **`CATALOG_AUTH_TOKEN`**: If using a catalog, the respective auth token to access it + +- **`AUTO_DISCOVER`**: Automatically register table in REST Catalog on first write + *Default*: `true` + +- **`CATALOG_NAMESPACE`**: Catalog namespace for table registration + *Default*: `default` + +### Kafka Configuration + +- **`CONSUMER_GROUP`**: Kafka consumer group name + *Default*: `s3_direct_sink_v1.0` + +- **`AUTO_OFFSET_RESET`**: Where to start consuming if no offset exists + *Default*: `latest` + *Options*: `earliest`, `latest` + +- **`KAFKA_KEY_DESERIALIZER`**: The key deserializer to use + *Default*: `str` + +- **`KAFKA_VALUE_DESERIALIZER`**: The value deserializer to use + *Default*: `json` + +### Performance Tuning + +- **`BATCH_SIZE`**: Number of messages to batch before writing to S3 + *Default*: `1000` + +- **`COMMIT_INTERVAL`**: Kafka commit interval in seconds + *Default*: `30` + +- **`MAX_WRITE_WORKERS`**: How many files can be written in parallel to S3 at once + *Default*: `10` + +### Application Settings + +- **`LOGLEVEL`**: Set application logging level + *Default*: `INFO` + *Options*: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` + +## Partitioning Strategy Examples + +### Example 1: Time-based partitioning +```bash +HIVE_COLUMNS=year,month,day +TIMESTAMP_COLUMN=ts_ms +``` +Creates: `s3://bucket/prefix/table/year=2024/month=01/day=15/data_*.parquet` + +### Example 2: Multi-dimensional partitioning +```bash +HIVE_COLUMNS=location,sensor_type,year,month +TIMESTAMP_COLUMN=timestamp +``` +Creates: `s3://bucket/prefix/table/location=NYC/sensor_type=temp/year=2024/month=01/data_*.parquet` + +### Example 3: No partitioning +```bash +HIVE_COLUMNS= +``` +Creates: `s3://bucket/prefix/table/data_*.parquet` + +## Using Non-AWS S3-Compatible Storage + +This connector supports any S3-compatible storage service by setting the `AWS_ENDPOINT_URL` environment variable. + +### MinIO Example +```bash +AWS_ENDPOINT_URL=http://minio.example.com:9000 +AWS_ACCESS_KEY_ID=minioadmin +AWS_SECRET_ACCESS_KEY=minioadmin +AWS_REGION=us-east-1 +S3_BUCKET=my-data-lake +``` + +### Wasabi Example +```bash +AWS_ENDPOINT_URL=https://s3.wasabisys.com +AWS_ACCESS_KEY_ID=your-wasabi-access-key +AWS_SECRET_ACCESS_KEY=your-wasabi-secret-key +AWS_REGION=us-east-1 +S3_BUCKET=my-data-lake +``` + +### DigitalOcean Spaces Example +```bash +AWS_ENDPOINT_URL=https://nyc3.digitaloceanspaces.com +AWS_ACCESS_KEY_ID=your-spaces-access-key +AWS_SECRET_ACCESS_KEY=your-spaces-secret-key +AWS_REGION=nyc3 +S3_BUCKET=my-data-lake +``` + +### Backblaze B2 Example +```bash +AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com +AWS_ACCESS_KEY_ID=your-b2-key-id +AWS_SECRET_ACCESS_KEY=your-b2-application-key +AWS_REGION=us-west-004 +S3_BUCKET=my-data-lake +``` + +## Architecture + +The sink uses a batching architecture for high throughput: + +1. **Consume**: Messages are consumed from Kafka in batches +2. **Transform**: Time-based columns are extracted if needed +3. **Partition**: Data is grouped by partition columns +4. 
**Upload**: Multiple files are uploaded to S3 in parallel +5. **Register**: Files are registered in the catalog (if configured) + +## Requirements + +- S3 bucket access: + - AWS S3, or + - Any S3-compatible storage (MinIO, Wasabi, DigitalOcean Spaces, Backblaze B2, etc.) +- Optional: Quix REST Catalog endpoint for data catalog integration + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open Source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation. diff --git a/python/destinations/quixlake-timeseries/catalog_client.py b/python/destinations/quixlake-timeseries/catalog_client.py new file mode 100644 index 00000000..594aae62 --- /dev/null +++ b/python/destinations/quixlake-timeseries/catalog_client.py @@ -0,0 +1,70 @@ +import requests +from typing import Optional + + +class CatalogClient: + """Simple HTTP client for REST Catalog API interactions""" + + def __init__(self, base_url: str, auth_token: Optional[str] = None): + """ + Initialize the catalog client + + Args: + base_url: Base URL of the REST Catalog API + auth_token: Optional authentication token for API requests + """ + self.base_url = base_url.rstrip('/') + self.auth_token = auth_token + self._session = requests.Session() + + # Set up authentication header if token is provided + if self.auth_token: + self._session.headers['Authorization'] = f'Bearer {self.auth_token}' + + def get(self, path: str, timeout: int = 30) -> requests.Response: + """ + Make a GET request to the catalog API + + Args: + path: API endpoint path (will be appended to base_url) + timeout: Request timeout in seconds + + Returns: + Response object from requests library + """ + url = f"{self.base_url}{path}" + return self._session.get(url, timeout=timeout) + + def post(self, path: str, json: dict = None, timeout: int = 30) -> requests.Response: + """ + Make a POST request to the catalog API + + Args: + path: API endpoint path (will be appended to base_url) + json: JSON payload to send in request body + timeout: Request timeout in seconds + + Returns: + Response object from requests library + """ + url = f"{self.base_url}{path}" + return self._session.post(url, json=json, timeout=timeout) + + def put(self, path: str, json: dict = None, timeout: int = 30) -> requests.Response: + """ + Make a PUT request to the catalog API + + Args: + path: API endpoint path (will be appended to base_url) + json: JSON payload to send in request body + timeout: Request timeout in seconds + + Returns: + Response object from requests library + """ + url = f"{self.base_url}{path}" + return self._session.put(url, json=json, timeout=timeout) + + def __str__(self): + """String representation showing the base URL""" + return self.base_url diff --git a/python/destinations/quixlake-timeseries/dockerfile b/python/destinations/quixlake-timeseries/dockerfile new file mode 100644 index 00000000..d3cd4959 --- /dev/null +++ b/python/destinations/quixlake-timeseries/dockerfile @@ -0,0 +1,23 @@ +FROM python:3.11.1-slim-buster + +# Set environment variables to non-interactive and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 + +# Set the working directory inside the container +WORKDIR /app + +# Copy only the requirements file(s) to 
leverage Docker cache +# Assuming all requirements files are in the root or subdirectories +COPY ./requirements.txt ./ + +# Install dependencies +# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the rest of the application +COPY . . + +# Set the command to run your application +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/python/destinations/quixlake-timeseries/icon.png b/python/destinations/quixlake-timeseries/icon.png new file mode 100644 index 00000000..2940139d Binary files /dev/null and b/python/destinations/quixlake-timeseries/icon.png differ diff --git a/python/destinations/quixlake-timeseries/library.json b/python/destinations/quixlake-timeseries/library.json new file mode 100644 index 00000000..c2910840 --- /dev/null +++ b/python/destinations/quixlake-timeseries/library.json @@ -0,0 +1,194 @@ +{ + "libraryItemId": "quixlake-timeseries-destination", + "name": "Quix DataLake Timeseries Sink", + "language": "Python", + "tags": { + "Pipeline Stage": ["Destination"], + "Type": ["Connectors"], + "Category": ["File Store"] + }, + "shortDescription": "Consume data from a Kafka topic and write it to an AWS S3 bucket path.", + "DefaultFile": "main.py", + "EntryPoint": "dockerfile", + "RunEntryPoint": "main.py", + "IconFile": "icon.png", + "Variables": [ + { + "Name": "input", + "Type": "EnvironmentVariable", + "InputType": "InputTopic", + "Description": "Name of the Kafka input topic to consume from", + "DefaultValue": "sensor-data", + "Required": true + }, + { + "Name": "S3_BUCKET", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "S3 bucket name for storing Parquet files", + "Required": true + }, + { + "Name": "S3_PREFIX", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "S3 prefix/path for data files", + "DefaultValue": "data", + "Required": false + }, + { + "Name": "TABLE_NAME", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Table name for data organization and registration, else defaults to topic name", + "Required": false + }, + { + "Name": "HIVE_COLUMNS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Comma-separated list of columns for Hive partitioning. 
Include year/month/day/hour to extract from TIMESTAMP_COLUMN (e.g., location,year,month,day,sensor_type)", + "DefaultValue": "", + "Required": false + }, + { + "Name": "TIMESTAMP_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Column containing timestamp values to extract year/month/day/hour from", + "DefaultValue": "ts_ms", + "Required": false + }, + { + "Name": "CATALOG_URL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "REST Catalog URL for optional table registration (leave empty to skip)", + "Required": false + }, + { + "Name": "CATALOG_AUTH_TOKEN", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "If using a catalog, the respective auth token to access it", + "Required": false + }, + { + "Name": "AUTO_DISCOVER", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Automatically register table in REST Catalog on first write", + "DefaultValue": "true", + "Required": false + }, + { + "Name": "CATALOG_NAMESPACE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Catalog namespace for table registration", + "DefaultValue": "default", + "Required": false + }, + { + "Name": "BATCH_SIZE", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Number of messages to batch before writing to S3", + "DefaultValue": "1000", + "Required": false + }, + { + "Name": "COMMIT_INTERVAL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Kafka commit interval in seconds", + "DefaultValue": "30", + "Required": false + }, + { + "Name": "KAFKA_KEY_DESERIALIZER", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The key deserializer to use", + "DefaultValue": "str", + "Required": false + }, + { + "Name": "KAFKA_VALUE_DESERIALIZER", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "the value deserializer to use", + "DefaultValue": "json", + "Required": false + }, + { + "Name": "CONSUMER_GROUP", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Kafka consumer group name", + "DefaultValue": "s3_direct_sink_v1.0", + "Required": false + }, + { + "Name": "AUTO_OFFSET_RESET", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "Where to start consuming if no offset exists", + "DefaultValue": "latest", + "Required": false + }, + { + "Name": "AWS_ACCESS_KEY_ID", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "AWS Access Key ID for S3 access", + "DefaultValue": "", + "Required": false + }, + { + "Name": "AWS_SECRET_ACCESS_KEY", + "Type": "EnvironmentVariable", + "InputType": "Secret", + "Description": "AWS Secret Access Key for S3 access", + "Required": false + }, + { + "Name": "AWS_REGION", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "AWS region for S3 bucket", + "DefaultValue": "us-east-1", + "Required": false + }, + { + "Name": "AWS_ENDPOINT_URL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "S3 endpoint url (for non-AWS endpoints)", + "Required": false + }, + { + "Name": "LOGLEVEL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "set application logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", + "DefaultValue": "INFO" + }, + { + "Name": "MAX_WRITE_WORKERS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "How many files can be written in parallel to S3 at 
once (based on partitioning + batch size)", + "DefaultValue": "10", + "Required": false + } + ], + "DeploySettings": { + "DeploymentType": "Service", + "CpuMillicores": 200, + "MemoryInMb": 500, + "Replicas": 1, + "PublicAccess": false, + "ValidateConnection": false + } +} \ No newline at end of file diff --git a/python/destinations/quixlake-timeseries/main.py b/python/destinations/quixlake-timeseries/main.py new file mode 100644 index 00000000..57faac8e --- /dev/null +++ b/python/destinations/quixlake-timeseries/main.py @@ -0,0 +1,79 @@ +""" +Quix TS Datalake Sink - Main Entry Point + +This application consumes data from a Kafka topic and writes it to S3 as +Hive-partitioned Parquet files with optional Iceberg catalog registration. +""" +import os +import logging + +from quixstreams import Application +from quixlake_sink import QuixLakeSink + +# Configure logging +logging.basicConfig( + level=os.getenv("LOGLEVEL", "INFO"), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def parse_hive_columns(columns_str: str) -> list: + """ + Parse comma-separated list of partition columns. + + Args: + columns_str: Comma-separated column names (e.g., "year,month,day") + + Returns: + List of column names, or empty list if input is empty + """ + if not columns_str or columns_str.strip() == "": + return [] + return [col.strip() for col in columns_str.split(",") if col.strip()] + + +# Initialize Quix Streams Application +app = Application( + consumer_group=os.getenv("CONSUMER_GROUP", "s3_direct_sink_v1.0"), + auto_offset_reset=os.getenv("AUTO_OFFSET_RESET", "latest"), + commit_interval=int(os.getenv("COMMIT_INTERVAL", "30")) +) + +# Parse configuration +hive_columns = parse_hive_columns(os.getenv("HIVE_COLUMNS", "")) +auto_discover = os.getenv("AUTO_DISCOVER", "true").lower() == "true" +table_name = os.getenv("TABLE_NAME") or os.environ["input"] + +# Initialize QuixLakeSink +s3_sink = QuixLakeSink( + s3_bucket=os.environ["S3_BUCKET"], + s3_prefix=os.getenv("S3_PREFIX", "data"), + table_name=table_name, + aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + aws_region=os.getenv("AWS_REGION", "us-east-1"), + s3_endpoint_url=os.getenv("AWS_ENDPOINT_URL"), + hive_columns=hive_columns, + timestamp_column=os.getenv("TIMESTAMP_COLUMN", "ts_ms"), + catalog_url=os.getenv("CATALOG_URL"), + catalog_auth_token=os.getenv("CATALOG_AUTH_TOKEN"), + auto_discover=auto_discover, + namespace=os.getenv("CATALOG_NAMESPACE", "default"), + auto_create_bucket=True, + max_workers=int(os.getenv("MAX_WRITE_WORKERS", "10")) +) + +# Create streaming dataframe and attach sink +sdf = app.dataframe(topic=app.topic(os.environ["input"])) + +# Attach sink (batching is handled by BatchingSink) +sdf.sink(s3_sink) + +logger.info("Starting Quix TS Datalake Sink") +logger.info(f" Input topic: {os.environ['input']}") +logger.info(f" S3 destination: s3://{os.environ['S3_BUCKET']}/{os.getenv('S3_PREFIX', 'data')}/{table_name}") +logger.info(f" Partitioning: {hive_columns if hive_columns else 'none'}") + +if __name__ == "__main__": + app.run() diff --git a/python/destinations/quixlake-timeseries/quixlake_sink.py b/python/destinations/quixlake-timeseries/quixlake_sink.py new file mode 100644 index 00000000..e5abf454 --- /dev/null +++ b/python/destinations/quixlake-timeseries/quixlake_sink.py @@ -0,0 +1,568 @@ +from quixstreams.sinks import BatchingSink, SinkBatch +import boto3 +from botocore.exceptions import ClientError +from 
s3transfer.manager import TransferManager, TransferConfig +import pandas as pd +import pyarrow as pa +import pyarrow.parquet as pq +import time +import logging +import uuid +from typing import List, Dict, Any, Optional +from datetime import datetime, timezone +from io import BytesIO +from catalog_client import CatalogClient + + +TIMESTAMP_COL_MAPPER = { + "year": lambda col: col.dt.year.astype(str), + "month": lambda col: col.dt.month.astype(str).str.zfill(2), + "day": lambda col: col.dt.day.astype(str).str.zfill(2), + "hour": lambda col: col.dt.hour.astype(str).str.zfill(2) +} + +logger = logging.getLogger('quixstreams') + + +class QuixLakeSink(BatchingSink): + """ + Writes Kafka batches directly to S3 as Hive-partitioned Parquet files, + then optionally registers the table using the discover endpoint. + """ + + def __init__( + self, + s3_bucket: str, + s3_prefix: str, + table_name: str, + aws_access_key_id: Optional[str] = None, + aws_secret_access_key: Optional[str] = None, + aws_region: str = "us-east-1", + s3_endpoint_url: Optional[str] = None, + hive_columns: List[str] = None, + timestamp_column: str = "ts_ms", + catalog_url: Optional[str] = None, + catalog_auth_token: Optional[str] = None, + auto_discover: bool = True, + namespace: str = "default", + auto_create_bucket: bool = True, + max_workers: int = 10 + ): + """ + Initialize S3 Direct Sink + + Args: + s3_bucket: S3 bucket name + s3_prefix: S3 prefix/path for data files + table_name: Table name for registration + aws_access_key_id: AWS access key ID + aws_secret_access_key: AWS secret access key + aws_region: AWS region (default: "us-east-1") + s3_endpoint_url: Custom S3 endpoint URL for non-AWS S3-compatible storage + (e.g., MinIO, Wasabi, DigitalOcean Spaces) + hive_columns: List of columns to use for Hive partitioning. Include 'year', 'month', + 'day', 'hour' to extract these from timestamp_column + timestamp_column: Column containing timestamp to extract time partitions from + catalog_url: Optional REST Catalog URL for table registration + catalog_auth_token: If using REST Catalog, the respective auth token for it + auto_discover: Whether to auto-register table on first write + namespace: Catalog namespace (default: "default") + auto_create_bucket: if True, create bucket in S3 if missing. 
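+                Existence is probed with head_bucket; on a 404 the bucket is created
+                with the same client and credentials used for data writes.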
+ max_workers: Maximum number of parallel upload threads (default: 10) + """ + self._aws_region = aws_region + self._aws_access_key_id = aws_access_key_id + self._aws_secret_access_key = aws_secret_access_key + self._aws_endpoint_url = s3_endpoint_url + self._credentials = { + "region_name": self._aws_region, + "aws_access_key_id": self._aws_access_key_id, + "aws_secret_access_key": self._aws_secret_access_key, + "endpoint_url": self._aws_endpoint_url, + } + self.s3_bucket = s3_bucket + self.s3_prefix = s3_prefix + self.table_name = table_name + self.hive_columns = hive_columns or [] + self.timestamp_column = timestamp_column + self._catalog = CatalogClient(catalog_url, catalog_auth_token) if catalog_url else None + self.auto_discover = auto_discover + self.namespace = namespace + self.table_registered = False + + # S3 client will be initialized in setup() + self.s3_client = None + self._ts_hive_columns = {'year', 'month', 'day', 'hour'} & set(self.hive_columns) + self._auto_create_bucket = auto_create_bucket + self._max_workers = max_workers + + # Batch upload tracking with TransferManager + self._pending_futures = [] + self._transfer_manager = None + + super().__init__() + + def setup(self): + """Initialize S3 client and test connection""" + logger.info("Starting S3 Direct Sink...") + logger.info(f"S3 Target: s3://{self.s3_bucket}/{self.s3_prefix}/{self.table_name}") + logger.info(f"Partitioning: hive_columns={self.hive_columns}") + + if self._aws_endpoint_url: + logger.info(f"Using custom S3 endpoint: {self._aws_endpoint_url}") + + if self._catalog and self.auto_discover: + logger.info(f"Table will be auto-registered in REST Catalog on first write") + + try: + # Initialize S3 client + self.s3_client = boto3.client( + 's3', + **self._credentials + ) + + # Initialize TransferManager for concurrent uploads + transfer_config = TransferConfig(max_request_concurrency=self._max_workers) + self._transfer_manager = TransferManager(self.s3_client, config=transfer_config) + + # Confirm bucket connection + self._ensure_bucket() + + # Test Catalog connection if configured + if self._catalog: + try: + response = self._catalog.get("/health", timeout=5) + response.raise_for_status() + logger.info("Successfully connected to REST Catalog at %s", self._catalog) + except Exception as e: + logger.warning("Could not connect to REST Catalog: %s. Table registration disabled.", e) + self.auto_discover = False + + # Check if table already exists in S3 and validate partition strategy + self._validate_existing_table_structure() + + except Exception as e: + logger.error("Failed to setup S3 connection: %s", e) + raise + + def _ensure_bucket(self): + bucket = self.s3_bucket + try: + self.s3_client.head_bucket(Bucket=bucket) + except ClientError as e: + error_code = int(e.response["Error"]["Code"]) + if error_code == 404 and self._auto_create_bucket: + # Bucket does not exist, create it + logger.debug(f"⚠️ Bucket '{bucket}' not found. 
Creating it...") + self.s3_client.create_bucket(Bucket=self.s3_bucket) + logger.info(f"✅ Bucket '{bucket}' created.") + else: + raise + logger.info("Successfully connected to S3 bucket: %s", bucket) + + def write(self, batch: SinkBatch): + """Write batch directly to S3""" + # Register table before first write if auto-discover is enabled + if self.auto_discover and not self.table_registered and self._catalog: + self._register_table() + + attempts = 3 + while attempts: + start = time.perf_counter() + try: + self._write_batch(batch) + elapsed_ms = (time.perf_counter() - start) * 1000 + logger.info("✔ wrote %d rows to S3 in %.1f ms", batch.size, elapsed_ms) + return + except Exception as exc: + attempts -= 1 + if attempts == 0: + raise + logger.warning("Write failed (%s) – retrying …", exc) + time.sleep(3) + + def _write_batch(self, batch: SinkBatch): + """Convert batch to Parquet and write to S3 with Hive partitioning""" + if not batch: + return + + # Convert batch to list of dictionaries + rows = [] + for item in batch: + row = item.value.copy() + # Add timestamp and key if not present + # This ensures we have a timestamp column for time-based partitioning + if self.timestamp_column not in row: + row[self.timestamp_column] = item.timestamp + row["__key"] = item.key + rows.append(row) + + # Convert to DataFrame for easier manipulation + df = pd.DataFrame(rows) + + # Add time-based partition columns (year/month/day/hour) if they're specified in hive_columns + # These are extracted from the timestamp_column + if self._ts_hive_columns: + df = self._add_timestamp_columns(df) + + # Use only the explicitly specified partition columns + if partition_columns := self.hive_columns.copy(): + # Group by partition columns and write each partition separately + # This creates the Hive-style directory structure: col1=val1/col2=val2/file.parquet + for group_values, group_df in df.groupby(partition_columns): + # Ensure group_values is always a tuple for consistent handling + if not isinstance(group_values, tuple): + group_values = (group_values,) + + # Build S3 key with Hive partitioning (col=value format) + # Example: s3://bucket/prefix/table/year=2024/month=01/day=15/data_abc123.parquet + partition_parts = [f"{col}={val}" for col, val in zip(partition_columns, group_values)] + s3_key = f"{self.s3_prefix}/{self.table_name}/" + "/".join(partition_parts) + f"/data_{uuid.uuid4().hex}.parquet" + + # Remove partition columns from data (Hive style - partition values are in the path, not the data) + data_df = group_df.drop(columns=partition_columns, errors='ignore') + + # Write to S3 + self._write_parquet_to_s3(data_df, s3_key, partition_columns, group_values) + else: + # No partitioning - write as single file directly under table directory + s3_key = f"{self.s3_prefix}/{self.table_name}/data_{uuid.uuid4().hex}.parquet" + self._write_parquet_to_s3(df, s3_key, [], ()) + + # Wait for all uploads to complete and register files in catalog + self._finalize_writes() + + def _write_parquet_to_s3( + self, + df: pd.DataFrame, + s3_key: str, + partition_columns: List[str], + partition_values: tuple + ): + # Convert to Arrow table and prepare buffer + self._null_empty_dicts(df) + table = pa.Table.from_pandas(df) + + buf = pa.BufferOutputStream() + pq.write_table(table, buf) + parquet_bytes = buf.getvalue().to_pybytes() + + # Submit upload to TransferManager + future = self._transfer_manager.upload( + BytesIO(parquet_bytes), + self.s3_bucket, + s3_key + ) + + self._pending_futures.append({ + 'future': future, + 'key': s3_key, + 
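+            # metadata below is kept so _finalize_writes() can register these files in the catalog manifest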
'row_count': len(df), + 'file_size': len(parquet_bytes), + 'partition_columns': partition_columns, + 'partition_values': partition_values + }) + + def _finalize_writes(self): + """Wait for all pending uploads to complete and register files in catalog""" + if not self._pending_futures: + return + + count = len(self._pending_futures) + logger.debug(f"Waiting for {count} upload(s) to complete...") + + # Wait for all uploads to complete + for item in self._pending_futures: + try: + item['future'].result() # Wait and raise on error + logger.debug("✓ Uploaded %d rows to s3://%s/%s", + item['row_count'], self.s3_bucket, item['key']) + except Exception as e: + logger.error("✗ Failed to upload s3://%s/%s: %s", + self.s3_bucket, item['key'], e) + raise + + logger.info(f"✓ Successfully uploaded {count} file(s)") + + # Register all files in catalog manifest if configured + if self._catalog and self.table_registered: + self._register_files_in_manifest() + + # Clear the futures list + self._pending_futures.clear() + + def _null_empty_dicts(self, df: pd.DataFrame): + """ + Convert empty dictionaries to null values before writing to Parquet. + + Parquet format has limitations with empty maps/structs - they cannot be written + properly and will cause serialization errors. This method scans all columns + that contain dictionaries and replaces empty dicts ({}) with None/null values. + + This is done in-place to avoid copying the DataFrame. + """ + for col in df.columns: + # Check if column contains any dictionary values + if df[col].apply(lambda x: isinstance(x, dict)).any(): + # Replace empty dicts with None; keeps non-empty dicts as-is + df[col] = df[col].apply(lambda x: x or None) + + def _register_table(self): + """Register the table in REST Catalog""" + if not self._catalog: + return + + try: + # First check if table already exists + check_response = self._catalog.get( + f"/namespaces/{self.namespace}/tables/{self.table_name}", + timeout=5 + ) + + if check_response.status_code == 200: + logger.info("Table '%s' already exists in catalog", self.table_name) + self.table_registered = True + # Validate partition strategy matches + self._validate_partition_strategy(check_response.json()) + return + + # Table doesn't exist, create it + s3_path = f"s3://{self.s3_bucket}/{self.s3_prefix}/{self.table_name}" + + # Define partition spec based on configuration + # For dynamic partition discovery, create table without partition spec + # The partition spec will be set when first files are added + partition_spec = [] # Empty spec for dynamic discovery + + # Create table with minimal schema (will be inferred from data) + create_response = self._catalog.put( + f"/namespaces/{self.namespace}/tables/{self.table_name}", + json={ + "location": s3_path, + "partition_spec": partition_spec, # Empty for dynamic discovery + "properties": { + "created_by": "quix-lake-sink", + "auto_discovered": "false", + "expected_partitions": self.hive_columns.copy() # Store expected partitions in properties + } + }, + timeout=30 + ) + + if create_response.status_code in [200, 201]: + logger.info( + "Successfully created table '%s' in REST Catalog. 
Partitions will be set dynamically to: %s", + self.table_name, + self.hive_columns + ) + self.table_registered = True + else: + logger.warning( + "Failed to create table '%s': %s", + self.table_name, + create_response.text + ) + + except Exception as e: + logger.warning("Failed to register table '%s': %s", self.table_name, e) + + def _add_timestamp_columns(self, df: pd.DataFrame) -> pd.DataFrame: + """ + Add timestamp-based columns (year/month/day/hour) for time-based partitioning. + + This method extracts time components from the timestamp column and adds them + as separate columns that can be used for Hive partitioning. + """ + # Convert to datetime if needed (handles numeric timestamps) + if not pd.api.types.is_datetime64_any_dtype(df[self.timestamp_column]): + sample_value = float(df[self.timestamp_column].iloc[0] if not df[self.timestamp_column].empty else 0) + + # Auto-detect timestamp unit by inspecting the magnitude of the value + # Typical timestamp ranges: + # - Seconds: ~1.7e9 (since epoch 1970) + # - Milliseconds: ~1.7e12 + # - Microseconds: ~1.7e15 + # - Nanoseconds: ~1.7e18 + if sample_value > 1e17: + # Nanoseconds (Java/Kafka timestamps) + df[self.timestamp_column] = pd.to_datetime(df[self.timestamp_column], unit='ns') + elif sample_value > 1e14: + # Microseconds + df[self.timestamp_column] = pd.to_datetime(df[self.timestamp_column], unit='us') + elif sample_value > 1e11: + # Milliseconds (common in JavaScript/Kafka) + df[self.timestamp_column] = pd.to_datetime(df[self.timestamp_column], unit='ms') + else: + # Seconds (Unix timestamp) + df[self.timestamp_column] = pd.to_datetime(df[self.timestamp_column], unit='s') + + # Extract time-based columns (year, month, day, hour) from the timestamp + timestamp_col = df[self.timestamp_column] + + # Only add columns that are specified in _ts_hive_columns + # TIMESTAMP_COL_MAPPER handles proper formatting (e.g., zero-padding for months/days) + for col in self._ts_hive_columns: + df[col] = TIMESTAMP_COL_MAPPER[col](timestamp_col) + + return df + + def _validate_partition_strategy(self, table_metadata: Dict[str, Any]): + """Validate that the sink's partition strategy matches the existing table""" + existing_partition_spec = table_metadata.get("partition_spec", []) + + # Build expected partition spec from sink configuration + expected_partition_spec = self.hive_columns.copy() + + # Special case: If table has no partition spec yet (empty list), + # it will be set when first files are added + if not existing_partition_spec: + logger.info( + "Table '%s' has no partition spec yet. Will be set to %s on first write.", + self.table_name, + expected_partition_spec + ) + return + + # Check if partition strategies match + if set(existing_partition_spec) != set(expected_partition_spec): + error_msg = ( + f"Partition strategy mismatch for table '{self.table_name}'. " + f"Existing table has partitions: {existing_partition_spec}, " + f"but sink is configured with: {expected_partition_spec}. " + "This would corrupt the folder structure. Please ensure the sink partition " + "configuration matches the existing table." + ) + logger.error(error_msg) + raise ValueError(error_msg) + + # Also check the order of partitions + if existing_partition_spec != expected_partition_spec: + warning_msg = ( + f"Partition column order differs for table '{self.table_name}'. " + f"Existing: {existing_partition_spec}, Configured: {expected_partition_spec}. " + "While this won't corrupt data, it may lead to suboptimal query performance." 
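+                # note: a column-set mismatch raises ValueError above; order-only differences are deliberately warn-only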
+ ) + logger.warning(warning_msg) + + def _validate_existing_table_structure(self): + """ + Check if table already exists in S3 and validate partition structure. + + This prevents data corruption by ensuring that if a table already exists in S3, + the sink's partition configuration matches what's already on disk. Mismatched + partition strategies would result in a corrupted folder structure that would + make the data unqueryable. + """ + table_prefix = f"{self.s3_prefix}/{self.table_name}/" + + try: + # List objects to see if table exists (sample first 100 files) + response = self.s3_client.list_objects_v2( + Bucket=self.s3_bucket, + Prefix=table_prefix, + MaxKeys=100 + ) + + if 'Contents' not in response: + # Table doesn't exist yet, no validation needed + return + + # Detect existing partition columns from S3 directory structure + # We parse the S3 paths to extract partition columns from Hive-style paths + detected_partition_columns = [] + for obj in response['Contents']: + if obj['Key'].endswith('.parquet'): + # Extract path after table prefix + # Example: "year=2024/month=01/day=15/data.parquet" -> ["year=2024", "month=01", "day=15", "data.parquet"] + relative_path = obj['Key'][len(table_prefix):] + path_parts = relative_path.split('/') + + # Look for Hive-style partitions (col=value format) + for part in path_parts[:-1]: # Exclude filename + if '=' in part: + # Extract column name from "col=value" + col_name = part.split('=')[0] + # Maintain order of first appearance + if col_name not in detected_partition_columns: + detected_partition_columns.append(col_name) + + if detected_partition_columns: + # Build expected partition spec from sink configuration + expected_partition_spec = self.hive_columns.copy() + + # Check if partition strategies match + # Using set comparison to ignore order first + if set(detected_partition_columns) != set(expected_partition_spec): + error_msg = ( + f"Partition strategy mismatch for table '{self.table_name}'. " + f"Existing table in S3 has partitions: {detected_partition_columns}, " + f"but sink is configured with: {expected_partition_spec}. " + "This would corrupt the folder structure. Please ensure the sink partition " + "configuration matches the existing table." + ) + logger.error(error_msg) + raise ValueError(error_msg) + + logger.info( + "Validated partition strategy for existing table '%s'. Partitions: %s", + self.table_name, + detected_partition_columns + ) + + except self.s3_client.exceptions.NoSuchBucket: + raise + except ValueError: + raise + except Exception as e: + logger.warning( + "Could not validate existing table structure: %s. 
Proceeding with caution.", e + ) + + def _register_files_in_manifest(self): + """Register multiple newly written files in the catalog manifest""" + if not (file_items := self._pending_futures): + return + + try: + # Build file entries for all files + file_entries = [] + for item in file_items: + s3_key = item['key'] + row_count = item['row_count'] + file_size = item['file_size'] + partition_columns = item['partition_columns'] + partition_values = item['partition_values'] + + # Build S3 URL + file_path = f"s3://{self.s3_bucket}/{s3_key}" + + # Build partition values dict + partition_dict = {} + if partition_columns and partition_values: + for col, val in zip(partition_columns, partition_values): + partition_dict[col] = str(val) + + # Create file entry + file_entries.append({ + "file_path": file_path, + "file_size": file_size, + "last_modified": datetime.now(tz=timezone.utc).isoformat(), + "partition_values": partition_dict, + "row_count": row_count + }) + + # Send all files to catalog in a single request + response = self._catalog.post( + f"/namespaces/{self.namespace}/tables/{self.table_name}/manifest/add-files", + json={"files": file_entries}, + timeout=10 + ) + + if response.status_code == 200: + logger.info(f"✓ Registered {len(file_entries)} file(s) in catalog manifest") + else: + logger.warning("Failed to register files in manifest: %s", response.text) + + except Exception as e: + # Don't fail the write if manifest registration fails + logger.warning("Failed to register files in manifest: %s", e) \ No newline at end of file diff --git a/python/destinations/quixlake-timeseries/requirements.txt b/python/destinations/quixlake-timeseries/requirements.txt new file mode 100644 index 00000000..170a474f --- /dev/null +++ b/python/destinations/quixlake-timeseries/requirements.txt @@ -0,0 +1,6 @@ +quixstreams==3.23.1 +python-dotenv +requests +pandas +boto3 +pyarrow \ No newline at end of file diff --git a/python/destinations/s3-file/requirements.txt b/python/destinations/s3-file/requirements.txt index 5f3f3920..db0a482d 100644 --- a/python/destinations/s3-file/requirements.txt +++ b/python/destinations/s3-file/requirements.txt @@ -1,2 +1,7 @@ -quixstreams[s3]==3.23.1 +quixstreams==3.23.1 python-dotenv +requests +pandas +boto3 +pyarrow +mypy-boto3-s3 \ No newline at end of file diff --git a/tests/destinations/quixlake-timeseries/docker-compose.test.yml b/tests/destinations/quixlake-timeseries/docker-compose.test.yml new file mode 100644 index 00000000..07f381f7 --- /dev/null +++ b/tests/destinations/quixlake-timeseries/docker-compose.test.yml @@ -0,0 +1,196 @@ +# Test configuration for Quix Lake Timeseries destination +# Uses only public Docker images + mock catalog service +# timeout: 120 + +services: + # MinIO for S3-compatible storage + minio: + image: minio/minio:latest + command: server /data --console-address ":9001" + environment: + - MINIO_ROOT_USER=minioadmin + - MINIO_ROOT_PASSWORD=minioadmin + networks: + - test-network + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 3s + timeout: 5s + retries: 10 + stop_grace_period: 3s + + # Initialize MinIO with test bucket + minio-init: + image: minio/mc:latest + depends_on: + minio: + condition: service_healthy + entrypoint: > + /bin/sh -c " + mc alias set myminio http://minio:9000 minioadmin minioadmin; + mc mb myminio/test-bucket --ignore-existing; + mc rm --recursive --force myminio/test-bucket/data/ || true; + echo 'MinIO bucket created and cleaned'; + echo 'Keeping minio-init alive...'; + tail -f /dev/null + " + networks: + 
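+      # joins the shared bridge network defined at the bottom of this file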
- test-network + + # Kafka broker (using Redpanda) + kafka: + image: docker.redpanda.com/redpandadata/redpanda:v24.2.4 + command: + - redpanda + - start + - --kafka-addr internal://0.0.0.0:9092 + - --advertise-kafka-addr internal://kafka:9092 + - --mode dev-container + - --smp 1 + healthcheck: + test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"] + interval: 5s + timeout: 10s + retries: 10 + networks: + - test-network + stop_grace_period: 3s + + # Mock REST Catalog service for testing + mock-catalog: + build: + context: ./mock-catalog + dockerfile: Dockerfile + networks: + - test-network + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:5001/health')"] + interval: 3s + timeout: 5s + retries: 10 + stop_grace_period: 3s + + # Quix Lake Timeseries destination service + quixlake-timeseries: + build: + context: ../../../python/destinations/quixlake-timeseries + dockerfile: dockerfile + environment: + # Quix/Kafka configuration + - Quix__Broker__Address=kafka:9092 + - Quix__Consumer__Group=quixlake-ts-test + - Quix__Deployment__Id=test-quixlake-ts + + # Input topic + - input=test-quixlake-input + + # S3 configuration + - S3_BUCKET=test-bucket + - S3_PREFIX=data + - AWS_ACCESS_KEY_ID=minioadmin + - AWS_SECRET_ACCESS_KEY=minioadmin + - AWS_REGION=us-east-1 + - AWS_ENDPOINT_URL=http://minio:9000 + + # Table configuration + - TABLE_NAME=test-quixlake-input + - HIVE_COLUMNS=location,year,month,day + - TIMESTAMP_COLUMN=ts_ms + + # Catalog configuration (using mock catalog) + - CATALOG_URL=http://mock-catalog:5001 + - CATALOG_AUTH_TOKEN= + - AUTO_DISCOVER=true + - CATALOG_NAMESPACE=default + + # Performance settings + - BATCH_SIZE=5 + - COMMIT_INTERVAL=1 + - MAX_WRITE_WORKERS=5 + + # Kafka consumer settings + - CONSUMER_GROUP=quixlake-ts-test-consumer + - AUTO_OFFSET_RESET=earliest + - KAFKA_KEY_DESERIALIZER=str + - KAFKA_VALUE_DESERIALIZER=json + + # Logging + - LOGLEVEL=DEBUG + networks: + - test-network + depends_on: + minio: + condition: service_healthy + kafka: + condition: service_healthy + minio-init: + condition: service_started + mock-catalog: + condition: service_healthy + stop_grace_period: 3s + + # Test runner - produces test data and verifies output + test-runner: + build: + context: ../../framework + dockerfile: Dockerfile + environment: + # Kafka configuration + - Quix__Broker__Address=kafka:9092 + + # Test configuration + - TEST_INPUT_TOPIC=test-quixlake-input + - TEST_MESSAGE_COUNT=6 + + # S3 configuration for verification + - AWS_ENDPOINT_URL=http://minio:9000 + - AWS_ACCESS_KEY_ID=minioadmin + - AWS_SECRET_ACCESS_KEY=minioadmin + - S3_BUCKET=test-bucket + - S3_PREFIX=data + + # Expected table configuration + - TABLE_NAME=test-quixlake-input + - HIVE_COLUMNS=location,year,month,day + + # Mock catalog URL for verification + - CATALOG_URL=http://mock-catalog:5001 + command: > + sh -c " + echo '=== Installing test dependencies ===' && + pip install boto3 pyarrow requests > /dev/null 2>&1 && + echo '✓ Dependencies installed' && + echo '' && + echo '=== Producing test messages to Kafka ===' && + python /tests/produce_test_data.py && + echo '' && + echo '=== Waiting for quixlake-timeseries to process messages ===' && + sleep 10 && + echo '' && + echo '=== Verifying Parquet data in S3 ===' && + python /tests/verify_output.py && + echo '' && + echo '=== Verifying catalog integration ===' && + python /tests/verify_catalog.py + " + volumes: + - ./produce_test_data.py:/tests/produce_test_data.py:ro + - 
./verify_output.py:/tests/verify_output.py:ro
+      - ./verify_catalog.py:/tests/verify_catalog.py:ro
+    working_dir: /
+    networks:
+      - test-network
+    depends_on:
+      minio:
+        condition: service_healthy
+      kafka:
+        condition: service_healthy
+      mock-catalog:
+        condition: service_healthy
+      quixlake-timeseries:
+        condition: service_started
+    stop_grace_period: 3s
+
+networks:
+  test-network:
+    driver: bridge
diff --git a/tests/destinations/quixlake-timeseries/mock-catalog/Dockerfile b/tests/destinations/quixlake-timeseries/mock-catalog/Dockerfile
new file mode 100644
index 00000000..6875155f
--- /dev/null
+++ b/tests/destinations/quixlake-timeseries/mock-catalog/Dockerfile
@@ -0,0 +1,16 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+# Copy requirements and install dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy application
+COPY app.py .
+
+# Expose port
+EXPOSE 5001
+
+# Run the application
+CMD ["python", "app.py"]
diff --git a/tests/destinations/quixlake-timeseries/mock-catalog/app.py b/tests/destinations/quixlake-timeseries/mock-catalog/app.py
new file mode 100644
index 00000000..20012048
--- /dev/null
+++ b/tests/destinations/quixlake-timeseries/mock-catalog/app.py
@@ -0,0 +1,190 @@
+"""
+Mock REST Catalog Service for Testing
+
+This service mimics an Iceberg REST Catalog to verify that the QuixLake sink
+correctly calls the catalog endpoints with proper data.
+
+It logs all requests and stores them for verification.
+"""
+from flask import Flask, request, jsonify
+import json
+import logging
+from datetime import datetime
+
+app = Flask(__name__)
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# In-memory storage for tracking calls
+catalog_state = {
+    'tables': {},
+    'requests': [],
+    'manifest_files': []
+}
+
+
+def log_request(endpoint, method, data=None):
+    """Log an incoming request for verification"""
+    request_info = {
+        'timestamp': datetime.utcnow().isoformat(),
+        'endpoint': endpoint,
+        'method': method,
+        'data': data
+    }
+    catalog_state['requests'].append(request_info)
+    logger.info(f"{method} {endpoint}")
+    if data:
+        logger.info(f"  Data: {json.dumps(data, indent=2)}")
+
+
+@app.route('/health', methods=['GET'])
+def health():
+    """Health check endpoint"""
+    return jsonify({'status': 'healthy'}), 200
+
+
+@app.route('/namespaces/<namespace>/tables/<table>', methods=['GET'])
+def get_table(namespace, table):
+    """Get table metadata"""
+    log_request(f'/namespaces/{namespace}/tables/{table}', 'GET')
+
+    table_key = f"{namespace}.{table}"
+
+    if table_key in catalog_state['tables']:
+        logger.info(f"  → Table exists: {table_key}")
+        return jsonify(catalog_state['tables'][table_key]), 200
+    else:
+        logger.info(f"  → Table not found: {table_key}")
+        return jsonify({'error': 'Table not found'}), 404
+
+
+@app.route('/namespaces/<namespace>/tables/<table>', methods=['PUT'])
+def create_table(namespace, table):
+    """Create a new table"""
+    data = request.get_json()
+    log_request(f'/namespaces/{namespace}/tables/{table}', 'PUT', data)
+
+    table_key = f"{namespace}.{table}"
+
+    # Store the table metadata
+    catalog_state['tables'][table_key] = {
+        'name': table,
+        'namespace': namespace,
+        'location': data.get('location'),
+        'partition_spec': data.get('partition_spec', []),
+        'properties': data.get('properties', {}),
+        'created_at': datetime.utcnow().isoformat()
+    }
+
+    logger.info(f"  → Table created: {table_key}")
+    logger.info(f"  → Location: {data.get('location')}")
+    logger.info(f"  → Partitions: {data.get('partition_spec', [])}")
+
+    return jsonify(catalog_state['tables'][table_key]), 201
+
+
+@app.route('/namespaces/<namespace>/tables/<table>
/manifest/add-files', methods=['POST']) +def add_files_to_manifest(namespace, table): + """Add files to table manifest""" + data = request.get_json() + log_request(f'/namespaces/{namespace}/tables/{table}/manifest/add-files', 'POST', data) + + table_key = f"{namespace}.{table}" + files = data.get('files', []) + + # Store file registrations + for file_info in files: + file_entry = { + 'table': table_key, + 'file_path': file_info.get('file_path'), + 'file_size': file_info.get('file_size'), + 'row_count': file_info.get('row_count'), + 'partition_values': file_info.get('partition_values', {}), + 'registered_at': datetime.utcnow().isoformat() + } + catalog_state['manifest_files'].append(file_entry) + + logger.info(f" → Registered {len(files)} file(s) for table {table_key}") + for i, file_info in enumerate(files[:3]): # Log first 3 files + logger.info(f" File {i+1}: {file_info.get('file_path')}") + logger.info(f" Size: {file_info.get('file_size')} bytes") + logger.info(f" Rows: {file_info.get('row_count')}") + logger.info(f" Partitions: {file_info.get('partition_values', {})}") + + if len(files) > 3: + logger.info(f" ... and {len(files) - 3} more file(s)") + + return jsonify({'success': True, 'files_added': len(files)}), 200 + + +@app.route('/catalog/state', methods=['GET']) +def get_catalog_state(): + """Get the current catalog state (for testing/debugging)""" + return jsonify({ + 'tables': catalog_state['tables'], + 'total_requests': len(catalog_state['requests']), + 'total_files': len(catalog_state['manifest_files']), + 'recent_requests': catalog_state['requests'][-10:] # Last 10 requests + }), 200 + + +@app.route('/catalog/verify', methods=['GET']) +def verify_catalog(): + """Verify catalog received expected calls""" + table_count = len(catalog_state['tables']) + file_count = len(catalog_state['manifest_files']) + request_count = len(catalog_state['requests']) + + # Check if we received table creation requests + table_creation_requests = [r for r in catalog_state['requests'] if r['method'] == 'PUT' and '/tables/' in r['endpoint']] + + # Check if we received file registration requests + file_registration_requests = [r for r in catalog_state['requests'] if 'manifest/add-files' in r['endpoint']] + + verification = { + 'success': True, + 'tables_created': table_count, + 'files_registered': file_count, + 'total_requests': request_count, + 'table_creation_calls': len(table_creation_requests), + 'file_registration_calls': len(file_registration_requests), + 'issues': [] + } + + # Validate we got expected calls + if table_count == 0: + verification['success'] = False + verification['issues'].append('No tables were created') + + if file_count == 0: + verification['success'] = False + verification['issues'].append('No files were registered') + + # Log verification results + logger.info("="*60) + logger.info("CATALOG VERIFICATION RESULTS") + logger.info("="*60) + logger.info(f"Tables created: {table_count}") + logger.info(f"Files registered: {file_count}") + logger.info(f"Total requests: {request_count}") + logger.info(f"Table creation calls: {len(table_creation_requests)}") + logger.info(f"File registration calls: {len(file_registration_requests)}") + + if verification['success']: + logger.info("✓ Catalog verification PASSED") + else: + logger.info("✗ Catalog verification FAILED") + for issue in verification['issues']: + logger.info(f" - {issue}") + + logger.info("="*60) + + return jsonify(verification), 200 if verification['success'] else 400 + + +if __name__ == '__main__': + logger.info("Starting 
Mock REST Catalog...") + logger.info("Listening on port 5001") + app.run(host='0.0.0.0', port=5001, debug=False) diff --git a/tests/destinations/quixlake-timeseries/mock-catalog/requirements.txt b/tests/destinations/quixlake-timeseries/mock-catalog/requirements.txt new file mode 100644 index 00000000..5bd19d39 --- /dev/null +++ b/tests/destinations/quixlake-timeseries/mock-catalog/requirements.txt @@ -0,0 +1 @@ +flask==3.0.0 diff --git a/tests/destinations/quixlake-timeseries/produce_test_data.py b/tests/destinations/quixlake-timeseries/produce_test_data.py new file mode 100644 index 00000000..495fa9ba --- /dev/null +++ b/tests/destinations/quixlake-timeseries/produce_test_data.py @@ -0,0 +1,88 @@ +""" +Produce test data for Quix Lake Timeseries destination tests. + +Generates test messages with timestamps and partitionable fields (location, sensor_type) +to verify Hive partitioning and time-based partitioning functionality. +""" +import os +import time +import json +from datetime import datetime, timezone +from quixstreams import Application + + +def main(): + broker_address = os.getenv("Quix__Broker__Address", "kafka:9092") + topic_name = os.getenv("TEST_INPUT_TOPIC", "test-quixlake-input") + message_count = int(os.getenv("TEST_MESSAGE_COUNT", "10")) + + print(f"Producing {message_count} test messages to topic: {topic_name}") + + app = Application( + broker_address=broker_address, + producer_extra_config={ + "allow.auto.create.topics": "true" + } + ) + + topic = app.topic(topic_name) + + # Generate test data with multiple partitionable dimensions + locations = ["NYC", "LA", "CHI"] + sensor_types = ["temperature", "humidity", "pressure"] + + # Use a fixed timestamp for consistent testing (2024-01-15 12:00:00 UTC) + base_timestamp_ms = int(datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc).timestamp() * 1000) + + with app.get_producer() as producer: + for i in range(message_count): + # Distribute messages across different partitions + location = locations[i % len(locations)] + sensor_type = sensor_types[i % len(sensor_types)] + + # Add some time variation (each message is 1 hour apart) + timestamp_ms = base_timestamp_ms + (i * 3600 * 1000) + + message = { + "id": i, + "location": location, + "sensor_type": sensor_type, + "value": round(20.0 + (i * 1.5), 2), # Incrementing sensor value + "ts_ms": timestamp_ms, + "status": "active", + "metadata": { + "device_id": f"device_{i % 3}", + "firmware": "v1.2.3" + } + } + + # Convert timestamp to human-readable for logging + dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc) + print(f"Producing message {i}: location={location}, sensor_type={sensor_type}, " + f"ts={dt.isoformat()}, value={message['value']}") + + serialized = json.dumps(message).encode('utf-8') + + producer.produce( + topic=topic.name, + key=f"key_{location}_{sensor_type}_{i}", + value=serialized + ) + + producer.flush() + + print(f"\nSuccessfully produced {message_count} messages") + print(f"Messages span from {datetime.fromtimestamp(base_timestamp_ms / 1000, tz=timezone.utc).isoformat()}") + print(f" to {datetime.fromtimestamp((base_timestamp_ms + (message_count - 1) * 3600 * 1000) / 1000, tz=timezone.utc).isoformat()}") + print(f"Locations: {locations}") + print(f"Sensor types: {sensor_types}") + + +if __name__ == "__main__": + try: + main() + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + exit(1) diff --git a/tests/destinations/quixlake-timeseries/verify_catalog.py 
b/tests/destinations/quixlake-timeseries/verify_catalog.py new file mode 100644 index 00000000..1eabce9c --- /dev/null +++ b/tests/destinations/quixlake-timeseries/verify_catalog.py @@ -0,0 +1,234 @@ +""" +Verify catalog integration for Quix Lake Timeseries destination tests. + +Tests that the sink correctly interacts with the REST Catalog: +1. Tables are created/registered +2. Files are added to the manifest +3. Correct partition information is sent +4. Proper metadata is included +""" +import os +import sys +import requests +import time + + +def wait_for_catalog(catalog_url: str, max_attempts: int = 20) -> bool: + """ + Wait for the catalog service to be ready. + + Args: + catalog_url: Base URL of the catalog + max_attempts: Maximum number of retry attempts + + Returns: + True if catalog is ready, False otherwise + """ + print(f"Waiting for catalog at {catalog_url} to be ready...") + + for attempt in range(max_attempts): + try: + response = requests.get(f"{catalog_url}/health", timeout=5) + if response.status_code == 200: + print(f"✓ Catalog is ready") + return True + except requests.exceptions.RequestException as e: + if attempt == 0: + print(f" Catalog not ready yet: {e}") + + if attempt < max_attempts - 1: + time.sleep(2) + + return False + + +def get_catalog_state(catalog_url: str): + """Get the current state of the catalog""" + try: + response = requests.get(f"{catalog_url}/catalog/state", timeout=10) + if response.status_code == 200: + return response.json() + else: + print(f"✗ Failed to get catalog state: {response.status_code}") + print(f" Response: {response.text}") + return None + except Exception as e: + print(f"✗ Error getting catalog state: {e}") + return None + + +def verify_catalog(catalog_url: str): + """Verify the catalog received expected calls""" + try: + response = requests.get(f"{catalog_url}/catalog/verify", timeout=10) + result = response.json() + + if response.status_code == 200: + return result, True + else: + return result, False + except Exception as e: + print(f"✗ Error verifying catalog: {e}") + return None, False + + +def main(): + # Configuration from environment + catalog_url = os.getenv("CATALOG_URL", "http://mock-catalog:5001") + table_name = os.getenv("TABLE_NAME", "test-quixlake-input") + expected_message_count = int(os.getenv("TEST_MESSAGE_COUNT", "10")) + + print("="*60) + print("Catalog Integration Verification") + print("="*60) + print(f"Catalog URL: {catalog_url}") + print(f"Expected table: {table_name}") + print(f"Expected messages: {expected_message_count}") + print() + + # Wait for catalog to be ready + if not wait_for_catalog(catalog_url): + print(f"\n✗ FAILED: Catalog at {catalog_url} did not become ready") + sys.exit(1) + + # Get catalog state + print("\n" + "="*60) + print("Fetching Catalog State") + print("="*60) + + state = get_catalog_state(catalog_url) + + if not state: + print(f"✗ FAILED: Could not retrieve catalog state") + sys.exit(1) + + print(f"Total requests received: {state['total_requests']}") + print(f"Tables created: {len(state['tables'])}") + print(f"Files registered: {state['total_files']}") + + # Verify catalog interactions + print("\n" + "="*60) + print("Verifying Catalog Interactions") + print("="*60) + + verification, success = verify_catalog(catalog_url) + + if not verification: + print(f"✗ FAILED: Could not verify catalog") + sys.exit(1) + + # Print verification results + print(f"\nVerification Results:") + print(f" Tables created: {verification['tables_created']}") + print(f" Files registered: 
+
+
+def main():
+    # Configuration from environment
+    catalog_url = os.getenv("CATALOG_URL", "http://mock-catalog:5001")
+    table_name = os.getenv("TABLE_NAME", "test-quixlake-input")
+    expected_message_count = int(os.getenv("TEST_MESSAGE_COUNT", "10"))
+
+    print("="*60)
+    print("Catalog Integration Verification")
+    print("="*60)
+    print(f"Catalog URL: {catalog_url}")
+    print(f"Expected table: {table_name}")
+    print(f"Expected messages: {expected_message_count}")
+    print()
+
+    # Wait for the catalog to be ready
+    if not wait_for_catalog(catalog_url):
+        print(f"\n✗ FAILED: Catalog at {catalog_url} did not become ready")
+        sys.exit(1)
+
+    # Get the catalog state
+    print("\n" + "="*60)
+    print("Fetching Catalog State")
+    print("="*60)
+
+    state = get_catalog_state(catalog_url)
+
+    if not state:
+        print("✗ FAILED: Could not retrieve catalog state")
+        sys.exit(1)
+
+    print(f"Total requests received: {state['total_requests']}")
+    print(f"Tables created: {len(state['tables'])}")
+    print(f"Files registered: {state['total_files']}")
+
+    # Verify catalog interactions
+    print("\n" + "="*60)
+    print("Verifying Catalog Interactions")
+    print("="*60)
+
+    verification, success = verify_catalog(catalog_url)
+
+    if not verification:
+        print("✗ FAILED: Could not verify catalog")
+        sys.exit(1)
+
+    # Print verification results
+    print("\nVerification Results:")
+    print(f"  Tables created: {verification['tables_created']}")
+    print(f"  Files registered: {verification['files_registered']}")
+    print(f"  Table creation calls: {verification['table_creation_calls']}")
+    print(f"  File registration calls: {verification['file_registration_calls']}")
+
+    if not success or not verification['success']:
+        print("\n✗ VERIFICATION FAILED")
+        if verification.get('issues'):
+            print("\nIssues:")
+            for issue in verification['issues']:
+                print(f"  - {issue}")
+        sys.exit(1)
+
+    # Detailed validation
+    print("\n" + "="*60)
+    print("Detailed Validation")
+    print("="*60)
+
+    # Check that a table was created
+    if verification['tables_created'] == 0:
+        print("✗ FAILED: No tables were created")
+        sys.exit(1)
+
+    print(f"✓ Table creation verified ({verification['tables_created']} table(s))")
+
+    # Check table metadata
+    if state['tables']:
+        for table_key, table_info in state['tables'].items():
+            print(f"\nTable: {table_key}")
+            print(f"  Location: {table_info['location']}")
+            print(f"  Partition spec: {table_info['partition_spec']}")
+            print(f"  Properties: {table_info['properties']}")
+            print(f"  Created at: {table_info['created_at']}")
+
+            # Verify the expected partition columns appear in the partition spec.
+            # The spec might be empty initially (dynamic discovery), in which
+            # case there is nothing to compare yet; on a mismatch we warn rather
+            # than fail, since the spec's exact representation is mock-defined.
+            expected_partitions = [
+                col.strip()
+                for col in os.getenv("HIVE_COLUMNS", "location,year,month,day").split(',')
+                if col.strip()
+            ]
+            actual_partitions = table_info.get('partition_spec', [])
+
+            if actual_partitions:
+                if set(actual_partitions) != set(expected_partitions):
+                    print(f"  ⚠ Warning: Partition spec {actual_partitions} "
+                          f"does not match expected {expected_partitions}")
+                else:
+                    print(f"  ✓ Partition spec configured: {actual_partitions}")
+            else:
+                print("  ℹ Partition spec empty (dynamic discovery mode)")
+
+    # Check that files were registered
+    if verification['files_registered'] == 0:
+        print("\n✗ FAILED: No files were registered in manifest")
+        sys.exit(1)
+
+    print(f"\n✓ File registration verified ({verification['files_registered']} file(s))")
+
+    # Check file metadata
+    if state['total_files'] > 0:
+        print("\nSample registered files:")
+        for i, file_entry in enumerate(state.get('manifest_files', [])[:3]):
+            print(f"\n  File {i+1}:")
+            print(f"    Path: {file_entry['file_path']}")
+            print(f"    Size: {file_entry['file_size']} bytes")
+            print(f"    Rows: {file_entry['row_count']}")
+            print(f"    Partitions: {file_entry['partition_values']}")
+            print(f"    Registered: {file_entry['registered_at']}")
+
+            # Verify partition values are present
+            if not file_entry['partition_values']:
+                print("    ⚠ Warning: No partition values (expected only for non-partitioned tables)")
+            else:
+                print("    ✓ Partition values present")
+
+        if state['total_files'] > 3:
+            print(f"\n  ... and {state['total_files'] - 3} more file(s)")
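+
+    # For orientation, a manifest entry for this test run is assumed to look
+    # roughly like the following (values illustrative; the exact file name,
+    # size, and row count depend on the sink's batching):
+    #
+    #   {
+    #       "file_path": "data/test-quixlake-input/location=NYC/year=2024/month=01/day=15/data_0.parquet",
+    #       "file_size": 4096,
+    #       "row_count": 4,
+    #       "partition_values": {"location": "NYC", "year": "2024", "month": "01", "day": "15"},
+    #       "registered_at": "2024-01-15T12:00:00Z"
+    #   }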
+
+    # Check the minimum file registration count. Strictly, there should be at
+    # least one file per partition combination: with location=3 values × year=1
+    # × month=1 × day=1, that is 3 partitions minimum for the default test
+    # configuration. The check is deliberately loose (>= 1, warning only),
+    # since batching and flush timing determine how files are actually split.
+    min_expected_files = 1
+    if verification['files_registered'] < min_expected_files:
+        print(f"\n⚠ Warning: Expected at least {min_expected_files} file(s), got {verification['files_registered']}")
+
+    # Display recent requests for debugging
+    if state.get('recent_requests'):
+        print("\n" + "="*60)
+        print("Recent Catalog Requests (last 5)")
+        print("="*60)
+        for i, req in enumerate(state['recent_requests'][-5:]):
+            print(f"\n{i+1}. {req['method']} {req['endpoint']}")
+            print(f"   Timestamp: {req['timestamp']}")
+            if req.get('data'):
+                print(f"   Data keys: {list(req['data'].keys())}")
+
+    # Final summary
+    print("\n" + "="*60)
+    print("✓ ALL CATALOG VERIFICATIONS PASSED")
+    print("="*60)
+    print("✓ Sink successfully registered table in catalog")
+    print(f"✓ Sink successfully registered {verification['files_registered']} file(s) in manifest")
+    # The total request count comes from the /catalog/state snapshot, not from
+    # the /catalog/verify response
+    print(f"✓ Catalog received {state['total_requests']} total request(s)")
+    print()
+
+    sys.exit(0)
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except KeyboardInterrupt:
+        print("\n\nTest interrupted by user")
+        sys.exit(1)
+    except Exception as e:
+        print(f"\n✗ Unexpected error: {e}")
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
diff --git a/tests/destinations/quixlake-timeseries/verify_output.py b/tests/destinations/quixlake-timeseries/verify_output.py
new file mode 100644
index 00000000..43518a3a
--- /dev/null
+++ b/tests/destinations/quixlake-timeseries/verify_output.py
@@ -0,0 +1,293 @@
+"""
+Verify output for Quix Lake Timeseries destination tests.
+
+Validates that:
+1. Parquet files are written to S3 with correct Hive partitioning
+2. Data integrity is maintained
+3. Partition structure matches configuration
+4. All expected records are present
+"""
+import os
+import sys
+import time
+from io import BytesIO
+
+import boto3
+import pyarrow.parquet as pq
+
+
+def parse_partition_from_path(path, table_name):
+    """
+    Extract partition values from a Hive-style S3 path.
+
+    Example: prefix/table_name/year=2024/month=01/day=15/file.parquet
+    Returns: {'year': '2024', 'month': '01', 'day': '15'}
+    """
+    partitions = {}
+
+    # Find the table name in the path and process everything after it
+    if table_name in path:
+        parts = path.split(f"{table_name}/", 1)
+        if len(parts) > 1:
+            partition_path = parts[1]
+            # Split by '/' and look for key=value pairs
+            for part in partition_path.split('/'):
+                if '=' in part and not part.endswith('.parquet'):
+                    key, value = part.split('=', 1)
+                    partitions[key] = value
+
+    return partitions
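+
+
+# Usage sketch (path and values illustrative):
+#
+#   >>> parse_partition_from_path(
+#   ...     "data/test-quixlake-input/location=NYC/year=2024/month=01/day=15/data_0.parquet",
+#   ...     "test-quixlake-input")
+#   {'location': 'NYC', 'year': '2024', 'month': '01', 'day': '15'}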
+
+
+def main():
+    # Configuration from environment
+    minio_endpoint = os.getenv("AWS_ENDPOINT_URL", "http://minio:9000")
+    access_key = os.getenv("AWS_ACCESS_KEY_ID", "minioadmin")
+    secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", "minioadmin")
+    bucket_name = os.getenv("S3_BUCKET", "test-bucket")
+    s3_prefix = os.getenv("S3_PREFIX", "data")
+    table_name = os.getenv("TABLE_NAME", "test-quixlake-input")
+    hive_columns = os.getenv("HIVE_COLUMNS", "location,year,month,day")
+    expected_message_count = int(os.getenv("TEST_MESSAGE_COUNT", "10"))
+
+    print("Configuration:")
+    print(f"  Endpoint: {minio_endpoint}")
+    print(f"  Bucket: {bucket_name}")
+    print(f"  Prefix: {s3_prefix}")
+    print(f"  Table: {table_name}")
+    print(f"  Expected partitions: {hive_columns}")
+    print(f"  Expected messages: {expected_message_count}")
+
+    # Parse expected partition columns
+    expected_partition_columns = [col.strip() for col in hive_columns.split(',') if col.strip()]
+
+    # Create an S3 client
+    print(f"\nConnecting to S3 at {minio_endpoint}...")
+    s3_client = boto3.client(
+        's3',
+        endpoint_url=minio_endpoint,
+        aws_access_key_id=access_key,
+        aws_secret_access_key=secret_key,
+        region_name='us-east-1'
+    )
+
+    # Build the full table prefix
+    table_prefix = f"{s3_prefix}/{table_name}/"
+
+    max_attempts = 30
+    found_files = []
+
+    print(f"\nLooking for Parquet files in s3://{bucket_name}/{table_prefix}")
+
+    # Retry logic with polling
+    for attempt in range(max_attempts):
+        found_files = []
+
+        try:
+            # List all objects under the table prefix
+            paginator = s3_client.get_paginator('list_objects_v2')
+            pages = paginator.paginate(Bucket=bucket_name, Prefix=table_prefix)
+
+            for page in pages:
+                for obj in page.get('Contents', []):
+                    key = obj['Key']
+                    # Only include .parquet files (skip directory markers)
+                    if key.endswith('.parquet'):
+                        found_files.append(key)
+                        print(f"Found file: {key} (size: {obj['Size']} bytes)")
+
+        except Exception as e:
+            print(f"Error listing objects: {e}")
+
+        if found_files:
+            print(f"\n✓ Found {len(found_files)} Parquet file(s)")
+            break
+
+        print(f"Attempt {attempt + 1}/{max_attempts}: No files found yet, waiting...")
+        time.sleep(2)
+
+    if not found_files:
+        print(f"\n✗ FAILED: No Parquet files found after {max_attempts} attempts")
+        sys.exit(1)
+
+    # Validate partition structure
+    print("\n--- Validating Partition Structure ---")
+    partition_paths = set()
+
+    for file_key in found_files:
+        partitions = parse_partition_from_path(file_key, table_name)
+
+        if expected_partition_columns:
+            # Verify all expected partition columns are present
+            actual_columns = set(partitions.keys())
+            expected_columns = set(expected_partition_columns)
+
+            if actual_columns != expected_columns:
+                print(f"✗ ERROR: Partition mismatch in {file_key}")
+                print(f"  Expected columns: {expected_columns}")
+                print(f"  Actual columns: {actual_columns}")
+                sys.exit(1)
+
+            # Build a partition path string for tracking unique partitions
+            partition_path = "/".join(f"{col}={partitions[col]}" for col in expected_partition_columns)
+            partition_paths.add(partition_path)
+            print(f"✓ Valid partition: {partition_path}")
+        else:
+            # No partitioning expected
+            if partitions:
+                print(f"✗ ERROR: Found unexpected partitions in {file_key}: {partitions}")
+                sys.exit(1)
+            print("✓ No partitioning (as expected)")
+
+    print("\n✓ All partition structures valid")
+    if partition_paths:
+        print(f"✓ Found {len(partition_paths)} unique partition(s):")
+        for path in sorted(partition_paths):
+            print(f"  - {path}")
+
+    # Read and validate Parquet data
+    print("\n--- Validating Parquet Data ---")
+    all_records = []
+
+    for file_key in found_files:
+        print(f"\nReading: {file_key}")
+
+        try:
+            # Download the Parquet file
+            obj_response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
+            parquet_bytes = obj_response['Body'].read()
+
+            # Read the Parquet bytes with pyarrow
+            table = pq.read_table(BytesIO(parquet_bytes))
+
+            # Convert to a list of dictionaries
+            records = table.to_pylist()
+
+            print(f"  Records in file: {len(records)}")
+            print(f"  Columns: {table.schema.names}")
+
+            all_records.extend(records)
+
+            # Show the first record as a sample
+            if records:
+                print(f"  Sample record: {records[0]}")
+
+        except Exception as e:
+            print(f"✗ ERROR reading Parquet file {file_key}: {e}")
+            import traceback
+            traceback.print_exc()
+            sys.exit(1)
+
+    print(f"\n✓ Successfully read {len(all_records)} total records from {len(found_files)} file(s)")
+
+    # Validate record count
+    if len(all_records) < expected_message_count:
+        print(f"✗ ERROR: Expected at least {expected_message_count} records, found {len(all_records)}")
+        sys.exit(1)
+
+    print(f"✓ Record count meets expectation ({len(all_records)} >= {expected_message_count})")
+
+    # Validate record structure and content
+    print("\n--- Validating Record Content ---")
+
+    # Partition columns are stored in the directory structure (Hive-style), not
+    # in the data files themselves, so we exclude them from the fields expected
+    # inside the Parquet files (see the sketch below)
+    all_expected_fields = {'id', 'location', 'sensor_type', 'value', 'ts_ms', 'status', 'metadata', '__key'}
+    expected_fields = all_expected_fields - set(expected_partition_columns)
+
+    print(f"Expected fields in Parquet data: {expected_fields}")
+    print(f"Partition columns (in path only): {expected_partition_columns}")
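+
+    # Sketch of the split for one produced message, assuming the default test
+    # configuration (HIVE_COLUMNS=location,year,month,day); field names come
+    # from the producer script, the exact file name is illustrative:
+    #
+    #   S3 key:  data/test-quixlake-input/location=NYC/year=2024/month=01/day=15/data_0.parquet
+    #   Columns: id, sensor_type, value, ts_ms, status, metadata, __key
+    #
+    # 'location' appears only in the key; 'year'/'month'/'day' are derived from
+    # 'ts_ms' and likewise live only in the path.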
+
+    found_ids = set()
+
+    # Validate every record rather than just the first N: with at-least-once
+    # delivery the sink may write duplicates, so checking only the first N
+    # records could miss some expected IDs
+    for i, record in enumerate(all_records):
+        # Verify required fields exist
+        actual_fields = set(record.keys())
+        missing_fields = expected_fields - actual_fields
+
+        if missing_fields:
+            print(f"✗ ERROR: Record {i} missing fields: {missing_fields}")
+            print(f"  Record: {record}")
+            sys.exit(1)
+
+        # Verify field types
+        if not isinstance(record['id'], int):
+            print(f"✗ ERROR: Record {i} 'id' is not an integer: {type(record['id'])}")
+            sys.exit(1)
+
+        if not isinstance(record['value'], (int, float)):
+            print(f"✗ ERROR: Record {i} 'value' is not numeric: {type(record['value'])}")
+            sys.exit(1)
+
+        # ts_ms may come back as a datetime after pyarrow conversion, so accept
+        # either a numeric epoch value or a datetime-like object
+        if not isinstance(record['ts_ms'], (int, float)) and not hasattr(record['ts_ms'], 'timestamp'):
+            print(f"✗ ERROR: Record {i} 'ts_ms' is not a valid timestamp type: {type(record['ts_ms'])}")
+            sys.exit(1)
+
+        # Track IDs
+        found_ids.add(record['id'])
+
+        if i < 3:  # Show the first 3 records
+            print(f"✓ Record {i}: id={record['id']}, "
+                  f"sensor_type={record['sensor_type']}, value={record['value']}")
+
+    # Verify we have all expected IDs (0 through expected_message_count - 1)
+    expected_ids = set(range(expected_message_count))
+    if found_ids != expected_ids:
+        print("✗ ERROR: Missing or extra IDs")
+        print(f"  Expected: {sorted(expected_ids)}")
+        print(f"  Found: {sorted(found_ids)}")
+        missing = expected_ids - found_ids
+        extra = found_ids - expected_ids
+        if missing:
+            print(f"  Missing: {sorted(missing)}")
+        if extra:
+            print(f"  Extra: {sorted(extra)}")
+        sys.exit(1)
+
+    print(f"\n✓ All {len(all_records)} records validated successfully")
+    print(f"✓ All expected IDs present: {sorted(found_ids)[:5]}...{sorted(found_ids)[-2:]}")
+
+    # Validate time-based partitioning if year/month/day columns are used
+    if 'year' in expected_partition_columns:
+        print("\n--- Validating Time-based Partitioning ---")
+
+        # All test messages carry timestamps on 2024-01-15: the producer starts
+        # at 12:00 UTC and steps one hour per message, so up to 12 messages stay
+        # within the same day. Every time-based partition value should therefore
+        # match this date.
+        expected_year = "2024"
+        expected_month = "01"
+        expected_day = "15"
+
+        for file_key in found_files:
+            partitions = parse_partition_from_path(file_key, table_name)
+
+            if 'year' in partitions and partitions['year'] != expected_year:
+                print(f"✗ ERROR: Unexpected year in {file_key}: {partitions['year']} (expected {expected_year})")
+                sys.exit(1)
+
+            if 'month' in partitions and partitions['month'] != expected_month:
+                print(f"✗ ERROR: Unexpected month in {file_key}: {partitions['month']} (expected {expected_month})")
+                sys.exit(1)
+
+            if 'day' in partitions and partitions['day'] != expected_day:
+                print(f"✗ ERROR: Unexpected day in {file_key}: {partitions['day']} (expected {expected_day})")
+                sys.exit(1)
+
+        print(f"✓ All time-based partitions match expected date: {expected_year}-{expected_month}-{expected_day}")
+
+    print("\n" + "="*60)
+    print("✓ ALL VALIDATIONS PASSED")
+    print("="*60)
+    sys.exit(0)
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except KeyboardInterrupt:
+        print("\n\nTest interrupted by user")
+        sys.exit(1)
+    except Exception as e:
+        print(f"\n✗ Unexpected error: {e}")
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
diff --git
a/tests/destinations/s3-file/docker-compose.test.yml b/tests/destinations/s3-file/docker-compose.test.yml
index 3ca994d5..e2cfaf9e 100644
--- a/tests/destinations/s3-file/docker-compose.test.yml
+++ b/tests/destinations/s3-file/docker-compose.test.yml
@@ -1,4 +1,4 @@
-# timeout: 60
+# timeout: 90
 services:
   minio:
     image: minio/minio:latest
@@ -83,7 +83,7 @@ services:
     environment:
       - Quix__Broker__Address=kafka:9092
       - TEST_INPUT_TOPIC=test-s3-input
-      - TEST_MESSAGE_COUNT=3
+      - TEST_MESSAGE_COUNT=2
       - MINIO_ENDPOINT=minio:9000
       - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
@@ -96,7 +96,7 @@ services:
         echo 'Producing test messages to Kafka...' &&
         python /tests/produce_test_data.py &&
         echo 'Waiting for s3-file-dest to process messages...' &&
-        sleep 15 &&
+        sleep 8 &&
         echo 'Verifying data in S3...' &&
         python /tests/verify_output.py
       "
diff --git a/tests/destinations/s3-file/verify_output.py b/tests/destinations/s3-file/verify_output.py
index 18cb49ef..cb734b5d 100644
--- a/tests/destinations/s3-file/verify_output.py
+++ b/tests/destinations/s3-file/verify_output.py
@@ -89,7 +89,7 @@ def main():
     print(f"\nTotal records found: {len(all_records)}")
 
     # Verify we have the expected number of records
-    expected_message_count = 3
+    expected_message_count = int(os.getenv("TEST_MESSAGE_COUNT", "3"))
     if len(all_records) < expected_message_count:
         print(f"ERROR: Expected {expected_message_count} records, found {len(all_records)}")
         sys.exit(1)