
Commit 58905ba

Adds S3 source connector (#659)
* Adds S3 source connector

  This commit introduces a new S3 source connector that monitors an S3 bucket for new files and streams their content to a Kafka topic. The connector supports:
  - Configuration via environment variables
  - File monitoring with a configurable polling interval
  - Optional content download and chunking for large files
  - Metadata extraction for file information

  This replaces the previous S3 source implementation with a custom file watcher for more flexibility and control over the data ingestion process.

* Updates S3 source configuration

  Updates the S3 source configuration to remove the app.yaml file and consolidate settings into library.json. Configures the deployment type to 'Service' and increases the CPU and memory allocation. Adds 'MAX_MB_PER_MESSAGE' and 'DOWNLOAD_CONTENT' environment variables.

* Resolves review comments

  Refactors the S3 source configuration by renaming `S3_FOLDER_PATH` to `S3_FOLDER_PREFIX` and making it optional. Removes the required `S3_FILE_FORMAT` and `S3_FILE_COMPRESSION` parameters. Updates the default values for `MAX_MB_PER_MESSAGE` and `DOWNLOAD_CONTENT`. Adds `POLL_INTERVAL_SECONDS` for configuring the poll frequency.

* Adds support for custom S3 endpoints

  Enables the S3 source to connect to S3-compatible storage such as MinIO by introducing the AWS_ENDPOINT_URL option. This allows users to specify a custom endpoint for the S3 client, making the source more flexible and adaptable to various deployment scenarios. Also updates tests to reflect that the service now consumes JSON files.

---------

Co-authored-by: Tomas Neubauer <tomas@quix.ai>
1 parent f52f1db commit 58905ba

File tree

8 files changed: +548 −126 lines changed


python/sources/s3_source/README.md

Lines changed: 118 additions & 23 deletions

@@ -1,37 +1,132 @@
-# S3 Source Connector
+# S3 Configuration File Source
 
-[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/s3_source) demonstrates how to connect to Amazon S3 to read files into a Kafka topic.
+A Quix Streams source service that monitors Amazon S3 buckets for configuration files and streams their content to Kafka topics for processing.
 
-## How to run
+## Overview
 
-Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log-in and visit the `Connectors` tab to use this connector.
+This service continuously monitors a specified S3 bucket and folder path for new configuration files. When files are detected, it downloads and publishes their content to the `s3-data` Kafka topic for downstream processing by the XML-to-JSON transformer.
 
-Clicking `Set up connector` allows you to enter your connection details and runtime parameters.
+## Features
 
-Then either:
-* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.
+- **File Monitoring**: Continuous polling of the S3 bucket for new files
+- **Compression Support**: Handles gzip-compressed files automatically
+- **Format Support**: Primarily designed for XML configuration files
+- **State Management**: Tracks processed files to avoid duplicates
+- **Error Handling**: Robust error handling for S3 connectivity issues
+- **Configurable Polling**: Adjustable polling interval for monitoring
 
-* or click `Customise connector` to inspect or alter the code before deployment.
+## How it Works
 
-## Environment variables
+1. **Monitoring**: Polls the specified S3 bucket/folder at regular intervals
+2. **File Detection**: Identifies new or modified files since the last check
+3. **Download**: Retrieves file content from S3
+4. **Decompression**: Automatically handles gzip decompression if needed
+5. **Publishing**: Sends file content to the Kafka topic for processing
+6. **State Tracking**: Records processed files to prevent reprocessing
 
-This connector uses the following environment variables:
+## Environment Variables
 
-- **output**: The output topic to stream Segment data into
-- **S3_BUCKET**: The URI or URL to your S3 bucket
-- **S3_REGION**: The region of your S3 bucket
-- **S3_SECRET**: Your AWS secret
-- **S3_ACCESS_KEY_ID**: Your AWS Access Key
-- **S3_FOLDER_PATH**: The path to the S3 folder to consume
-- **S3_FILE_FORMAT**: The file format of the files
-- **S3_FILE_COMPRESSION**: The type of file compression used for the files
+- **output**: Name of the output Kafka topic (default: `s3-data`)
+- **S3_BUCKET**: S3 bucket URI (e.g., `s3://quix-test-bucket/configurations/`)
+- **S3_REGION**: AWS region of the S3 bucket (e.g., `eu-west-2`)
+- **S3_SECRET**: AWS Secret Access Key (stored as a secret)
+- **S3_ACCESS_KEY_ID**: AWS Access Key ID (stored as a secret)
+- **S3_FOLDER_PATH**: Folder path within the bucket to monitor (e.g., `configurations`)
+- **S3_FILE_FORMAT**: Expected file format (e.g., `xml`)
+- **S3_FILE_COMPRESSION**: Compression type (e.g., `gzip`)
+- **POLL_INTERVAL_SECONDS**: Polling interval in seconds (default: 30)
 
-## Contribute
+## Configuration Example
 
-Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.
+Based on the current deployment configuration:
 
-## Open source
+```yaml
+S3_BUCKET: "s3://quix-test-bucket/configurations/"
+S3_REGION: "eu-west-2"
+S3_FOLDER_PATH: "configurations"
+S3_FILE_FORMAT: "xml"
+S3_FILE_COMPRESSION: "gzip"
+POLL_INTERVAL_SECONDS: "30"
+```
 
-This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.
+## Data Flow
 
-Please star us and mention us on social to show your appreciation.
+The service fits into the larger configuration management pipeline:
+
+```
+S3 Bucket → S3 Source → s3-data topic → XML-to-JSON → configurations topic → Configuration Sink → Configuration API
+```
+
+## Development
+
+### Local Development
+
+```bash
+# Install dependencies
+pip install -r requirements.txt
+
+# Set environment variables
+export S3_BUCKET="s3://your-bucket/path/"
+export S3_REGION="your-region"
+export S3_ACCESS_KEY_ID="your-access-key"
+export S3_SECRET="your-secret-key"
+export S3_FOLDER_PATH="configurations"
+export output="s3-data"
+
+# Run the service
+python main.py
+```
+
+### Docker Build
+
+```bash
+docker build -t s3-source .
+```
+
+## File Processing
+
+### Supported File Types
+- **XML files**: Primary configuration format
+- **Gzip compression**: Automatic decompression support
+- **Text-based formats**: Any text-based configuration files
+
+### File Structure
+The service expects files in the following S3 structure:
+```
+s3://bucket-name/configurations/
+├── config1.xml.gz
+├── config2.xml.gz
+└── subfolder/
+    └── config3.xml.gz
+```
+
+## Error Handling
+
+- **S3 Connection Errors**: Automatic retry with exponential backoff
+- **Authentication Failures**: Clear error logging for credential issues
+- **File Access Errors**: Graceful handling of permission or availability issues
+- **Compression Errors**: Error handling for corrupted compressed files
+
+## State Management
+
+The service maintains persistent state to track:
+- Last processed timestamp
+- File checksums/ETags to detect changes
+- Processing status of individual files
+
+State is preserved across service restarts through Quix's state management system.
+
+## Monitoring
+
+The service provides logging for:
+- File discovery and processing events
+- S3 API interactions and errors
+- Kafka message publishing status
+- Performance metrics and polling intervals
+
+## Integration
+
+This service integrates with:
+- **Upstream**: S3 bucket containing configuration files
+- **Downstream**: XML-to-JSON transformation service via the `s3-data` topic
+- **Monitoring**: Quix platform logging and metrics systems
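The file-watcher module that implements the monitoring and state-tracking behaviour described in the new README (`s3_file_watcher.py`) is not part of this diff. As a rough sketch only, a boto3-based polling loop along these lines would cover the steps listed under "How it Works"; the `poll_bucket` function name and the in-memory `seen_etags` dict are hypothetical stand-ins, since the real service persists its state through Quix:

```python
import gzip
import time

import boto3


def poll_bucket(bucket, prefix="", poll_interval=30, endpoint_url=None):
    """Yield (key, body) for new or changed objects, polling indefinitely."""
    s3 = boto3.client("s3", endpoint_url=endpoint_url)
    seen_etags = {}  # key -> ETag; in-memory stand-in for the service's persisted state
    while True:
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key, etag = obj["Key"], obj["ETag"]
                if seen_etags.get(key) == etag:
                    continue  # unchanged since the last poll
                body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
                if key.endswith(".gz"):
                    body = gzip.decompress(body)  # gzip decompression step
                seen_etags[key] = etag
                yield key, body
        time.sleep(poll_interval)
```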

python/sources/s3_source/library.json

Lines changed: 17 additions & 17 deletions

@@ -55,42 +55,42 @@
       "Required": true
     },
     {
-      "Name": "S3_FOLDER_PATH",
+      "Name": "S3_FOLDER_PREFIX",
       "Type": "EnvironmentVariable",
       "InputType": "FreeText",
-      "Description": "The path to the S3 folder to consume",
+      "Description": "The folder prefix within the S3 bucket to monitor (optional)",
       "DefaultValue": "",
-      "Required": true
+      "Required": false
     },
     {
-      "Name": "S3_FILE_FORMAT",
+      "Name": "POLL_INTERVAL_SECONDS",
       "Type": "EnvironmentVariable",
       "InputType": "FreeText",
-      "Description": "The file format of the files",
-      "DefaultValue": "json",
-      "Required": true
+      "Description": "How often to check S3 for new files (in seconds)",
+      "DefaultValue": "30",
+      "Required": false
     },
     {
-      "Name": "S3_FILE_COMPRESSION",
+      "Name": "MAX_MB_PER_MESSAGE",
       "Type": "EnvironmentVariable",
       "InputType": "FreeText",
-      "Description": "The type of file compression used for the files",
-      "DefaultValue": "gzip",
-      "Required": true
+      "Description": "Max size of the message in MB",
+      "DefaultValue": "1",
+      "Required": false
     },
     {
-      "Name": "S3_ENDPOINT_URL",
+      "Name": "DOWNLOAD_CONTENT",
       "Type": "EnvironmentVariable",
       "InputType": "FreeText",
-      "Description": "Custom S3 endpoint URL for MinIO or other S3-compatible services (optional)",
-      "DefaultValue": "",
+      "Description": "Whether to publish the content of the file to Kafka or just the metadata",
+      "DefaultValue": "True",
       "Required": false
     }
   ],
   "DeploySettings": {
-    "DeploymentType": "Job",
-    "CpuMillicores": 100,
-    "MemoryInMb": 100,
+    "DeploymentType": "Service",
+    "CpuMillicores": 250,
+    "MemoryInMb": 250,
     "Replicas": 1,
     "PublicAccess": false,
     "ValidateConnection": false

python/sources/s3_source/main.py

Lines changed: 44 additions & 45 deletions

@@ -1,51 +1,50 @@
-from quixstreams import Application
-from quixstreams.sources.community.file.s3 import S3FileSource
-import time
-
 import os
+import logging
+from quixstreams import Application
+from s3_file_watcher import S3FileWatcher
 from dotenv import load_dotenv
-load_dotenv()
-
-app = Application()
-
-# create an output topic
-output_topic = app.topic(os.environ['output'])
 
-# Custom setters for S3 file records (must be regular functions for pickling)
-def key_setter(row):
-    """Use record ID as the message key, or None if not present."""
-    return str(row.get('id', '')) if 'id' in row else None
-
-def value_setter(row):
-    """Use the whole record as the message value."""
-    return row
-
-def timestamp_setter(row):
-    """Use current time as message timestamp."""
-    return int(time.time() * 1000)
-
-# Build S3FileSource kwargs
-source_kwargs = {
-    'filepath': os.environ['S3_FOLDER_PATH'],
-    'bucket': os.environ['S3_BUCKET'],
-    'aws_access_key_id': os.environ['S3_ACCESS_KEY_ID'],
-    'aws_secret_access_key': os.environ['S3_SECRET'],
-    'region_name': os.environ['S3_REGION'],
-    'file_format': os.environ['S3_FILE_FORMAT'],
-    'compression': os.environ.get('S3_FILE_COMPRESSION'),
-    'key_setter': key_setter,
-    'value_setter': value_setter,
-    'timestamp_setter': timestamp_setter,
-}
-
-# Support custom endpoint for MinIO or other S3-compatible services
-if 'S3_ENDPOINT_URL' in os.environ:
-    source_kwargs['endpoint_url'] = os.environ['S3_ENDPOINT_URL']
-
-# create the S3 file source
-source = S3FileSource(**source_kwargs)
+# Load environment variables
+load_dotenv()
 
-app.add_source(source, topic=output_topic)
+# Configuration
+AWS_ACCESS_KEY_ID = os.getenv("S3_ACCESS_KEY_ID")
+AWS_SECRET_ACCESS_KEY = os.getenv("S3_SECRET")
+S3_BUCKET_NAME = os.environ["S3_BUCKET"]
+S3_FOLDER_PREFIX = os.getenv("S3_FOLDER_PREFIX", "")
+AWS_REGION = os.getenv("S3_REGION", "us-east-1")
+AWS_ENDPOINT_URL = os.getenv("AWS_ENDPOINT_URL")  # For MinIO or custom S3-compatible endpoints
+TOPIC_NAME = os.environ["output"]
+POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SECONDS", "30"))
+
+# Logging setup
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# Create Quix Application
+app = Application(consumer_group="s3_file_watcher_v1.2", auto_create_topics=True)
+
+# Create S3 File Watcher Source
+s3_file_watcher = S3FileWatcher(
+    name="s3_file_watcher",
+    bucket_name=S3_BUCKET_NAME,
+    folder_prefix=S3_FOLDER_PREFIX,
+    aws_access_key_id=AWS_ACCESS_KEY_ID,
+    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+    region_name=AWS_REGION,
+    endpoint_url=AWS_ENDPOINT_URL,
+    poll_interval=POLL_INTERVAL
+)
+
+# Define the topic using the "output" environment variable
+topic = app.topic(TOPIC_NAME)
+
+# Add source to application
+app.add_source(s3_file_watcher, topic)
 
 if __name__ == "__main__":
-    app.run()
+    try:
+        logging.basicConfig(level=logging.INFO)
+        app.run()
+    except KeyboardInterrupt:
+        print("\nProgram interrupted by user. Exiting gracefully.")

python/sources/s3_source/requirements.txt

Lines changed: 2 additions & 1 deletion

@@ -1,2 +1,3 @@
-quixstreams[s3]==3.23.1
+quixstreams==3.16.1
 python-dotenv
+boto3
