
Commit 9948aa5

Adds S3-compatible storage support
Enables the connector to work with S3-compatible storage services such as MinIO, Wasabi, DigitalOcean Spaces, and Backblaze B2 by allowing users to set a custom endpoint URL. Also updates the Quix platform link.
1 parent 6d7acd6

3 files changed: 71 additions & 9 deletions
python/destinations/quixlake-timeseries/README.md

Lines changed: 51 additions & 4 deletions
@@ -13,7 +13,7 @@ This connector consumes time-series data from a Kafka topic and writes it to S3
 
 ## How to run
 
-Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector.
+Create a [Quix](https://portal.cloud.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector.
 
 Clicking `Set up connector` allows you to enter your connection details and runtime parameters.
 
@@ -44,8 +44,13 @@ Then either:
 - **`AWS_REGION`**: AWS region for S3 bucket
   *Default*: `us-east-1`
 
-- **`AWS_ENDPOINT_URL`**: S3 endpoint URL (for non-AWS S3-compatible storage like MinIO)
-  *Default*: None
+- **`AWS_ENDPOINT_URL`**: Custom S3 endpoint URL for non-AWS S3-compatible storage
+  *Examples*:
+  - MinIO: `http://minio.example.com:9000`
+  - Wasabi: `https://s3.wasabisys.com`
+  - DigitalOcean Spaces: `https://nyc3.digitaloceanspaces.com`
+  - Backblaze B2: `https://s3.us-west-004.backblazeb2.com`
+  *Default*: None (uses AWS S3)
 
 ### Data Organization
 
@@ -126,6 +131,46 @@ HIVE_COLUMNS=
 ```
 Creates: `s3://bucket/prefix/table/data_*.parquet`
 
+## Using Non-AWS S3-Compatible Storage
+
+This connector supports any S3-compatible storage service by setting the `AWS_ENDPOINT_URL` environment variable.
+
+### MinIO Example
+```bash
+AWS_ENDPOINT_URL=http://minio.example.com:9000
+AWS_ACCESS_KEY_ID=minioadmin
+AWS_SECRET_ACCESS_KEY=minioadmin
+AWS_REGION=us-east-1
+S3_BUCKET=my-data-lake
+```
+
+### Wasabi Example
+```bash
+AWS_ENDPOINT_URL=https://s3.wasabisys.com
+AWS_ACCESS_KEY_ID=your-wasabi-access-key
+AWS_SECRET_ACCESS_KEY=your-wasabi-secret-key
+AWS_REGION=us-east-1
+S3_BUCKET=my-data-lake
+```
+
+### DigitalOcean Spaces Example
+```bash
+AWS_ENDPOINT_URL=https://nyc3.digitaloceanspaces.com
+AWS_ACCESS_KEY_ID=your-spaces-access-key
+AWS_SECRET_ACCESS_KEY=your-spaces-secret-key
+AWS_REGION=nyc3
+S3_BUCKET=my-data-lake
+```
+
+### Backblaze B2 Example
+```bash
+AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com
+AWS_ACCESS_KEY_ID=your-b2-key-id
+AWS_SECRET_ACCESS_KEY=your-b2-application-key
+AWS_REGION=us-west-004
+S3_BUCKET=my-data-lake
+```
+
 ## Architecture
 
 The sink uses a batching architecture for high throughput:
@@ -138,7 +183,9 @@ The sink uses a batching architecture for high throughput:
 
 ## Requirements
 
-- S3 bucket access (AWS S3 or S3-compatible storage)
+- S3 bucket access:
+  - AWS S3, or
+  - Any S3-compatible storage (MinIO, Wasabi, DigitalOcean Spaces, Backblaze B2, etc.)
 - Optional: Quix REST Catalog endpoint for data catalog integration
 
 ## Contribute

python/destinations/quixlake-timeseries/main.py

Lines changed: 4 additions & 0 deletions
@@ -50,6 +50,10 @@ def parse_hive_columns(columns_str: str) -> list:
     s3_bucket=os.environ["S3_BUCKET"],
     s3_prefix=os.getenv("S3_PREFIX", "data"),
     table_name=table_name,
+    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
+    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
+    aws_region=os.getenv("AWS_REGION", "us-east-1"),
+    s3_endpoint_url=os.getenv("AWS_ENDPOINT_URL"),
    hive_columns=hive_columns,
     timestamp_column=os.getenv("TIMESTAMP_COLUMN", "ts_ms"),
     catalog_url=os.getenv("CATALOG_URL"),
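With these parameters exposed on the constructor, the sink can also be configured directly in code rather than through environment variables. A hypothetical usage sketch; the class name `QuixLakeSink`, its import path, and the table name are assumptions, since none of them appear in this diff:

```python
# Hypothetical direct instantiation mirroring the keyword arguments added above.
from quixlake_sink import QuixLakeSink  # import path assumed from the file name

sink = QuixLakeSink(
    s3_bucket="my-data-lake",
    s3_prefix="data",
    table_name="timeseries",                          # placeholder table name
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    aws_region="us-east-1",
    s3_endpoint_url="http://minio.example.com:9000",  # omit (None) to use AWS S3
)
```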

python/destinations/quixlake-timeseries/quixlake_sink.py

Lines changed: 16 additions & 5 deletions
@@ -8,7 +8,6 @@
 import time
 import logging
 import uuid
-import os
 from typing import List, Dict, Any, Optional
 from datetime import datetime, timezone
 from io import BytesIO
@@ -36,6 +35,10 @@ def __init__(
         s3_bucket: str,
         s3_prefix: str,
         table_name: str,
+        aws_access_key_id: Optional[str] = None,
+        aws_secret_access_key: Optional[str] = None,
+        aws_region: str = "us-east-1",
+        s3_endpoint_url: Optional[str] = None,
         hive_columns: List[str] = None,
         timestamp_column: str = "ts_ms",
         catalog_url: Optional[str] = None,
@@ -52,6 +55,11 @@ def __init__(
             s3_bucket: S3 bucket name
             s3_prefix: S3 prefix/path for data files
             table_name: Table name for registration
+            aws_access_key_id: AWS access key ID
+            aws_secret_access_key: AWS secret access key
+            aws_region: AWS region (default: "us-east-1")
+            s3_endpoint_url: Custom S3 endpoint URL for non-AWS S3-compatible storage
+                (e.g., MinIO, Wasabi, DigitalOcean Spaces)
             hive_columns: List of columns to use for Hive partitioning. Include 'year', 'month',
                 'day', 'hour' to extract these from timestamp_column
             timestamp_column: Column containing timestamp to extract time partitions from
@@ -62,10 +70,10 @@ def __init__(
             auto_create_bucket: if True, create bucket in S3 if missing.
             max_workers: Maximum number of parallel upload threads (default: 10)
         """
-        self._aws_region = os.getenv('AWS_REGION', 'us-east-1')
-        self._aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
-        self._aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
-        self._aws_endpoint_url = os.getenv("AWS_ENDPOINT_URL", None)
+        self._aws_region = aws_region
+        self._aws_access_key_id = aws_access_key_id
+        self._aws_secret_access_key = aws_secret_access_key
+        self._aws_endpoint_url = s3_endpoint_url
         self._credentials = {
             "region_name": self._aws_region,
             "aws_access_key_id": self._aws_access_key_id,
@@ -100,6 +108,9 @@ def setup(self):
         logger.info(f"S3 Target: s3://{self.s3_bucket}/{self.s3_prefix}/{self.table_name}")
         logger.info(f"Partitioning: hive_columns={self.hive_columns}")
 
+        if self._aws_endpoint_url:
+            logger.info(f"Using custom S3 endpoint: {self._aws_endpoint_url}")
+
         if self._catalog and self.auto_discover:
             logger.info(f"Table will be auto-registered in REST Catalog on first write")
 
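The hunk above stops partway through the `_credentials` dictionary and does not include the code that actually creates the client, but the keys shown (`region_name`, `aws_access_key_id`) match boto3's client arguments. A sketch, assuming boto3, of how the optional endpoint could be folded into that dictionary before building the client:

```python
import boto3
from typing import Any, Dict, Optional

def build_s3_client(
    region: str,
    access_key_id: Optional[str],
    secret_access_key: Optional[str],
    endpoint_url: Optional[str] = None,
):
    """Create an S3 client, adding endpoint_url only when a custom endpoint is configured."""
    kwargs: Dict[str, Any] = {
        "region_name": region,
        "aws_access_key_id": access_key_id,
        "aws_secret_access_key": secret_access_key,
    }
    if endpoint_url:  # None or "" means: use the default AWS S3 endpoint
        kwargs["endpoint_url"] = endpoint_url
    return boto3.client("s3", **kwargs)

# Example: a client pointed at a self-hosted MinIO deployment.
s3 = build_s3_client("us-east-1", "minioadmin", "minioadmin", "http://minio.example.com:9000")
```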