diff --git a/python/sources/s3_source/README.md b/python/sources/s3_source/README.md index 2f6c2dce..6528a0b8 100644 --- a/python/sources/s3_source/README.md +++ b/python/sources/s3_source/README.md @@ -1,37 +1,132 @@
-# S3 Source Connector
+# S3 Configuration File Source
-[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/s3_source) demonstrates how to connect to Amazon S3 to read files into a Kafka topic.
+A Quix Streams source service that monitors Amazon S3 buckets for configuration files and streams their content to Kafka topics for processing.
-## How to run
+## Overview
-Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log-in and visit the `Connectors` tab to use this connector.
+This service continuously monitors a specified S3 bucket and folder path for new configuration files. When files are detected, it downloads their content and publishes it to the `s3-data` Kafka topic for downstream processing by the XML-to-JSON transformer.
-Clicking `Set up connector` allows you to enter your connection details and runtime parameters.
+## Features
-Then either:
-* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.
+- **File Monitoring**: Continuous polling of the S3 bucket for new files
+- **Chunking Support**: Files larger than the configured message size are split across multiple Kafka messages
+- **Format Support**: Primarily designed for XML configuration files
+- **State Management**: Tracks processed files to avoid duplicates
+- **Error Handling**: S3 connectivity errors are logged and retried on the next polling cycle
+- **Configurable Polling**: Adjustable polling interval for monitoring
-* or click `Customise connector` to inspect or alter the code before deployment.
+## How it Works
-## Environment variables
+1. **Monitoring**: Polls the specified S3 bucket/folder at regular intervals
+2. **File Detection**: Identifies new or modified files since the last check
+3. **Download**: Retrieves file content from S3
+4. **Chunking**: Splits content that exceeds `MAX_MB_PER_MESSAGE` into multiple parts
+5. **Publishing**: Sends file content to the Kafka topic for processing
+6. **State Tracking**: Records processed files to prevent reprocessing
-This connector uses the following environment variables:
+## Environment Variables
-- **output**: The output topic to stream Segment data into
-- **S3_BUCKET**: The URI or URL to your S3 bucket
-- **S3_REGION**: The region of your S3 bucket
-- **S3_SECRET**: Your AWS secret
-- **S3_ACCESS_KEY_ID**: Your AWS Access Key
-- **S3_FOLDER_PATH**: The path to the S3 folder to consume
-- **S3_FILE_FORMAT**: The file format of the files
-- **S3_FILE_COMPRESSION**: The type of file compression used for the files
+- **output**: Name of the output Kafka topic (e.g., `s3-data`)
+- **S3_BUCKET**: S3 bucket URI (e.g., `s3://quix-test-bucket/configurations/`)
+- **S3_REGION**: AWS region of the S3 bucket (e.g., `eu-west-2`; default: `us-east-1`)
+- **S3_SECRET**: AWS Secret Access Key (stored as a secret)
+- **S3_ACCESS_KEY_ID**: AWS Access Key ID (stored as a secret)
+- **S3_FOLDER_PREFIX**: Folder prefix within the bucket to monitor (optional, e.g., `configurations`)
+- **POLL_INTERVAL_SECONDS**: Polling interval in seconds (default: 30)
+- **MAX_MB_PER_MESSAGE**: Maximum message size in MB before a file is split into chunks (default: 1)
+- **DOWNLOAD_CONTENT**: Whether to publish file content or metadata only (default: `True`)
+- **AWS_ENDPOINT_URL**: Custom endpoint URL for MinIO or other S3-compatible services (optional)
-## Contribute
+## Configuration Example
-Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
+Based on the current deployment configuration:
-## Open source
+```yaml
+S3_BUCKET: "s3://quix-test-bucket/configurations/"
+S3_REGION: "eu-west-2"
+S3_FOLDER_PREFIX: "configurations"
+POLL_INTERVAL_SECONDS: "30"
+MAX_MB_PER_MESSAGE: "1"
+DOWNLOAD_CONTENT: "True"
+```
-This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.
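The `S3_BUCKET` value may embed the folder path, as in the example above; the watcher's constructor splits and normalizes it before any S3 call. A standalone sketch of that normalization — `normalize_bucket` is our name for illustration, the service does this inline in `S3FileWatcher.__init__`:

```python
def normalize_bucket(bucket_name: str, folder_prefix: str = ""):
    """Mirror S3FileWatcher.__init__: strip an s3:// scheme, fold an
    embedded path into the prefix, and normalize trailing slashes."""
    if bucket_name.startswith("s3://"):
        parts = bucket_name[5:].split("/", 1)
        bucket_name = parts[0]
        # A path embedded in the URI becomes the prefix unless one was given.
        if len(parts) > 1 and not folder_prefix:
            folder_prefix = parts[1]
    bucket_name = bucket_name.rstrip("/")
    if folder_prefix and not folder_prefix.endswith("/"):
        folder_prefix += "/"
    return bucket_name, folder_prefix
```

So `"s3://quix-test-bucket/configurations/"` resolves to bucket `quix-test-bucket` with prefix `configurations/`, and an explicit prefix passed alongside a bare bucket name wins over the URI path.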
+## Data Flow
-Please star us and mention us on social to show your appreciation.
+The service fits into the larger configuration management pipeline:
+
+```
+S3 Bucket → S3 Source → s3-data topic → XML-to-JSON → configurations topic → Configuration Sink → Configuration API
+```
+
+## Development
+
+### Local Development
+
+```bash
+# Install dependencies
+pip install -r requirements.txt
+
+# Set environment variables
+export S3_BUCKET="s3://your-bucket/path/"
+export S3_REGION="your-region"
+export S3_ACCESS_KEY_ID="your-access-key"
+export S3_SECRET="your-secret-key"
+export S3_FOLDER_PREFIX="configurations"
+export output="s3-data"
+
+# Run the service
+python main.py
+```
+
+### Docker Build
+
+```bash
+docker build -t s3-source .
+```
+
+## File Processing
+
+### Supported File Types
+- **XML files**: Primary configuration format
+- **Gzip archives**: Not decompressed by the service; note that non-UTF-8 bytes are dropped when content is serialized, so plain-text files are recommended
+- **Text-based formats**: Any text-based configuration files
+
+### File Structure
+The service expects files in the following S3 structure:
+```
+s3://bucket-name/configurations/
+├── config1.xml.gz
+├── config2.xml.gz
+└── subfolder/
+    └── config3.xml.gz
+```
+
+## Error Handling
+
+- **S3 Connection Errors**: Logged and retried on the next polling cycle
+- **Authentication Failures**: Clear error logging for credential issues
+- **File Access Errors**: Graceful handling of permission or availability issues
+
+## State Management
+
+The service maintains persistent state to track:
+- The set of file keys that have already been processed
+
+State is preserved across service restarts through Quix's state management system, so files are not republished after a redeploy.
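The deduplication described above reduces to a persisted set of keys. A minimal sketch of one poll cycle in `S3FileWatcher.run()`, with a plain dict standing in for the Quix state store and `process_batch` as an illustrative helper, not part of the service:

```python
def process_batch(listed_keys, state):
    """Return only unseen keys and record them, mirroring the dedup in
    S3FileWatcher.run(): load the set, filter, then persist it back."""
    processed = set(state.get("processed_files", []))
    new_keys = [k for k in listed_keys if k not in processed]
    processed.update(new_keys)
    # In the real service this is state.set(...) followed by flush().
    state["processed_files"] = list(processed)
    return new_keys

state = {}  # stand-in for StatefulSource.state, which survives restarts
first = process_batch(["a.xml.gz", "b.xml.gz"], state)   # both are new
second = process_batch(["a.xml.gz", "c.xml.gz"], state)  # only c.xml.gz is new
```

Because the set is written back after every successful publish, a crash between files re-sends at most the file that was in flight.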
+ +## Monitoring + +The service provides logging for: +- File discovery and processing events +- S3 API interactions and errors +- Kafka message publishing status +- Performance metrics and polling intervals + +## Integration + +This service integrates with: +- **Upstream**: S3 bucket containing configuration files +- **Downstream**: XML-to-JSON transformation service via `s3-data` topic +- **Monitoring**: Quix platform logging and metrics systems \ No newline at end of file diff --git a/python/sources/s3_source/library.json b/python/sources/s3_source/library.json index bc03676b..447d13ce 100644 --- a/python/sources/s3_source/library.json +++ b/python/sources/s3_source/library.json @@ -55,42 +55,42 @@ "Required": true }, { - "Name": "S3_FOLDER_PATH", + "Name": "S3_FOLDER_PREFIX", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The path to the S3 folder to consume", + "Description": "The folder prefix within the S3 bucket to monitor (optional)", "DefaultValue": "", - "Required": true + "Required": false }, { - "Name": "S3_FILE_FORMAT", + "Name": "POLL_INTERVAL_SECONDS", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The file format of the files", - "DefaultValue": "json", - "Required": true + "Description": "How often to check S3 for new files (in seconds)", + "DefaultValue": "30", + "Required": false }, { - "Name": "S3_FILE_COMPRESSION", + "Name": "MAX_MB_PER_MESSAGE", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "The type of file compression used for the files", - "DefaultValue": "gzip", - "Required": true + "Description": "Max size of the message in MB", + "DefaultValue": "1", + "Required": false }, { - "Name": "S3_ENDPOINT_URL", + "Name": "DOWNLOAD_CONTENT", "Type": "EnvironmentVariable", "InputType": "FreeText", - "Description": "Custom S3 endpoint URL for MinIO or other S3-compatible services (optional)", - "DefaultValue": "", + "Description": "Whether to publish the content of 
the file to Kafka or just the metadata", + "DefaultValue": "True", "Required": false } ], "DeploySettings": { - "DeploymentType": "Job", - "CpuMillicores": 100, - "MemoryInMb": 100, + "DeploymentType": "Service", + "CpuMillicores": 250, + "MemoryInMb": 250, "Replicas": 1, "PublicAccess": false, "ValidateConnection": false diff --git a/python/sources/s3_source/main.py b/python/sources/s3_source/main.py index 816a06b4..6f1a793e 100644 --- a/python/sources/s3_source/main.py +++ b/python/sources/s3_source/main.py @@ -1,51 +1,50 @@ -from quixstreams import Application -from quixstreams.sources.community.file.s3 import S3FileSource -import time - import os +import logging +from quixstreams import Application +from s3_file_watcher import S3FileWatcher from dotenv import load_dotenv -load_dotenv() - -app = Application() - -# create an output topic -output_topic = app.topic(os.environ['output']) -# Custom setters for S3 file records (must be regular functions for pickling) -def key_setter(row): - """Use record ID as the message key, or None if not present.""" - return str(row.get('id', '')) if 'id' in row else None - -def value_setter(row): - """Use the whole record as the message value.""" - return row - -def timestamp_setter(row): - """Use current time as message timestamp.""" - return int(time.time() * 1000) - -# Build S3FileSource kwargs -source_kwargs = { - 'filepath': os.environ['S3_FOLDER_PATH'], - 'bucket': os.environ['S3_BUCKET'], - 'aws_access_key_id': os.environ['S3_ACCESS_KEY_ID'], - 'aws_secret_access_key': os.environ['S3_SECRET'], - 'region_name': os.environ['S3_REGION'], - 'file_format': os.environ['S3_FILE_FORMAT'], - 'compression': os.environ.get('S3_FILE_COMPRESSION'), - 'key_setter': key_setter, - 'value_setter': value_setter, - 'timestamp_setter': timestamp_setter, -} - -# Support custom endpoint for MinIO or other S3-compatible services -if 'S3_ENDPOINT_URL' in os.environ: - source_kwargs['endpoint_url'] = os.environ['S3_ENDPOINT_URL'] - -# create the 
S3 file source -source = S3FileSource(**source_kwargs) +# Load environment variables +load_dotenv() -app.add_source(source, topic=output_topic) +# Configuration +AWS_ACCESS_KEY_ID = os.getenv("S3_ACCESS_KEY_ID") +AWS_SECRET_ACCESS_KEY = os.getenv("S3_SECRET") +S3_BUCKET_NAME = os.environ["S3_BUCKET"] +S3_FOLDER_PREFIX = os.getenv("S3_FOLDER_PREFIX", "") +AWS_REGION = os.getenv("S3_REGION", "us-east-1") +AWS_ENDPOINT_URL = os.getenv("AWS_ENDPOINT_URL") # For MinIO or custom S3-compatible endpoints +TOPIC_NAME = os.environ["output"] +POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SECONDS", "30")) + +# Logging setup +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Create Quix Application +app = Application(consumer_group="s3_file_watcher_v1.2", auto_create_topics=True) + +# Create S3 File Watcher Source +s3_file_watcher = S3FileWatcher( + name="s3_file_watcher", + bucket_name=S3_BUCKET_NAME, + folder_prefix=S3_FOLDER_PREFIX, + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + region_name=AWS_REGION, + endpoint_url=AWS_ENDPOINT_URL, + poll_interval=POLL_INTERVAL +) + +# Define the topic using the "output" environment variable +topic = app.topic(TOPIC_NAME) + +# Add source to application +app.add_source(s3_file_watcher, topic) if __name__ == "__main__": - app.run() \ No newline at end of file + try: + logging.basicConfig(level=logging.INFO) + app.run() + except KeyboardInterrupt: + print("\nProgram interrupted by user. 
Exiting gracefully.") \ No newline at end of file diff --git a/python/sources/s3_source/requirements.txt b/python/sources/s3_source/requirements.txt index 5f3f3920..38e5a634 100644 --- a/python/sources/s3_source/requirements.txt +++ b/python/sources/s3_source/requirements.txt @@ -1,2 +1,3 @@ -quixstreams[s3]==3.23.1 +quixstreams==3.16.1 python-dotenv +boto3 \ No newline at end of file diff --git a/python/sources/s3_source/s3_file_watcher.py b/python/sources/s3_source/s3_file_watcher.py new file mode 100644 index 00000000..000aca8b --- /dev/null +++ b/python/sources/s3_source/s3_file_watcher.py @@ -0,0 +1,306 @@ +import logging +import time +import json +import os +import math +from datetime import datetime +import boto3 +from botocore.exceptions import ClientError, NoCredentialsError +from quixstreams.models.topics import Topic +from quixstreams.sources.base import StatefulSource + +logger = logging.getLogger(__name__) + + +class S3FileWatcher(StatefulSource): + def __init__( + self, + name: str, + bucket_name: str, + folder_prefix: str = "", + aws_access_key_id: str = None, + aws_secret_access_key: str = None, + region_name: str = 'us-east-1', + endpoint_url: str = None, + poll_interval: int = 30 + ) -> None: + # Clean bucket name - remove s3:// prefix if present + if bucket_name.startswith('s3://'): + parts = bucket_name[5:].split('/', 1) + bucket_name = parts[0] + if len(parts) > 1 and not folder_prefix: + folder_prefix = parts[1] + + bucket_name = bucket_name.rstrip('/') + if folder_prefix and not folder_prefix.endswith('/'): + folder_prefix += '/' + + self.bucket_name = bucket_name + self.folder_prefix = folder_prefix + self.poll_interval = poll_interval + self.aws_access_key_id = aws_access_key_id + self.aws_secret_access_key = aws_secret_access_key + self.region_name = region_name + self.endpoint_url = endpoint_url + + # Read DOWNLOAD_CONTENT from environment (optional) + download_content_env = os.getenv("DOWNLOAD_CONTENT", "True") + self.download_content = 
download_content_env.lower() in ("true", "1", "yes") + + # Read MAX_MB_PER_MESSAGE from environment (optional) + max_mb_env = os.getenv("MAX_MB_PER_MESSAGE") + + try: + if max_mb_env is not None: + self.max_mb_per_message = float(max_mb_env) + else: + self.max_mb_per_message = 1.0 # default MB limit + except ValueError: + logger.warning("Invalid MAX_MB_PER_MESSAGE value, falling back to 1 MB.") + self.max_mb_per_message = 1.0 + + # Enforce positive value + if self.max_mb_per_message <= 0: + logger.warning("MAX_MB_PER_MESSAGE <= 0, chunking disabled.") + self.max_mb_per_message = None + self.max_bytes_per_message = None + else: + # Convert MB → bytes (integer!) + self.max_bytes_per_message = int(self.max_mb_per_message * 1024 * 1024) + logger.info( + f"MAX_MB_PER_MESSAGE set: {self.max_mb_per_message} MB -> {self.max_bytes_per_message} bytes" + ) + + # Debug logging + logger.info(f"S3FileWatcher initialized:") + logger.info(f" Bucket: '{self.bucket_name}'") + logger.info(f" Folder prefix: '{self.folder_prefix}'") + logger.info(f" Region: '{self.region_name}'") + logger.info(f" Poll interval: {self.poll_interval}s") + logger.info(f" Download content: {self.download_content}") + if self.max_bytes_per_message: + logger.info(f" Max message size (bytes): {self.max_bytes_per_message}") + + super().__init__(name=name, shutdown_timeout=10) + + def setup(self): + client_kwargs = {'region_name': self.region_name} + + if self.aws_access_key_id and self.aws_secret_access_key: + client_kwargs['aws_access_key_id'] = self.aws_access_key_id + client_kwargs['aws_secret_access_key'] = self.aws_secret_access_key + + if self.endpoint_url: + client_kwargs['endpoint_url'] = self.endpoint_url + + self.s3_client = boto3.client('s3', **client_kwargs) + + self.last_check = datetime.min + + def _get_new_files(self): + """Get list of new files since last check""" + try: + list_params = {'Bucket': self.bucket_name} + if self.folder_prefix: + list_params['Prefix'] = self.folder_prefix + 
logger.debug(f"Listing objects with prefix: {self.folder_prefix}")
+
+            response = self.s3_client.list_objects_v2(**list_params)
+
+            new_files = []
+            # S3 LastModified timestamps are UTC; compare in UTC so files
+            # are not skipped when the container clock is ahead of UTC.
+            current_time = datetime.utcnow()
+
+            if 'Contents' in response:
+                logger.info(f"Found {len(response['Contents'])} total objects in bucket")
+                for obj in response['Contents']:
+                    if obj['Key'].endswith('/'):
+                        continue
+
+                    if (obj['Key'] not in self.processed_files and
+                            obj['LastModified'].replace(tzinfo=None) > self.last_check):
+                        new_files.append(obj)
+                        logger.debug(f"New file found: {obj['Key']}")
+                    else:
+                        logger.debug(f"File {obj['Key']} already processed or not new.")
+            else:
+                logger.info("No objects found in bucket/folder")
+
+            self.last_check = current_time
+            return new_files
+
+        except ClientError as e:
+            logger.error(f"Error listing S3 objects: {e}")
+            return []
+        except NoCredentialsError:
+            logger.error("AWS credentials not found")
+            return []
+        except Exception as e:
+            logger.error(f"Unexpected error listing S3 objects: {e}")
+            return []
+
+    def _download_file_content(self, file_key):
+        """Download file content and metadata"""
+        try:
+            head_response = self.s3_client.head_object(Bucket=self.bucket_name, Key=file_key)
+            # Generate appropriate URL based on endpoint
+            if self.endpoint_url:
+                s3_url = f"{self.endpoint_url}/{self.bucket_name}/{file_key}"
+            else:
+                s3_url = f"https://{self.bucket_name}.s3.amazonaws.com/{file_key}"
+
+            file_data = {
+                "file_name": file_key,
+                "content_type": head_response.get('ContentType', 'application/octet-stream'),
+                "size": head_response.get('ContentLength', 0),
+                "last_modified": head_response.get('LastModified', datetime.now()).isoformat(),
+                "url": s3_url,
+                "etag": head_response.get('ETag', '').strip('"')
+            }
+
+            # Only download content if DOWNLOAD_CONTENT is True
+            if self.download_content:
+                response = self.s3_client.get_object(Bucket=self.bucket_name, Key=file_key)
+                file_data["content"] = response['Body'].read()
+            else:
+                file_data["content"] = None
+
+            return file_data
+ + except ClientError as e: + logger.error(f"Error downloading file {file_key}: {e}") + return None + except Exception as e: + logger.error(f"Unexpected error downloading file {file_key}: {e}") + return None + + def _produce_message_chunked(self, file_key: str, file_data: dict) -> bool: + """Produce the file content either in a single message or in multiple chunks.""" + try: + content_bytes = file_data["content"] or b"" + original_size = len(content_bytes) + + base_message = { + "timestamp": datetime.now().isoformat(), + "file_name": file_data["file_name"], + "content_type": file_data["content_type"], + "size": file_data["size"], + "last_modified": file_data["last_modified"], + "url": file_data["url"], + "etag": file_data["etag"], + "bucket": self.bucket_name, + "original_size": original_size + } + + # If content is not downloaded, send metadata only + if not self.download_content: + base_message["content"] = None + base_message["part_index"] = 1 + base_message["total_parts"] = 1 + + self.produce( + key=file_key, + value=json.dumps(base_message).encode('utf-8') + ) + logger.info(f"Published metadata-only message for {file_key}") + return True + + # If no max size set, or content fits in one message -> send as single message + if not self.max_bytes_per_message or original_size <= self.max_bytes_per_message: + base_message["content"] = content_bytes.decode('utf-8', errors='ignore') if content_bytes else "" + base_message["part_index"] = 1 + base_message["total_parts"] = 1 + + self.produce( + key=file_key, + value=json.dumps(base_message).encode('utf-8') + ) + logger.info(f"Published single-part file {file_key} ({original_size} bytes)") + return True + + # Otherwise chunk it + total_parts = math.ceil(original_size / self.max_bytes_per_message) + logger.info( + f"File {file_key} is {original_size} bytes, splitting into {total_parts} parts " + f"of up to {self.max_bytes_per_message} bytes" + ) + + for part_index in range(total_parts): + start = part_index * 
self.max_bytes_per_message + end = min(start + self.max_bytes_per_message, original_size) + chunk = content_bytes[start:end] + + message = base_message.copy() + message["content"] = chunk.decode('utf-8', errors='ignore') if chunk else "" + message["part_index"] = part_index + 1 + message["total_parts"] = total_parts + message["chunk_size"] = len(chunk) + message["chunk_start"] = start + message["chunk_end"] = end + + self.produce( + key=f"{file_key}.part{part_index+1:04d}", + value=json.dumps(message).encode('utf-8') + ) + logger.debug(f"Published chunk {part_index+1}/{total_parts} for {file_key} ({len(chunk)} bytes)") + + logger.info(f"Published all {total_parts} parts for {file_key}") + return True + + except Exception as e: + logger.error(f"Error producing message(s) for {file_key}: {e}") + return False + + def run(self): + """Main run method for the source""" + logger.info("Starting S3 File Watcher...") + logger.info(f"Monitoring bucket: {self.bucket_name}") + logger.info(f"Poll interval: {self.poll_interval} seconds") + + self.processed_files = set(self.state.get("processed_files", [])) + logger.info(f"Loaded {len(self.processed_files)} previously processed files") + + try: + while self.running: + new_files = self._get_new_files() + + if new_files: + logger.info(f"Found {len(new_files)} new file(s)") + for file_obj in new_files: + if not self.running: + break + + file_key = file_obj['Key'] + logger.info(f"Processing file: {file_key}") + + file_data = self._download_file_content(file_key) + if file_data: + success = self._produce_message_chunked(file_key, file_data) + + if success: + self.processed_files.add(file_key) + self.state.set("processed_files", list(self.processed_files)) + self.flush() + logger.info(f"Published file {file_key} to topic and marked as processed") + else: + logger.error(f"Failed to publish file (or its chunks): {file_key}") + else: + logger.error(f"Failed to download file: {file_key}") + + time.sleep(self.poll_interval) + + except 
Exception as e: + logger.error(f"Error in main loop: {e}") + raise + finally: + self.flush() + logger.info("S3 File Watcher stopped") + + def default_topic(self) -> Topic: + """Default topic configuration""" + return Topic( + name=self.name, + key_serializer="string", + key_deserializer="string", + value_deserializer="json", + value_serializer="json", + ) \ No newline at end of file diff --git a/tests/sources/s3_source/docker-compose.test.yml b/tests/sources/s3_source/docker-compose.test.yml index 8ed78585..fc26bb93 100644 --- a/tests/sources/s3_source/docker-compose.test.yml +++ b/tests/sources/s3_source/docker-compose.test.yml @@ -1,4 +1,4 @@ -# timeout: 80 +# timeout: 60 services: minio: image: minio/minio:latest @@ -13,6 +13,7 @@ services: retries: 10 networks: - test-network + stop_grace_period: 3s minio-setup: image: python:3.11-slim depends_on: @@ -34,6 +35,7 @@ services: interval: 1s timeout: 5s retries: 10 + stop_grace_period: 3s kafka: image: docker.redpanda.com/redpandadata/redpanda:v24.2.4 command: @@ -50,6 +52,7 @@ services: retries: 10 networks: - test-network + stop_grace_period: 3s s3-source: build: context: ../../../python/sources/s3_source @@ -63,10 +66,10 @@ services: - S3_REGION=us-east-1 - S3_SECRET=minioadmin - S3_ACCESS_KEY_ID=minioadmin - - S3_FOLDER_PATH=data/ - - S3_FILE_FORMAT=json - - S3_FILE_COMPRESSION=gzip - - S3_ENDPOINT_URL=http://minio:9000 + - S3_FOLDER_PREFIX=data/ + - POLL_INTERVAL_SECONDS=2 + - DOWNLOAD_CONTENT=True + - AWS_ENDPOINT_URL=http://minio:9000 networks: - test-network depends_on: @@ -74,6 +77,7 @@ services: condition: service_healthy minio-setup: condition: service_healthy + stop_grace_period: 3s test-verifier: build: context: ../../framework @@ -88,8 +92,8 @@ services: environment: - Quix__Broker__Address=kafka:9092 - TEST_OUTPUT_TOPIC=test-s3-output - - TEST_TIMEOUT=60 - - TEST_EXPECTED_COUNT=1 + - TEST_TIMEOUT=30 + - TEST_EXPECTED_COUNT=3 volumes: - ./verify_output.py:/tests/verify_output.py:ro working_dir: / @@ 
-100,6 +104,7 @@ services: condition: service_healthy s3-source: condition: service_started + stop_grace_period: 3s networks: test-network: driver: bridge \ No newline at end of file diff --git a/tests/sources/s3_source/setup_minio.py b/tests/sources/s3_source/setup_minio.py index b5812690..cf9fb671 100644 --- a/tests/sources/s3_source/setup_minio.py +++ b/tests/sources/s3_source/setup_minio.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 import boto3 -import gzip import json from botocore.client import Config @@ -22,24 +21,24 @@ print(f"Bucket might already exist: {e}") # Upload test data -# Note: Each file contains one JSON object per line (JSONL format) +# Note: Upload plain text files for testing test_data = [ - [{"id": 1, "name": "test1", "value": 100}], - [{"id": 2, "name": "test2", "value": 200}], - [{"id": 3, "name": "test3", "value": 300}], + {"id": 1, "name": "test1", "value": 100}, + {"id": 2, "name": "test2", "value": 200}, + {"id": 3, "name": "test3", "value": 300}, ] -for i, records in enumerate(test_data, 1): - # Create JSONL content (one JSON per line) - jsonl_content = '\n'.join(json.dumps(record) for record in records).encode('utf-8') - gzipped_content = gzip.compress(jsonl_content) +for i, record in enumerate(test_data, 1): + # Create JSON content + json_content = json.dumps(record).encode('utf-8') # Upload to S3 s3_client.put_object( Bucket='test-bucket', - Key=f'data/data{i}.json.gz', - Body=gzipped_content + Key=f'data/data{i}.json', + Body=json_content, + ContentType='application/json' ) - print(f"Uploaded data{i}.json.gz") + print(f"Uploaded data{i}.json") print("MinIO setup complete") diff --git a/tests/sources/s3_source/verify_output.py b/tests/sources/s3_source/verify_output.py index b3c1ebb6..82afd3ab 100644 --- a/tests/sources/s3_source/verify_output.py +++ b/tests/sources/s3_source/verify_output.py @@ -1,5 +1,6 @@ import os import time +import json from quixstreams import Application from quixstreams.sinks.core.list import ListSink @@ -8,7 
+9,7 @@ def main(): broker_address = os.getenv("Quix__Broker__Address", "kafka:9092") output_topic = os.getenv("TEST_OUTPUT_TOPIC", "test-s3-output") timeout = int(os.getenv("TEST_TIMEOUT", "60")) - min_expected_messages = 1 + min_expected_messages = 3 # We expect 3 files from setup_minio.py print(f"Consuming from output topic: {output_topic}") print(f"Waiting for at least {min_expected_messages} messages with timeout of {timeout}s") @@ -34,42 +35,58 @@ def main(): print(f"FAILED: Expected at least {min_expected_messages} messages, got {message_count}") exit(1) - # Verify first message structure + # Verify first message structure - now expects S3 metadata format first_message = list_sink[0] - expected_fields = {"id", "name", "value"} + expected_metadata_fields = {"timestamp", "file_name", "content_type", "size", + "last_modified", "url", "etag", "bucket", + "original_size", "content", "part_index", "total_parts"} actual_fields = set(first_message.keys()) - print(f"First message: {first_message}") - print(f"Expected fields: {expected_fields}") - print(f"Actual fields: {actual_fields}") + print(f"First message keys: {actual_fields}") + print(f"Expected metadata fields: {expected_metadata_fields}") - if not expected_fields.issubset(actual_fields): - print(f"FAILED: Missing fields. 
Expected: {expected_fields}, Got: {actual_fields}") + if not expected_metadata_fields.issubset(actual_fields): + missing = expected_metadata_fields - actual_fields + print(f"FAILED: Missing fields: {missing}") + print(f"Full message: {first_message}") exit(1) - # Verify field types - if not isinstance(first_message["id"], int): - print(f"FAILED: 'id' should be integer, got {type(first_message['id'])}") + # Verify metadata field types + if not isinstance(first_message["file_name"], str): + print(f"FAILED: 'file_name' should be string, got {type(first_message['file_name'])}") exit(1) - if not isinstance(first_message["name"], str): - print(f"FAILED: 'name' should be string, got {type(first_message['name'])}") + if not isinstance(first_message["size"], int): + print(f"FAILED: 'size' should be integer, got {type(first_message['size'])}") exit(1) - if not isinstance(first_message["value"], int): - print(f"FAILED: 'value' should be integer, got {type(first_message['value'])}") + if not isinstance(first_message["bucket"], str): + print(f"FAILED: 'bucket' should be string, got {type(first_message['bucket'])}") exit(1) - print("Field types validated successfully") + if first_message["bucket"] != "test-bucket": + print(f"FAILED: Expected bucket 'test-bucket', got {first_message['bucket']}") + exit(1) - unique_ids = set(msg["id"] for msg in list_sink) - print(f"Unique IDs found: {unique_ids}") + print("Metadata field types validated successfully") - if len(unique_ids) < 1: - print(f"FAILED: Should have at least 1 unique ID, got {len(unique_ids)}") + # Verify that we have content (since DOWNLOAD_CONTENT=True) + if first_message["content"] is None: + print(f"FAILED: Expected content to be downloaded, but got None") exit(1) - print(f"SUCCESS: Verified {message_count} messages with correct structure and {len(unique_ids)} unique records") + # Verify file names + file_names = set(msg["file_name"] for msg in list_sink) + print(f"File names found: {file_names}") + + # We expect files 
in the data/ folder + expected_prefix = "data/" + for file_name in file_names: + if not file_name.startswith(expected_prefix): + print(f"FAILED: Expected file names to start with '{expected_prefix}', got {file_name}") + exit(1) + + print(f"SUCCESS: Verified {message_count} messages with correct S3 metadata structure and {len(file_names)} unique files") exit(0)
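Because `_produce_message_chunked` stamps every message with `part_index` and `total_parts`, a downstream consumer can rebuild a chunked file once all parts have arrived. A minimal reassembly sketch — `reassemble` is a hypothetical helper operating on the decoded message dicts, not part of this PR:

```python
def reassemble(messages):
    """Group chunked messages by file_name and join their content once
    every part (1..total_parts) has been seen, regardless of arrival order."""
    buffers = {}    # file_name -> {part_index: content}
    completed = {}  # file_name -> full content
    for msg in messages:
        parts = buffers.setdefault(msg["file_name"], {})
        parts[msg["part_index"]] = msg["content"]
        if len(parts) == msg["total_parts"]:
            completed[msg["file_name"]] = "".join(
                parts[i] for i in sorted(parts)
            )
    return completed

# Parts may arrive out of order; reassembly still succeeds.
msgs = [
    {"file_name": "data/config1.xml", "part_index": 2, "total_parts": 2,
     "content": "</config>"},
    {"file_name": "data/config1.xml", "part_index": 1, "total_parts": 2,
     "content": "<config>"},
]
```

One caveat from the producer side: chunk boundaries are computed on raw bytes before UTF-8 decoding with `errors='ignore'`, so a multi-byte character that straddles a boundary can be silently dropped from the reassembled text.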