From fec33aeb199f16fbcb92cd63333a44513e4602e8 Mon Sep 17 00:00:00 2001 From: Tomas Neubauer Date: Fri, 21 Nov 2025 19:54:42 +0100 Subject: [PATCH 1/2] Adds a tested transformation template Adds a transformation template with separated pipeline logic and a mocked data source for integration testing, including a Docker Compose file for running a local Kafka broker. This template demonstrates best practices for testing Quix Streams applications, focusing on separation of concerns, deterministic assertions, and key testing considerations like windowing and partitioning. --- .../integration_tests_example/README.md | 157 ++++++++++++++++++ .../integration_tests_example/app.yaml | 15 ++ .../compose.test.yaml | 32 ++++ .../integration_tests_example/dockerfile | 28 ++++ .../integration_tests_example/library.json | 34 ++++ .../integration_tests_example/main.py | 55 ++++++ .../requirements.txt | 2 + .../integration_tests_example/test.py | 77 +++++++++ .../integration_tests_example/utils.py | 62 +++++++ 9 files changed, 462 insertions(+) create mode 100644 python/transformations/integration_tests_example/README.md create mode 100644 python/transformations/integration_tests_example/app.yaml create mode 100644 python/transformations/integration_tests_example/compose.test.yaml create mode 100644 python/transformations/integration_tests_example/dockerfile create mode 100644 python/transformations/integration_tests_example/library.json create mode 100644 python/transformations/integration_tests_example/main.py create mode 100644 python/transformations/integration_tests_example/requirements.txt create mode 100644 python/transformations/integration_tests_example/test.py create mode 100644 python/transformations/integration_tests_example/utils.py diff --git a/python/transformations/integration_tests_example/README.md b/python/transformations/integration_tests_example/README.md new file mode 100644 index 000000000..e5bbc1194 --- /dev/null +++ b/python/transformations/integration_tests_example/README.md @@ -0,0 +1,157 @@ +# Tested Transformation Template + +This template demonstrates how to build and test a Quix Streams transformation application with proper separation of concerns and comprehensive testing. + +## Project Structure + +``` +tested-transformation/ +├── main.py # Production code with pipeline definition +├── test.py # Test suite with mocked data source +├── utils.py # Shared testing utilities +└── README.md # This file +``` + +## Testing Strategy + +This template showcases best practices for testing Quix Streams applications: + +### 1. Separation of Pipeline Logic + +The pipeline transformation logic is isolated in the `define_pipeline()` function in `main.py`. This function: +- Takes a `StreamingDataFrame` as input +- Applies all transformations +- Returns the transformed `StreamingDataFrame` + +This separation allows the same pipeline logic to be used in both production (with Kafka topics) and testing (with mocked data sources). + +### 2. Mocked Data Source + +`test.py` includes a `TestDataSource` class that: +- Extends `quixstreams.sources.Source` +- Produces deterministic test data with known timestamps +- Includes dummy messages at the end to close windowed aggregations +- Eliminates the need to publish test data to Kafka topics manually + +**Note:** While the data source is mocked, these are still **integration tests** - Quix Streams internally uses Kafka for state management and processing. A Kafka broker is required to run the tests. + +### 3. 
Deterministic Assertions + +The test suite uses a structured approach: +- **Expected output is defined upfront** as a list of dictionaries with expected values +- **Messages are grouped by key** to handle partition-based ordering (messages from different keys may arrive in any order) +- **Order is verified within each key** to ensure temporal correctness per partition +- **Utility function handles comparison** via `assert_messages_match()` in `utils.py` + +### 4. Key Testing Considerations + +**Windowing:** When testing windowed operations, remember: +- Windows only emit results when they close +- Add dummy messages with future timestamps to close the final window +- Window boundaries align to fixed time intervals from epoch, not event arrival times + +**Partitioning:** In Kafka/Quix Streams: +- Messages with different keys may be processed in different partitions +- Order is guaranteed only within a single key/partition +- Tests must group by key before asserting order + +**Unique Consumer Groups:** Use `uuid.uuid4()` for consumer groups and source names in tests to ensure each test run is isolated. + +**Kafka Requirement:** Quix Streams uses Kafka internally for state management (RocksDB state stores are backed by changelog topics). The provided `compose.test.yaml` file starts a local Kafka broker and Redpanda Console for debugging: +- Kafka broker: `localhost:19092` +- Redpanda Console: `http://localhost:8080` (optional web UI for inspecting topics) + +## Running Tests + +### Prerequisites + +Tests require a running Kafka broker. Use the provided Docker Compose file to start Kafka locally: + +```bash +# Start Kafka in the background +docker-compose -f compose.test.yaml up -d + +# Wait a few seconds for Kafka to be ready +``` + +### Run the Tests + +```bash +# Run the test suite +python test.py +``` + +### Cleanup + +```bash +# Stop Kafka when done +docker-compose -f compose.test.yaml down -v +``` + +Expected output: +``` +✓ Received 7 messages + +--- Verifying messages for key: host1 --- +Message 1 for key 'host1': + Expected: start=1577836800000, end=1577836803000, value=1 + Actual: start=1577836800000, end=1577836803000, value=1 + ✓ Match! +... +✓ All assertions passed! +✓ All 7 messages matched expected output across 2 keys +``` + +## Running in Production + +```bash +# Run the production application +python main.py +``` + +The production application requires environment variables: +- `input`: Name of the input Kafka topic +- `output`: Name of the output Kafka topic + +## How to Use This Template + +1. **Modify the pipeline**: Update `define_pipeline()` in `main.py` with your transformation logic +2. **Update test data**: Modify `TestDataSource.memory_allocation_data` to match your input schema +3. **Update expected output**: Change `expected_rows` in `test.py` to match your pipeline's expected results +4. **Run tests**: Execute `python test.py` to verify your pipeline works correctly +5. 
**Deploy**: Deploy `main.py` to production + +## Example: Understanding the Test + +The template includes a tumbling window aggregation that counts events per 3-second window: + +```python +sdf = sdf.tumbling_window(3000).count().final() +``` + +**Input events** (7 events across 2 hosts): +- host1: timestamps 800, 803, 806, 810 (ms) +- host2: timestamps 801, 804, 808 (ms) + +**Window boundaries** (3000ms windows from epoch): +- [800, 803): host1=1, host2=1 +- [803, 806): host1=1, host2=1 +- [806, 809): host1=1, host2=1 +- [809, 812): host1=1 + +**Output**: 7 windowed count results (one per host per window) + +## Best Practices + +1. **Keep pipeline logic pure**: The `define_pipeline()` function should only transform data, not handle I/O +2. **Use realistic test data**: Mirror production data structures and edge cases +3. **Test temporal operations**: Pay special attention to windowing, time-based joins, and stateful operations +4. **Isolate test runs**: Use unique consumer groups to prevent state pollution between test runs +5. **Document window mechanics**: Clearly explain how windowing aligns and when windows close + +## Learn More + +- [Quix Streams Documentation](https://quix.io/docs/quix-streams/introduction.html) +- [StreamingDataFrame Operations](https://quix.io/docs/quix-streams/processing.html) +- [Windowing Guide](https://quix.io/docs/quix-streams/windowing.html) +- [Testing Stateful Applications](https://quix.io/docs/quix-streams/testing.html) \ No newline at end of file diff --git a/python/transformations/integration_tests_example/app.yaml b/python/transformations/integration_tests_example/app.yaml new file mode 100644 index 000000000..d018e8947 --- /dev/null +++ b/python/transformations/integration_tests_example/app.yaml @@ -0,0 +1,15 @@ +name: Tested transformation +language: python +variables: + - name: input + inputType: InputTopic + description: Name of the input topic to listen to. + defaultValue: raw + - name: output + inputType: OutputTopic + description: Name of the output topic to write to. 
+ defaultValue: transform +dockerfile: dockerfile +runEntryPoint: main.py +defaultFile: main.py +libraryItemId: starter-transformation diff --git a/python/transformations/integration_tests_example/compose.test.yaml b/python/transformations/integration_tests_example/compose.test.yaml new file mode 100644 index 000000000..3c236480e --- /dev/null +++ b/python/transformations/integration_tests_example/compose.test.yaml @@ -0,0 +1,32 @@ +services: + kafka_broker: + image: docker.redpanda.com/redpandadata/redpanda:v24.1.1 + command: | + redpanda start + --smp 1 + --overprovisioned + --node-id 0 + --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 + --advertise-kafka-addr internal://kafka_broker:9092,external://localhost:19092 + --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082 + --advertise-pandaproxy-addr internal://kafka_broker:8082,external://localhost:18082 + --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081 + --rpc-addr kafka_broker:33145 + --advertise-rpc-addr kafka_broker:33145 + --mode dev-container + --set auto_create_topics_enabled=true + ports: + - 18081:18081 + - 18082:18082 + - 19092:19092 + - 19644:9644 + console: + image: docker.redpanda.com/redpandadata/console:v2.5.2 + entrypoint: /bin/sh + command: |- + -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console' + ports: + - 8080:8080 + environment: + CONFIG_FILEPATH: '/tmp/config.yml' + CONSOLE_CONFIG_FILE: "kafka:\n brokers: [\"kafka_broker:9092\"]\n schemaRegistry:\n enabled: true\n urls: [\"http://kafka_broker:8081\"]\nredpanda:\n adminApi:\n enabled: true\n urls: [\"http://kafka_broker:9644\"]\nconnect:\n enabled: true\n clusters:\n - name: local-connect-cluster\n url: http://connect:8083\n" diff --git a/python/transformations/integration_tests_example/dockerfile b/python/transformations/integration_tests_example/dockerfile new file mode 100644 index 000000000..692316bbb --- /dev/null +++ b/python/transformations/integration_tests_example/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. + +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . 
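+# Note: test.py, utils.py and compose.test.yaml are part of the copied context;
+# they support local test runs only and are not executed by the container entrypoint.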
+ +# Set working directory to main app path +WORKDIR "/app/${MAINAPPPATH}" + +# Define the container's startup command +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/python/transformations/integration_tests_example/library.json b/python/transformations/integration_tests_example/library.json new file mode 100644 index 000000000..c9480988e --- /dev/null +++ b/python/transformations/integration_tests_example/library.json @@ -0,0 +1,34 @@ +{ + "libraryItemId": "tested-transformation", + "name": "Tested transformation template", + "language": "Python", + "IsHighlighted": false, + "DisplayOrder": 3, + "tags": { + "Complexity": ["Easy"], + "Technology": ["Quix Streams"], + "Pipeline Stage": ["Transformation"], + "Popular Subjects": ["Quick Start"], + "Type": ["Basic templates"] + }, + "shortDescription": "A testing template that applies a simple transformation and publishes the result to an output topic", + "DefaultFile": "main.py", + "EntryPoint": "dockerfile", + "RunEntryPoint": "main.py", + "Variables": [ + { + "Name": "input", + "Type": "EnvironmentVariable", + "InputType": "InputTopic", + "Description": "Name of the input topic to listen to.", + "DefaultValue": "csv-data" + }, + { + "Name": "output", + "Type": "EnvironmentVariable", + "InputType": "OutputTopic", + "Description": "Name of the output topic to write to.", + "DefaultValue": "transform" + } + ] +} diff --git a/python/transformations/integration_tests_example/main.py b/python/transformations/integration_tests_example/main.py new file mode 100644 index 000000000..ab7bdf5a8 --- /dev/null +++ b/python/transformations/integration_tests_example/main.py @@ -0,0 +1,55 @@ +# import the Quix Streams modules for interacting with Kafka. +# For general info, see https://quix.io/docs/quix-streams/introduction.html +from quixstreams import Application +from quixstreams.dataframe import StreamingDataFrame + +import os +from datetime import datetime + +# for local dev, load env vars from a .env file +from dotenv import load_dotenv +load_dotenv() + + +def define_pipeline(sdf: StreamingDataFrame): + + # Do StreamingDataFrame operations/transformations here + sdf = sdf.apply(lambda row: row).filter(lambda row: True) + + + sdf = sdf.set_timestamp(lambda row, *_: int(row["time"] / 1E6)) + + sdf["time"] = sdf["time"].apply(lambda epoch: str(datetime.fromtimestamp(epoch / 1E9))) + + sdf = sdf.tumbling_window(3000).count().final() + + #sdf = sdf.print(metadata=True) + + return sdf + + +def main(): + + # Setup necessary objects + app = Application( + consumer_group="my_transformation", + auto_create_topics=True, + auto_offset_reset="earliest" + ) + input_topic = app.topic(name=os.environ["input"]) + output_topic = app.topic(name=os.environ["output"]) + sdf = app.dataframe(topic=input_topic) + + # Apply the pipeline transformations + sdf = define_pipeline(sdf) + + # Finish off by writing to the final result to the output topic + sdf.to_topic(output_topic) + + # With our pipeline defined, now run the Application + app.run() + + +# It is recommended to execute Applications under a conditional main +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/python/transformations/integration_tests_example/requirements.txt b/python/transformations/integration_tests_example/requirements.txt new file mode 100644 index 000000000..1baef47cc --- /dev/null +++ b/python/transformations/integration_tests_example/requirements.txt @@ -0,0 +1,2 @@ +quixstreams==3.23.1 +python-dotenv \ No newline at end of file diff --git 
a/python/transformations/integration_tests_example/test.py b/python/transformations/integration_tests_example/test.py new file mode 100644 index 000000000..e4b1e4bf4 --- /dev/null +++ b/python/transformations/integration_tests_example/test.py @@ -0,0 +1,77 @@ +# Test file for the transformation pipeline +from quixstreams import Application +from quixstreams.sources import Source +import uuid + +# for local dev, load env vars from a .env file +from dotenv import load_dotenv +load_dotenv() + +from main import define_pipeline +from utils import assert_messages_match + + +class TestDataSource(Source): + + + memory_allocation_data = [ + {"m": "mem", "host": "host1", "used_percent": 64.56, "time": 1577836800000000000}, + {"m": "mem", "host": "host2", "used_percent": 71.89, "time": 1577836801000000000}, + {"m": "mem", "host": "host1", "used_percent": 63.27, "time": 1577836803000000000}, + {"m": "mem", "host": "host2", "used_percent": 73.45, "time": 1577836804000000000}, + {"m": "mem", "host": "host1", "used_percent": 62.98, "time": 1577836806000000000}, + {"m": "mem", "host": "host2", "used_percent": 74.33, "time": 1577836808000000000}, + {"m": "mem", "host": "host1", "used_percent": 65.21, "time": 1577836810000000000}, + # Dummy messages to close the last window (timestamp far in future) + {"m": "mem", "host": "host1", "used_percent": 0.0, "time": 1577836820000000000}, + {"m": "mem", "host": "host2", "used_percent": 0.0, "time": 1577836820000000000}, + ] + + def run(self): + data = iter(self.memory_allocation_data) + # either break when the app is stopped, or data is exhausted + while self.running: + try: + event = next(data) + event_serialized = self.serialize(key=event["host"], value=event) + self.produce(key=event_serialized.key, value=event_serialized.value) + except StopIteration: + print("Source finished producing messages.") + return + + +def test(): + # Expected output: windowed count results based on input data + # Input data has timestamps (in ms): 1577836800000, 1577836801000, 1577836803000, + # 1577836804000, 1577836806000, 1577836808000, 1577836810000 + # With 3000ms (3 second) tumbling windows starting at epoch 0, windows are: + # [1577836800000, 1577836803000), [1577836803000, 1577836806000), + # [1577836806000, 1577836809000), [1577836809000, 1577836812000), ... + expected_rows = [ + {"_key": "host1", "start": 1577836800000, "end": 1577836803000, "value": 1}, # Window 1: host1 event at 800 + {"_key": "host2", "start": 1577836800000, "end": 1577836803000, "value": 1}, # Window 1: host2 event at 801 + {"_key": "host1", "start": 1577836803000, "end": 1577836806000, "value": 1}, # Window 2: host1 event at 803 + {"_key": "host2", "start": 1577836803000, "end": 1577836806000, "value": 1}, # Window 2: host2 event at 804 + {"_key": "host1", "start": 1577836806000, "end": 1577836809000, "value": 1}, # Window 3: host1 event at 806 + {"_key": "host2", "start": 1577836806000, "end": 1577836809000, "value": 1}, # Window 3: host2 event at 808 + {"_key": "host1", "start": 1577836809000, "end": 1577836812000, "value": 1}, # Window 4: host1 event at 810 + ] + + app = Application(consumer_group=str(uuid.uuid4())) + + # Here instead of topic, we connect mocked data source. We give it random name + # so it will always create new topic that is empty. + sdf = app.dataframe(source=TestDataSource(str(uuid.uuid4()))) + + # Load the pipeline definition from main file. 
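+    # Reusing define_pipeline() means the test exercises exactly the same
+    # transformation code that runs in production; only the data source differs.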
+ sdf = define_pipeline(sdf) + + # Run the application (count=8 includes the dummy message that closes the last window) + result = app.run(timeout=5, count=8, metadata=True) + + # Assert messages match expected output + assert_messages_match(result, expected_rows) + + +if __name__ == "__main__": + test() diff --git a/python/transformations/integration_tests_example/utils.py b/python/transformations/integration_tests_example/utils.py new file mode 100644 index 000000000..5a7a06a62 --- /dev/null +++ b/python/transformations/integration_tests_example/utils.py @@ -0,0 +1,62 @@ + +def assert_messages_match(actual_messages, expected_messages): + """ + Assert that actual messages match expected messages. + Messages are grouped by key and order is verified within each key. + + Args: + actual_messages: List of messages from Application.run() + expected_messages: List of expected message dicts with '_key', 'start', 'end', 'value' + """ + # Assert we got the expected number of messages + assert len(actual_messages) == len(expected_messages), \ + f"Expected {len(expected_messages)} messages, got {len(actual_messages)}" + + print(f"\n✓ Received {len(actual_messages)} messages") + + # Group actual messages by key + actual_by_key = {} + for msg in actual_messages: + key = msg["_key"].decode('utf-8') if isinstance(msg["_key"], bytes) else msg["_key"] + if key not in actual_by_key: + actual_by_key[key] = [] + actual_by_key[key].append(msg) + + # Group expected messages by key + expected_by_key = {} + for msg in expected_messages: + key = msg["_key"] + if key not in expected_by_key: + expected_by_key[key] = [] + expected_by_key[key].append(msg) + + # Assert same keys exist + assert set(actual_by_key.keys()) == set(expected_by_key.keys()), \ + f"Key mismatch: expected keys {set(expected_by_key.keys())}, got {set(actual_by_key.keys())}" + + # Verify messages for each key in order + for key in sorted(expected_by_key.keys()): + actual_msgs = actual_by_key[key] + expected_msgs = expected_by_key[key] + + print(f"\n--- Verifying messages for key: {key} ---") + + assert len(actual_msgs) == len(expected_msgs), \ + f"Key '{key}': expected {len(expected_msgs)} messages, got {len(actual_msgs)}" + + for i, (actual, expected) in enumerate(zip(actual_msgs, expected_msgs)): + print(f"\nMessage {i + 1} for key '{key}':") + print(f" Expected: start={expected['start']}, end={expected['end']}, value={expected['value']}") + print(f" Actual: start={actual['start']}, end={actual['end']}, value={actual['value']}") + + assert actual["start"] == expected["start"], \ + f"Key '{key}' message {i+1}: start mismatch - expected {expected['start']}, got {actual['start']}" + assert actual["end"] == expected["end"], \ + f"Key '{key}' message {i+1}: end mismatch - expected {expected['end']}, got {actual['end']}" + assert actual["value"] == expected["value"], \ + f"Key '{key}' message {i+1}: value mismatch - expected {expected['value']}, got {actual['value']}" + + print(f" ✓ Match!") + + print(f"\n✓ All assertions passed!") + print(f"✓ All {len(expected_messages)} messages matched expected output across {len(expected_by_key)} keys") From 8f0ff6ffd4a31b6277a7581b5832c149aab9a943 Mon Sep 17 00:00:00 2001 From: Tomas Neubauer Date: Fri, 21 Nov 2025 19:56:20 +0100 Subject: [PATCH 2/2] Renames example transformation to tested Renames the integration tests example transformation to 'tested-transformation' to prepare it for template use. Adds timestamp setting and tumbling window aggregation. Adds an option for debugging print statements. 
--- .../README.md | 0 .../app.yaml | 0 .../compose.test.yaml | 0 .../dockerfile | 0 .../library.json | 0 .../main.py | 5 +++-- .../requirements.txt | 0 .../test.py | 0 .../utils.py | 0 9 files changed, 3 insertions(+), 2 deletions(-) rename python/transformations/{integration_tests_example => tested-transformation}/README.md (100%) rename python/transformations/{integration_tests_example => tested-transformation}/app.yaml (100%) rename python/transformations/{integration_tests_example => tested-transformation}/compose.test.yaml (100%) rename python/transformations/{integration_tests_example => tested-transformation}/dockerfile (100%) rename python/transformations/{integration_tests_example => tested-transformation}/library.json (100%) rename python/transformations/{integration_tests_example => tested-transformation}/main.py (94%) rename python/transformations/{integration_tests_example => tested-transformation}/requirements.txt (100%) rename python/transformations/{integration_tests_example => tested-transformation}/test.py (100%) rename python/transformations/{integration_tests_example => tested-transformation}/utils.py (100%) diff --git a/python/transformations/integration_tests_example/README.md b/python/transformations/tested-transformation/README.md similarity index 100% rename from python/transformations/integration_tests_example/README.md rename to python/transformations/tested-transformation/README.md diff --git a/python/transformations/integration_tests_example/app.yaml b/python/transformations/tested-transformation/app.yaml similarity index 100% rename from python/transformations/integration_tests_example/app.yaml rename to python/transformations/tested-transformation/app.yaml diff --git a/python/transformations/integration_tests_example/compose.test.yaml b/python/transformations/tested-transformation/compose.test.yaml similarity index 100% rename from python/transformations/integration_tests_example/compose.test.yaml rename to python/transformations/tested-transformation/compose.test.yaml diff --git a/python/transformations/integration_tests_example/dockerfile b/python/transformations/tested-transformation/dockerfile similarity index 100% rename from python/transformations/integration_tests_example/dockerfile rename to python/transformations/tested-transformation/dockerfile diff --git a/python/transformations/integration_tests_example/library.json b/python/transformations/tested-transformation/library.json similarity index 100% rename from python/transformations/integration_tests_example/library.json rename to python/transformations/tested-transformation/library.json diff --git a/python/transformations/integration_tests_example/main.py b/python/transformations/tested-transformation/main.py similarity index 94% rename from python/transformations/integration_tests_example/main.py rename to python/transformations/tested-transformation/main.py index ab7bdf5a8..72ce776c7 100644 --- a/python/transformations/integration_tests_example/main.py +++ b/python/transformations/tested-transformation/main.py @@ -15,14 +15,15 @@ def define_pipeline(sdf: StreamingDataFrame): # Do StreamingDataFrame operations/transformations here sdf = sdf.apply(lambda row: row).filter(lambda row: True) - - + + # Set row timestamp from payload. sdf = sdf.set_timestamp(lambda row, *_: int(row["time"] / 1E6)) sdf["time"] = sdf["time"].apply(lambda epoch: str(datetime.fromtimestamp(epoch / 1E9))) sdf = sdf.tumbling_window(3000).count().final() + # Optional printing for debugging. 
#sdf = sdf.print(metadata=True) return sdf diff --git a/python/transformations/integration_tests_example/requirements.txt b/python/transformations/tested-transformation/requirements.txt similarity index 100% rename from python/transformations/integration_tests_example/requirements.txt rename to python/transformations/tested-transformation/requirements.txt diff --git a/python/transformations/integration_tests_example/test.py b/python/transformations/tested-transformation/test.py similarity index 100% rename from python/transformations/integration_tests_example/test.py rename to python/transformations/tested-transformation/test.py diff --git a/python/transformations/integration_tests_example/utils.py b/python/transformations/tested-transformation/utils.py similarity index 100% rename from python/transformations/integration_tests_example/utils.py rename to python/transformations/tested-transformation/utils.py