# Adds S3 source connector #659
Merged · Changes from all commits (4 commits)
The PR rewrites the connector README:

````diff
@@ -1,37 +1,132 @@
-# S3 Source Connector
+# S3 Configuration File Source
 
-[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/s3_source) demonstrates how to connect to Amazon S3 to read files into a Kafka topic.
+A Quix Streams source service that monitors Amazon S3 buckets for configuration files and streams their content to Kafka topics for processing.
 
-## How to run
+## Overview
 
-Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log-in and visit the `Connectors` tab to use this connector.
+This service continuously monitors a specified S3 bucket and folder path for new configuration files. When files are detected, it downloads and publishes their content to the `s3-data` Kafka topic for downstream processing by the XML-to-JSON transformer.
 
-Clicking `Set up connector` allows you to enter your connection details and runtime parameters.
+## Features
 
-Then either:
-* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.
+- **File Monitoring**: Continuous polling of S3 bucket for new files
+- **Compression Support**: Handles gzip-compressed files automatically
+- **Format Support**: Primarily designed for XML configuration files
+- **State Management**: Tracks processed files to avoid duplicates
+- **Error Handling**: Robust error handling for S3 connectivity issues
+- **Configurable Polling**: Adjustable polling interval for monitoring
 
-* or click `Customise connector` to inspect or alter the code before deployment.
+## How it Works
 
-## Environment variables
+1. **Monitoring**: Polls the specified S3 bucket/folder at regular intervals
+2. **File Detection**: Identifies new or modified files since last check
+3. **Download**: Retrieves file content from S3
+4. **Decompression**: Automatically handles gzip decompression if needed
+5. **Publishing**: Sends file content to Kafka topic for processing
+6. **State Tracking**: Records processed files to prevent reprocessing
 
-This connector uses the following environment variables:
+## Environment Variables
 
-- **output**: The output topic to stream Segment data into
-- **S3_BUCKET**: The URI or URL to your S3 bucket
-- **S3_REGION**: The region of your S3 bucket
-- **S3_SECRET**: Your AWS secret
-- **S3_ACCESS_KEY_ID**: Your AWS Access Key
-- **S3_FOLDER_PATH**: The path to the S3 folder to consume
-- **S3_FILE_FORMAT**: The file format of the files
-- **S3_FILE_COMPRESSION**: The type of file compression used for the files
+- **output**: Name of the output Kafka topic (default: `s3-data`)
+- **S3_BUCKET**: S3 bucket URI (e.g., `s3://quix-test-bucket/configurations/`)
+- **S3_REGION**: AWS region of the S3 bucket (e.g., `eu-west-2`)
+- **S3_SECRET**: AWS Secret Access Key (stored as secret)
+- **S3_ACCESS_KEY_ID**: AWS Access Key ID (stored as secret)
+- **S3_FOLDER_PATH**: Folder path within bucket to monitor (e.g., `configurations`)
+- **S3_FILE_FORMAT**: Expected file format (e.g., `xml`)
+- **S3_FILE_COMPRESSION**: Compression type (e.g., `gzip`)
+- **POLL_INTERVAL_SECONDS**: Polling interval in seconds (default: 30)
 
-## Contribute
+## Configuration Example
 
-Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
+Based on the current deployment configuration:
 
-## Open source
+```yaml
+S3_BUCKET: "s3://quix-test-bucket/configurations/"
+S3_REGION: "eu-west-2"
+S3_FOLDER_PATH: "configurations"
+S3_FILE_FORMAT: "xml"
+S3_FILE_COMPRESSION: "gzip"
+POLL_INTERVAL_SECONDS: "30"
+```
 
-This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.
+## Data Flow
 
-Please star us and mention us on social to show your appreciation.
+The service fits into the larger configuration management pipeline:
 
+```
+S3 Bucket → S3 Source → s3-data topic → XML-to-JSON → configurations topic → Configuration Sink → Configuration API
+```
+
+## Development
+
+### Local Development
+
+```bash
+# Install dependencies
+pip install -r requirements.txt
+
+# Set environment variables
+export S3_BUCKET="s3://your-bucket/path/"
+export S3_REGION="your-region"
+export S3_ACCESS_KEY_ID="your-access-key"
+export S3_SECRET="your-secret-key"
+export S3_FOLDER_PATH="configurations"
+export output="s3-data"
+
+# Run the service
+python main.py
+```
+
+### Docker Build
+
+```bash
+docker build -t s3-source .
+```
+
+## File Processing
+
+### Supported File Types
+- **XML files**: Primary configuration format
+- **Gzip compression**: Automatic decompression support
+- **Text-based formats**: Any text-based configuration files
+
+### File Structure
+The service expects files in the following S3 structure:
+```
+s3://bucket-name/configurations/
+├── config1.xml.gz
+├── config2.xml.gz
+└── subfolder/
+    └── config3.xml.gz
+```
+
+## Error Handling
+
+- **S3 Connection Errors**: Automatic retry with exponential backoff
+- **Authentication Failures**: Clear error logging for credential issues
+- **File Access Errors**: Graceful handling of permission or availability issues
+- **Compression Errors**: Error handling for corrupted compressed files
+
+## State Management
+
+The service maintains persistent state to track:
+- Last processed timestamp
+- File checksums/ETags to detect changes
+- Processing status of individual files
+
+State is preserved across service restarts through Quix's state management system.
+
+## Monitoring
+
+The service provides logging for:
+- File discovery and processing events
+- S3 API interactions and errors
+- Kafka message publishing status
+- Performance metrics and polling intervals
+
+## Integration
+
+This service integrates with:
+- **Upstream**: S3 bucket containing configuration files
+- **Downstream**: XML-to-JSON transformation service via `s3-data` topic
+- **Monitoring**: Quix platform logging and metrics systems
````
`main.py` changes, with Python indentation restored:

```diff
@@ -1,51 +1,50 @@
-from quixstreams import Application
-from quixstreams.sources.community.file.s3 import S3FileSource
-import time
 import os
+import logging
+from quixstreams import Application
+from s3_file_watcher import S3FileWatcher
 from dotenv import load_dotenv
+
+# Load environment variables
 load_dotenv()
 
-app = Application()
-
-# create an output topic
-output_topic = app.topic(os.environ['output'])
-
-# Custom setters for S3 file records (must be regular functions for pickling)
-def key_setter(row):
-    """Use record ID as the message key, or None if not present."""
-    return str(row.get('id', '')) if 'id' in row else None
-
-def value_setter(row):
-    """Use the whole record as the message value."""
-    return row
-
-def timestamp_setter(row):
-    """Use current time as message timestamp."""
-    return int(time.time() * 1000)
-
-# Build S3FileSource kwargs
-source_kwargs = {
-    'filepath': os.environ['S3_FOLDER_PATH'],
-    'bucket': os.environ['S3_BUCKET'],
-    'aws_access_key_id': os.environ['S3_ACCESS_KEY_ID'],
-    'aws_secret_access_key': os.environ['S3_SECRET'],
-    'region_name': os.environ['S3_REGION'],
-    'file_format': os.environ['S3_FILE_FORMAT'],
-    'compression': os.environ.get('S3_FILE_COMPRESSION'),
-    'key_setter': key_setter,
-    'value_setter': value_setter,
-    'timestamp_setter': timestamp_setter,
-}
-
-# Support custom endpoint for MinIO or other S3-compatible services
-if 'S3_ENDPOINT_URL' in os.environ:
-    source_kwargs['endpoint_url'] = os.environ['S3_ENDPOINT_URL']
-
-# create the S3 file source
-source = S3FileSource(**source_kwargs)
-
-app.add_source(source, topic=output_topic)
+# Configuration
+AWS_ACCESS_KEY_ID = os.getenv("S3_ACCESS_KEY_ID")
+AWS_SECRET_ACCESS_KEY = os.getenv("S3_SECRET")
+S3_BUCKET_NAME = os.environ["S3_BUCKET"]
+S3_FOLDER_PREFIX = os.getenv("S3_FOLDER_PREFIX", "")
+AWS_REGION = os.getenv("S3_REGION", "us-east-1")
+AWS_ENDPOINT_URL = os.getenv("AWS_ENDPOINT_URL")  # For MinIO or custom S3-compatible endpoints
+TOPIC_NAME = os.environ["output"]
+POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SECONDS", "30"))
+
+# Logging setup
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# Create Quix Application
+app = Application(consumer_group="s3_file_watcher_v1.2", auto_create_topics=True)
+
+# Create S3 File Watcher Source
+s3_file_watcher = S3FileWatcher(
+    name="s3_file_watcher",
+    bucket_name=S3_BUCKET_NAME,
+    folder_prefix=S3_FOLDER_PREFIX,
+    aws_access_key_id=AWS_ACCESS_KEY_ID,
+    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+    region_name=AWS_REGION,
+    endpoint_url=AWS_ENDPOINT_URL,
+    poll_interval=POLL_INTERVAL
+)
+
+# Define the topic using the "output" environment variable
+topic = app.topic(TOPIC_NAME)
+
+# Add source to application
+app.add_source(s3_file_watcher, topic)
 
 if __name__ == "__main__":
-    app.run()
+    try:
+        logging.basicConfig(level=logging.INFO)
+        app.run()
+    except KeyboardInterrupt:
+        print("\nProgram interrupted by user. Exiting gracefully.")
```
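The README promises "automatic retry with exponential backoff" for S3 connection errors, but that logic lives in `s3_file_watcher.py`, which is not shown in this diff. A minimal stdlib-only sketch of the pattern (the retry count, base delay, and cap are illustrative assumptions, not values from the PR):

```python
import time


def retry_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0,
                       sleep=time.sleep):
    """Call fn(), retrying on ConnectionError with exponentially growing delays.

    The delay doubles on each failed attempt (1s, 2s, 4s, ...) up to max_delay;
    the last failure is re-raised so callers can log and handle it.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            sleep(delay)
```

Injecting `sleep` as a parameter keeps the helper testable without real waiting; in the service it would wrap the S3 list/download calls.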
`requirements.txt` changes:

```diff
@@ -1,2 +1,3 @@
-quixstreams[s3]==3.23.1
+quixstreams==3.16.1
 python-dotenv
+boto3
```
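For reference, the setter functions removed from `main.py` are ordinary module-level functions (the old code notes this is required for pickling), so their contract is easy to exercise in isolation; the snippet below reproduces them as they appeared in the removed `S3FileSource` wiring:

```python
import time


def key_setter(row):
    """Use record ID as the message key, or None if not present."""
    return str(row.get('id', '')) if 'id' in row else None


def value_setter(row):
    """Use the whole record as the message value."""
    return row


def timestamp_setter(row):
    """Use current time as message timestamp (Kafka's millisecond convention)."""
    return int(time.time() * 1000)
```

The new `S3FileWatcher` path drops these hooks, presumably building keys and timestamps inside the watcher itself.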