141 changes: 118 additions & 23 deletions python/sources/s3_source/README.md
@@ -1,37 +1,132 @@
# S3 Source Connector
# S3 Configuration File Source

[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/s3_source) demonstrates how to connect to Amazon S3 to read files into a Kafka topic.
A Quix Streams source service that monitors Amazon S3 buckets for configuration files and streams their content to Kafka topics for processing.

## How to run
## Overview

Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log-in and visit the `Connectors` tab to use this connector.
This service continuously monitors a specified S3 bucket and folder path for new configuration files. When files are detected, it downloads and publishes their content to the `s3-data` Kafka topic for downstream processing by the XML-to-JSON transformer.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.
## Features

Then either:
* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.
- **File Monitoring**: Continuous polling of S3 bucket for new files
- **Compression Support**: Handles gzip-compressed files automatically
- **Format Support**: Primarily designed for XML configuration files
- **State Management**: Tracks processed files to avoid duplicates
- **Error Handling**: Retries and logs S3 connectivity failures
- **Configurable Polling**: Adjustable polling interval for monitoring

* or click `Customise connector` to inspect or alter the code before deployment.
## How it Works

## Environment variables
1. **Monitoring**: Polls the specified S3 bucket/folder at regular intervals
2. **File Detection**: Identifies new or modified files since last check
3. **Download**: Retrieves file content from S3
4. **Decompression**: Automatically handles gzip decompression if needed
5. **Publishing**: Sends file content to Kafka topic for processing
6. **State Tracking**: Records processed files to prevent reprocessing (a minimal sketch of this loop follows below)
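
A minimal sketch of this cycle, assuming boto3 and illustrative helper names (the real service's internals are not shown in this repository excerpt):

```python
import gzip
import time

import boto3

# Illustrative sketch of the monitor/download/decompress/publish cycle.
# Bucket, prefix, and region values are the examples used elsewhere in this README.
s3 = boto3.client("s3", region_name="eu-west-2")

def poll_once(bucket: str, prefix: str, seen_etags: set) -> list:
    """Return (key, text) pairs for objects not seen in earlier cycles."""
    new_files = []
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    for obj in response.get("Contents", []):
        if obj["ETag"] in seen_etags:
            continue  # already processed, skip to avoid duplicates
        body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
        if obj["Key"].endswith(".gz"):
            body = gzip.decompress(body)  # transparent gzip handling
        new_files.append((obj["Key"], body.decode("utf-8")))
        seen_etags.add(obj["ETag"])
    return new_files

if __name__ == "__main__":
    seen = set()
    while True:
        for key, content in poll_once("quix-test-bucket", "configurations/", seen):
            print(f"would publish {key} ({len(content)} chars) to the s3-data topic")
        time.sleep(30)  # POLL_INTERVAL_SECONDS
```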

This connector uses the following environment variables:
## Environment Variables

- **output**: The output topic to stream Segment data into
- **S3_BUCKET**: The URI or URL to your S3 bucket
- **S3_REGION**: The region of your S3 bucket
- **S3_SECRET**: Your AWS secret
- **S3_ACCESS_KEY_ID**: Your AWS Access Key
- **S3_FOLDER_PATH**: The path to the S3 folder to consume
- **S3_FILE_FORMAT**: The file format of the files
- **S3_FILE_COMPRESSION**: The type of file compression used for the files
- **output**: Name of the output Kafka topic (default: `s3-data`)
- **S3_BUCKET**: S3 bucket URI (e.g., `s3://quix-test-bucket/configurations/`)
- **S3_REGION**: AWS region of the S3 bucket (e.g., `eu-west-2`)
- **S3_SECRET**: AWS Secret Access Key (stored as secret)
- **S3_ACCESS_KEY_ID**: AWS Access Key ID (stored as secret)
- **S3_FOLDER_PATH**: Folder path within bucket to monitor (e.g., `configurations`)
- **S3_FILE_FORMAT**: Expected file format (e.g., `xml`)
- **S3_FILE_COMPRESSION**: Compression type (e.g., `gzip`)
- **POLL_INTERVAL_SECONDS**: Polling interval in seconds (default: 30)

## Contribute
## Configuration Example

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
Based on the current deployment configuration:

## Open source
```yaml
S3_BUCKET: "s3://quix-test-bucket/configurations/"
S3_REGION: "eu-west-2"
S3_FOLDER_PATH: "configurations"
S3_FILE_FORMAT: "xml"
S3_FILE_COMPRESSION: "gzip"
POLL_INTERVAL_SECONDS: "30"
```
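
At runtime these values surface as environment variables; a minimal sketch of how the service might read them, mirroring the values listed above (the fallback defaults are assumptions):

```python
import os

# Required settings fail fast; optional ones fall back to the documented defaults.
S3_BUCKET = os.environ["S3_BUCKET"]
S3_REGION = os.getenv("S3_REGION", "eu-west-2")
S3_FOLDER_PATH = os.getenv("S3_FOLDER_PATH", "configurations")
S3_FILE_FORMAT = os.getenv("S3_FILE_FORMAT", "xml")
S3_FILE_COMPRESSION = os.getenv("S3_FILE_COMPRESSION", "gzip")
POLL_INTERVAL_SECONDS = int(os.getenv("POLL_INTERVAL_SECONDS", "30"))
OUTPUT_TOPIC = os.getenv("output", "s3-data")
```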

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.
## Data Flow

Please star us and mention us on social to show your appreciation.
The service fits into the larger configuration management pipeline:

```
S3 Bucket → S3 Source → s3-data topic → XML-to-JSON → configurations topic → Configuration Sink → Configuration API
```
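
As a hedged illustration of the next hop in this pipeline, the snippet below sketches how the XML-to-JSON stage might consume the `s3-data` topic with Quix Streams; the `xmltodict` dependency and the assumption that each message value is raw XML text are illustrative, not taken from this repository:

```python
import xmltodict  # assumed dependency for the XML-to-JSON step
from quixstreams import Application

app = Application(consumer_group="xml_to_json_sketch")

input_topic = app.topic("s3-data")
output_topic = app.topic("configurations")

sdf = app.dataframe(input_topic)
# Parse each raw XML payload into a dict, which is serialized as JSON downstream.
sdf = sdf.apply(lambda xml_text: xmltodict.parse(xml_text))
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run()
```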

## Development

### Local Development

```bash
# Install dependencies
pip install -r requirements.txt

# Set environment variables
export S3_BUCKET="s3://your-bucket/path/"
export S3_REGION="your-region"
export S3_ACCESS_KEY_ID="your-access-key"
export S3_SECRET="your-secret-key"
export S3_FOLDER_PATH="configurations"
export output="s3-data"

# Run the service
python main.py
```

### Docker Build

```bash
docker build -t s3-source .
```

## File Processing

### Supported File Types
- **XML files**: Primary configuration format
- **Gzip compression**: Automatic decompression support
- **Text-based formats**: Any text-based configuration files

### File Structure
The service expects files in the following S3 structure:
```
s3://bucket-name/configurations/
├── config1.xml.gz
├── config2.xml.gz
└── subfolder/
└── config3.xml.gz
```
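
Because the layout above nests files under subfolders and a single bucket listing is capped at 1,000 keys, a recursive listing is easiest with a paginator; a minimal boto3 sketch using the example bucket and prefix from this README:

```python
import boto3

s3 = boto3.client("s3", region_name="eu-west-2")

# Paginate so listings larger than 1,000 objects (and nested subfolders) are still covered.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="quix-test-bucket", Prefix="configurations/"):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".xml.gz"):
            print(obj["Key"], obj["Size"], obj["LastModified"])
```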

## Error Handling

- **S3 Connection Errors**: Automatic retry with exponential backoff (sketched below)
- **Authentication Failures**: Clear error logging for credential issues
- **File Access Errors**: Graceful handling of permission or availability issues
- **Compression Errors**: Error handling for corrupted compressed files
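
A sketch of the retry-with-exponential-backoff pattern described above, wrapped around an arbitrary S3 call; the attempt count and delays are illustrative, not the service's actual settings:

```python
import logging
import time

from botocore.exceptions import ClientError, EndpointConnectionError

logger = logging.getLogger(__name__)

def with_backoff(s3_call, max_attempts=5, base_delay=1.0):
    """Retry an S3 call, doubling the delay after each failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return s3_call()
        except (ClientError, EndpointConnectionError) as exc:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning("S3 call failed (%s); retrying in %.1fs", exc, delay)
            time.sleep(delay)

# Usage, assuming an existing boto3 client and the example bucket/key:
# body = with_backoff(lambda: s3.get_object(Bucket="quix-test-bucket",
#                                           Key="configurations/config1.xml.gz"))
```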

## State Management

The service maintains persistent state to track:
- Last processed timestamp
- File checksums/ETags to detect changes
- Processing status of individual files

State is preserved across service restarts through Quix's state management system.
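
How that tracking might be keyed is sketched below using object ETags; the in-memory dict stands in for whatever persistent store the platform provides and is not the service's actual state API:

```python
# Illustrative change detection keyed on S3 ETags.
processed = {}  # object key -> last seen ETag

def is_new_or_modified(key: str, etag: str) -> bool:
    if processed.get(key) == etag:
        return False           # unchanged since the last poll
    processed[key] = etag      # record the new version before reprocessing
    return True
```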

## Monitoring

The service provides logging for:
- File discovery and processing events
- S3 API interactions and errors
- Kafka message publishing status
- Performance metrics and polling intervals

## Integration

This service integrates with:
- **Upstream**: S3 bucket containing configuration files
- **Downstream**: XML-to-JSON transformation service via `s3-data` topic
- **Monitoring**: Quix platform logging and metrics systems
34 changes: 17 additions & 17 deletions python/sources/s3_source/library.json
@@ -55,42 +55,42 @@
"Required": true
},
{
"Name": "S3_FOLDER_PATH",
"Name": "S3_FOLDER_PREFIX",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The path to the S3 folder to consume",
"Description": "The folder prefix within the S3 bucket to monitor (optional)",
"DefaultValue": "",
"Required": true
"Required": false
},
{
"Name": "S3_FILE_FORMAT",
"Name": "POLL_INTERVAL_SECONDS",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The file format of the files",
"DefaultValue": "json",
"Required": true
"Description": "How often to check S3 for new files (in seconds)",
"DefaultValue": "30",
"Required": false
},
{
"Name": "S3_FILE_COMPRESSION",
"Name": "MAX_MB_PER_MESSAGE",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The type of file compression used for the files",
"DefaultValue": "gzip",
"Required": true
"Description": "Max size of the message in MB",
"DefaultValue": "1",
"Required": false
},
{
"Name": "S3_ENDPOINT_URL",
"Name": "DOWNLOAD_CONTENT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Custom S3 endpoint URL for MinIO or other S3-compatible services (optional)",
"DefaultValue": "",
"Description": "Whether to publish the content of the file to Kafka or just the metadata",
"DefaultValue": "True",
"Required": false
}
],
"DeploySettings": {
"DeploymentType": "Job",
"CpuMillicores": 100,
"MemoryInMb": 100,
"DeploymentType": "Service",
"CpuMillicores": 250,
"MemoryInMb": 250,
"Replicas": 1,
"PublicAccess": false,
"ValidateConnection": false
89 changes: 44 additions & 45 deletions python/sources/s3_source/main.py
@@ -1,51 +1,50 @@
from quixstreams import Application
from quixstreams.sources.community.file.s3 import S3FileSource
import time

import os
import logging
from quixstreams import Application
from s3_file_watcher import S3FileWatcher
from dotenv import load_dotenv
load_dotenv()

app = Application()

# create an output topic
output_topic = app.topic(os.environ['output'])

# Custom setters for S3 file records (must be regular functions for pickling)
def key_setter(row):
"""Use record ID as the message key, or None if not present."""
return str(row.get('id', '')) if 'id' in row else None

def value_setter(row):
"""Use the whole record as the message value."""
return row

def timestamp_setter(row):
"""Use current time as message timestamp."""
return int(time.time() * 1000)

# Build S3FileSource kwargs
source_kwargs = {
    'filepath': os.environ['S3_FOLDER_PATH'],
    'bucket': os.environ['S3_BUCKET'],
    'aws_access_key_id': os.environ['S3_ACCESS_KEY_ID'],
    'aws_secret_access_key': os.environ['S3_SECRET'],
    'region_name': os.environ['S3_REGION'],
    'file_format': os.environ['S3_FILE_FORMAT'],
    'compression': os.environ.get('S3_FILE_COMPRESSION'),
    'key_setter': key_setter,
    'value_setter': value_setter,
    'timestamp_setter': timestamp_setter,
}

# Support custom endpoint for MinIO or other S3-compatible services
if 'S3_ENDPOINT_URL' in os.environ:
    source_kwargs['endpoint_url'] = os.environ['S3_ENDPOINT_URL']

# create the S3 file source
source = S3FileSource(**source_kwargs)
# Load environment variables
load_dotenv()

app.add_source(source, topic=output_topic)
# Configuration
AWS_ACCESS_KEY_ID = os.getenv("S3_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("S3_SECRET")
S3_BUCKET_NAME = os.environ["S3_BUCKET"]
S3_FOLDER_PREFIX = os.getenv("S3_FOLDER_PREFIX", "")
AWS_REGION = os.getenv("S3_REGION", "us-east-1")
AWS_ENDPOINT_URL = os.getenv("AWS_ENDPOINT_URL") # For MinIO or custom S3-compatible endpoints
TOPIC_NAME = os.environ["output"]
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SECONDS", "30"))

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create Quix Application
app = Application(consumer_group="s3_file_watcher_v1.2", auto_create_topics=True)

# Create S3 File Watcher Source
s3_file_watcher = S3FileWatcher(
name="s3_file_watcher",
bucket_name=S3_BUCKET_NAME,
folder_prefix=S3_FOLDER_PREFIX,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION,
endpoint_url=AWS_ENDPOINT_URL,
poll_interval=POLL_INTERVAL
)

# Define the topic using the "output" environment variable
topic = app.topic(TOPIC_NAME)

# Add source to application
app.add_source(s3_file_watcher, topic)

if __name__ == "__main__":
    app.run()
    try:
        logging.basicConfig(level=logging.INFO)
        app.run()
    except KeyboardInterrupt:
        print("\nProgram interrupted by user. Exiting gracefully.")
3 changes: 2 additions & 1 deletion python/sources/s3_source/requirements.txt
@@ -1,2 +1,3 @@
quixstreams[s3]==3.23.1
quixstreams==3.16.1
python-dotenv
boto3