
Commit 56bd4c2

Adds Quixlake Timeseries destination connector (#657)

* Adds Quixlake Timeseries destination connector

  Introduces a new destination connector that writes time-series data from Kafka to S3 as Hive-partitioned Parquet files. Key features:
  - Hive partitioning by any column, including time-based partitioning derived from timestamp columns.
  - Optional integration with a REST Catalog for table registration.
  - Configurable batch sizes and parallel uploads for high throughput.
  - Partition-strategy validation against existing tables to prevent data corruption.

* Renames library item ID

  Updates the library item ID to be more descriptive of the destination.

* Improves Quixlake TS and S3 destination tests

  Refactors test configurations for the Quixlake Timeseries and S3 File destinations, tuning batch sizes, commit intervals, worker counts, and message counts to improve test execution time and reliability. Adds the `mypy-boto3-s3` dependency to the s3-file destination. Renames "Quix TS Datalake Sink" to "Quix DataLake Timeseries Sink" for clarity.

* Adds S3-compatible storage support

  Enables the connector to use non-AWS S3-compatible storage services such as MinIO, Wasabi, DigitalOcean Spaces, and Backblaze B2 by allowing users to set a custom endpoint URL. Also updates the Quix platform link.

1 parent 58905ba · commit 56bd4c2

File tree

18 files changed: +2165 −5 lines changed
Lines changed: 197 additions & 0 deletions
# Quix TS Datalake Sink

This connector consumes time-series data from a Kafka topic and writes it to S3 as Hive-partitioned Parquet files, with optional Quix catalog registration for the data lake query API.

## Features

- **Hive Partitioning**: Automatically partition data by any columns (e.g., location, sensor type, year/month/day/hour)
- **Time-based Partitioning**: Extract year/month/day/hour from timestamp columns for efficient time-based queries
- **Quix Catalog Integration**: Optional table registration in a REST Catalog for seamless integration with analytics tools
- **Efficient Batching**: Configurable batch sizes and parallel S3 uploads for high throughput
- **Schema Evolution**: Automatic schema detection from data
- **Partition Validation**: Prevents data corruption by validating partition strategies against existing tables
## How to run

Create a [Quix](https://portal.cloud.quix.io/signup?xlink=github) account or log in and visit the `Connectors` tab to use this connector.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.

Then either:

* Click `Test connection & deploy` to deploy the pre-built and configured container into Quix
* Or click `Customise connector` to inspect or alter the code before deployment
## Environment Variables

### Required

- **`input`**: Name of the Kafka input topic to consume from
  *Default*: `sensor-data`

- **`S3_BUCKET`**: S3 bucket name for storing Parquet files

### S3 Configuration

- **`S3_PREFIX`**: S3 prefix/path for data files
  *Default*: `data`

- **`AWS_ACCESS_KEY_ID`**: AWS Access Key ID for S3 access
  *Default*: `""` (uses IAM role if empty)

- **`AWS_SECRET_ACCESS_KEY`**: AWS Secret Access Key for S3 access
  *Default*: `""` (uses IAM role if empty)

- **`AWS_REGION`**: AWS region for the S3 bucket
  *Default*: `us-east-1`

- **`AWS_ENDPOINT_URL`**: Custom S3 endpoint URL for non-AWS S3-compatible storage
  *Examples*:
  - MinIO: `http://minio.example.com:9000`
  - Wasabi: `https://s3.wasabisys.com`
  - DigitalOcean Spaces: `https://nyc3.digitaloceanspaces.com`
  - Backblaze B2: `https://s3.us-west-004.backblazeb2.com`
  *Default*: None (uses AWS S3)

### Data Organization

- **`TABLE_NAME`**: Table name for data organization and registration
  *Default*: Uses the topic name if not specified

- **`HIVE_COLUMNS`**: Comma-separated list of columns for Hive partitioning. Include `year`, `month`, `day`, or `hour` to extract them from `TIMESTAMP_COLUMN`
  *Example*: `location,year,month,day,sensor_type`
  *Default*: `""` (no partitioning)

- **`TIMESTAMP_COLUMN`**: Column containing timestamp values from which year/month/day/hour are extracted
  *Default*: `ts_ms`

### Catalog Integration (Optional)

- **`CATALOG_URL`**: REST Catalog URL for optional table registration (leave empty to skip)
  *Example*: `https://catalog.example.com/api/v1`

- **`CATALOG_AUTH_TOKEN`**: Auth token used to access the catalog, if one is configured

- **`AUTO_DISCOVER`**: Automatically register the table in the REST Catalog on first write
  *Default*: `true`

- **`CATALOG_NAMESPACE`**: Catalog namespace for table registration
  *Default*: `default`

### Kafka Configuration

- **`CONSUMER_GROUP`**: Kafka consumer group name
  *Default*: `s3_direct_sink_v1.0`

- **`AUTO_OFFSET_RESET`**: Where to start consuming if no committed offset exists
  *Default*: `latest`
  *Options*: `earliest`, `latest`

- **`KAFKA_KEY_DESERIALIZER`**: The key deserializer to use
  *Default*: `str`

- **`KAFKA_VALUE_DESERIALIZER`**: The value deserializer to use
  *Default*: `json`

### Performance Tuning

- **`BATCH_SIZE`**: Number of messages to batch before writing to S3
  *Default*: `1000`

- **`COMMIT_INTERVAL`**: Kafka commit interval in seconds
  *Default*: `30`

- **`MAX_WRITE_WORKERS`**: Maximum number of files written to S3 in parallel
  *Default*: `10`

### Application Settings

- **`LOGLEVEL`**: Application logging level
  *Default*: `INFO`
  *Options*: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`
## Partitioning Strategy Examples

### Example 1: Time-based partitioning
```bash
HIVE_COLUMNS=year,month,day
TIMESTAMP_COLUMN=ts_ms
```
Creates: `s3://bucket/prefix/table/year=2024/month=01/day=15/data_*.parquet`

### Example 2: Multi-dimensional partitioning
```bash
HIVE_COLUMNS=location,sensor_type,year,month
TIMESTAMP_COLUMN=timestamp
```
Creates: `s3://bucket/prefix/table/location=NYC/sensor_type=temp/year=2024/month=01/data_*.parquet`

### Example 3: No partitioning
```bash
HIVE_COLUMNS=
```
Creates: `s3://bucket/prefix/table/data_*.parquet`
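The path construction behind these examples can be sketched in Python. This is a simplified illustration, not the connector's actual code: the function name, defaults, and the zero-padding convention shown here are assumptions.

```python
from datetime import datetime, timezone

def hive_partition_path(record: dict, hive_columns: list[str],
                        timestamp_column: str = "ts_ms",
                        table: str = "table", prefix: str = "data") -> str:
    """Build a Hive-style partition path such as data/table/year=2024/month=01/..."""
    # Interpret the timestamp column as epoch milliseconds (UTC)
    ts = datetime.fromtimestamp(record[timestamp_column] / 1000, tz=timezone.utc)
    time_parts = {"year": f"{ts.year:04d}", "month": f"{ts.month:02d}",
                  "day": f"{ts.day:02d}", "hour": f"{ts.hour:02d}"}
    segments = []
    for col in hive_columns:
        # Time-derived columns come from the timestamp; others from the record itself
        value = time_parts.get(col, record.get(col))
        segments.append(f"{col}={value}")
    return "/".join([prefix, table] + segments)

record = {"ts_ms": 1705276800000, "location": "NYC", "temperature": 21.5}
path = hive_partition_path(record, ["location", "year", "month", "day"])
# → "data/table/location=NYC/year=2024/month=01/day=15"
```

Note that an empty `hive_columns` list reduces the path to `prefix/table`, matching Example 3 above.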
## Using Non-AWS S3-Compatible Storage

This connector supports any S3-compatible storage service by setting the `AWS_ENDPOINT_URL` environment variable.

### MinIO Example
```bash
AWS_ENDPOINT_URL=http://minio.example.com:9000
AWS_ACCESS_KEY_ID=minioadmin
AWS_SECRET_ACCESS_KEY=minioadmin
AWS_REGION=us-east-1
S3_BUCKET=my-data-lake
```

### Wasabi Example
```bash
AWS_ENDPOINT_URL=https://s3.wasabisys.com
AWS_ACCESS_KEY_ID=your-wasabi-access-key
AWS_SECRET_ACCESS_KEY=your-wasabi-secret-key
AWS_REGION=us-east-1
S3_BUCKET=my-data-lake
```

### DigitalOcean Spaces Example
```bash
AWS_ENDPOINT_URL=https://nyc3.digitaloceanspaces.com
AWS_ACCESS_KEY_ID=your-spaces-access-key
AWS_SECRET_ACCESS_KEY=your-spaces-secret-key
AWS_REGION=nyc3
S3_BUCKET=my-data-lake
```

### Backblaze B2 Example
```bash
AWS_ENDPOINT_URL=https://s3.us-west-004.backblazeb2.com
AWS_ACCESS_KEY_ID=your-b2-key-id
AWS_SECRET_ACCESS_KEY=your-b2-application-key
AWS_REGION=us-west-004
S3_BUCKET=my-data-lake
```
## Architecture

The sink uses a batching architecture for high throughput:

1. **Consume**: Messages are consumed from Kafka in batches
2. **Transform**: Time-based columns are extracted if needed
3. **Partition**: Data is grouped by partition columns
4. **Upload**: Multiple files are uploaded to S3 in parallel
5. **Register**: Files are registered in the catalog (if configured)
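The steps above can be sketched as a single batch pass. This is an illustrative outline under stated assumptions, not the connector's implementation: the function name and the callable parameters (`transform_fn`, `upload_fn`, `register_fn`) are hypothetical stand-ins for the real transform, Parquet-write, and catalog steps.

```python
import concurrent.futures

def process_batch(messages, hive_columns, transform_fn, upload_fn,
                  register_fn=None, max_workers=10):
    """One pass of the consume -> transform -> partition -> upload -> register loop."""
    # Transform: e.g. derive year/month/day columns from a timestamp
    rows = [transform_fn(m) for m in messages]
    # Partition: group rows by the tuple of their partition-column values
    partitions = {}
    for row in rows:
        key = tuple(row.get(c) for c in hive_columns)
        partitions.setdefault(key, []).append(row)
    # Upload: one file per partition group, written in parallel
    # (mirrors the MAX_WRITE_WORKERS setting)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        uploaded = list(pool.map(upload_fn, partitions.values()))
    # Register: notify the catalog about the new files, if configured
    if register_fn is not None:
        register_fn(uploaded)
    return uploaded

# Toy run: "upload" just reports the size of each partition group
batch = [{"loc": "NYC"}, {"loc": "LDN"}, {"loc": "NYC"}]
sizes = process_batch(batch, ["loc"], lambda m: m, lambda rows: len(rows))
```

Grouping before uploading is what makes the parallelism effective: each partition becomes an independent file write, so the thread pool can overlap S3 round-trips.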
## Requirements

- S3 bucket access:
  - AWS S3, or
  - Any S3-compatible storage (MinIO, Wasabi, DigitalOcean Spaces, Backblaze B2, etc.)
- Optional: Quix REST Catalog endpoint for data catalog integration

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open Source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation.
Lines changed: 70 additions & 0 deletions
import requests
from typing import Optional


class CatalogClient:
    """Simple HTTP client for REST Catalog API interactions."""

    def __init__(self, base_url: str, auth_token: Optional[str] = None):
        """
        Initialize the catalog client.

        Args:
            base_url: Base URL of the REST Catalog API
            auth_token: Optional authentication token for API requests
        """
        self.base_url = base_url.rstrip('/')
        self.auth_token = auth_token
        self._session = requests.Session()

        # Set up the authentication header if a token is provided
        if self.auth_token:
            self._session.headers['Authorization'] = f'Bearer {self.auth_token}'

    def get(self, path: str, timeout: int = 30) -> requests.Response:
        """
        Make a GET request to the catalog API.

        Args:
            path: API endpoint path (appended to base_url)
            timeout: Request timeout in seconds

        Returns:
            Response object from the requests library
        """
        url = f"{self.base_url}{path}"
        return self._session.get(url, timeout=timeout)

    def post(self, path: str, json: Optional[dict] = None, timeout: int = 30) -> requests.Response:
        """
        Make a POST request to the catalog API.

        Args:
            path: API endpoint path (appended to base_url)
            json: JSON payload to send in the request body
            timeout: Request timeout in seconds

        Returns:
            Response object from the requests library
        """
        url = f"{self.base_url}{path}"
        return self._session.post(url, json=json, timeout=timeout)

    def put(self, path: str, json: Optional[dict] = None, timeout: int = 30) -> requests.Response:
        """
        Make a PUT request to the catalog API.

        Args:
            path: API endpoint path (appended to base_url)
            json: JSON payload to send in the request body
            timeout: Request timeout in seconds

        Returns:
            Response object from the requests library
        """
        url = f"{self.base_url}{path}"
        return self._session.put(url, json=json, timeout=timeout)

    def __str__(self):
        """String representation showing the base URL."""
        return self.base_url
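A minimal sketch of the pattern the client builds on: a shared `requests.Session` carrying a Bearer token header on every call, with the base URL normalized so paths join predictably. The catalog URL, token, and `/tables` path below are illustrative placeholders, not a documented catalog endpoint.

```python
import requests

# One shared session: connection reuse plus a default auth header
session = requests.Session()
session.headers["Authorization"] = "Bearer secret-token"

# Stripping the trailing slash means "/tables" always joins cleanly
base_url = "https://catalog.example.com/api/v1/".rstrip("/")
tables_url = f"{base_url}/tables"

# A real call would then look like (not executed here):
# resp = session.post(tables_url, json={"name": "sensor_data"}, timeout=30)
# resp.raise_for_status()
```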
Lines changed: 23 additions & 0 deletions
FROM python:3.11.1-slim-buster

# Set environment variables to non-interactive and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONIOENCODING=UTF-8

# Set the working directory inside the container
WORKDIR /app

# Copy only the requirements file(s) first to leverage the Docker cache
COPY ./requirements.txt ./

# Install dependencies; --no-cache-dir avoids storing pip caches and reduces image size
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the application
COPY . .

# Set the command to run the application
ENTRYPOINT ["python3", "main.py"]