157 changes: 157 additions & 0 deletions python/transformations/tested-transformation/README.md
# Tested Transformation Template

This template demonstrates how to build and test a Quix Streams transformation application with proper separation of concerns and comprehensive testing.

## Project Structure

```
tested-transformation/
├── main.py # Production code with pipeline definition
├── test.py # Test suite with mocked data source
├── utils.py             # Shared testing utilities
├── compose.test.yaml    # Local Redpanda/Kafka broker for running the tests
└── README.md            # This file
```

## Testing Strategy

This template showcases best practices for testing Quix Streams applications:

### 1. Separation of Pipeline Logic

The pipeline transformation logic is isolated in the `define_pipeline()` function in `main.py`. This function:
- Takes a `StreamingDataFrame` as input
- Applies all transformations
- Returns the transformed `StreamingDataFrame`

This separation allows the same pipeline logic to be used in both production (with Kafka topics) and testing (with mocked data sources).
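For example, `test.py` can build its dataframe from a mocked source and pass it through the same function. A minimal sketch (assuming the local broker address and class names used elsewhere in this template; the actual wiring in `test.py` may differ slightly):

```python
import uuid

from quixstreams import Application

from main import define_pipeline
from test import TestDataSource  # the mocked source defined in this template's test.py

app = Application(
    broker_address="localhost:19092",        # local broker from compose.test.yaml
    consumer_group=f"test-{uuid.uuid4()}",   # unique group per test run
    auto_offset_reset="earliest",
)

# Production builds the dataframe from a Kafka topic; tests feed it from the mocked source.
sdf = app.dataframe(source=TestDataSource(name=f"test-source-{uuid.uuid4()}"))
sdf = define_pipeline(sdf)
```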

### 2. Mocked Data Source

`test.py` includes a `TestDataSource` class that:
- Extends `quixstreams.sources.Source`
- Produces deterministic test data with known timestamps
- Includes dummy messages at the end to close windowed aggregations
- Eliminates the need to publish test data to Kafka topics manually

**Note:** While the data source is mocked, these are still **integration tests** - Quix Streams internally uses Kafka for state management and processing. A Kafka broker is required to run the tests.
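A source along these lines is enough for deterministic tests. The sketch below follows the Quix Streams custom-source pattern (subclass `Source`, implement `run()`, serialize and produce each record); the actual `TestDataSource` and its `memory_allocation_data` records in `test.py` may differ:

```python
from quixstreams.sources import Source


class TestDataSource(Source):
    """Produces a fixed set of records with known timestamps, then stops."""

    # Illustrative records only; "time" is in nanoseconds, as main.py expects.
    # The trailing records carry far-future timestamps so the last tumbling
    # window closes and emits its result.
    records = [
        {"host": "host1", "time": 1_577_836_800_000 * 1_000_000, "memory_used": 42},
        {"host": "host2", "time": 1_577_836_801_000 * 1_000_000, "memory_used": 57},
        # ... remaining events ...
        {"host": "host1", "time": 1_577_836_815_000 * 1_000_000, "memory_used": 0},  # dummy closer
        {"host": "host2", "time": 1_577_836_815_000 * 1_000_000, "memory_used": 0},  # dummy closer
    ]

    def run(self):
        for record in self.records:
            if not self.running:
                break
            msg = self.serialize(key=record["host"], value=record)
            self.produce(key=msg.key, value=msg.value)
```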

### 3. Deterministic Assertions

The test suite uses a structured approach:
- **Expected output is defined upfront** as a list of dictionaries with expected values
- **Messages are grouped by key** to handle partition-based ordering (messages from different keys may arrive in any order)
- **Order is verified within each key** to ensure temporal correctness per partition
- **Utility function handles comparison** via `assert_messages_match()` in `utils.py` (sketched below)
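A comparison helper along these lines could live in `utils.py`. This is only a sketch; the template's actual `assert_messages_match()` signature and output formatting may differ:

```python
from collections import defaultdict


def assert_messages_match(actual_messages, expected_rows):
    """Group messages by key, then compare them to the expected rows in per-key order."""
    actual_by_key = defaultdict(list)
    for msg in actual_messages:
        actual_by_key[msg["key"]].append(msg["value"])

    expected_by_key = defaultdict(list)
    for row in expected_rows:
        expected_by_key[row["key"]].append(row["value"])

    assert actual_by_key.keys() == expected_by_key.keys(), "Unexpected set of keys"
    for key, expected in expected_by_key.items():
        assert actual_by_key[key] == expected, f"Mismatch for key {key!r}"
```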

### 4. Key Testing Considerations

**Windowing:** When testing windowed operations, remember:
- Windows only emit results when they close
- Add dummy messages with future timestamps to close the final window (see the example after this list)
- Window boundaries align to fixed time intervals from epoch, not event arrival times
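With the 3-second windows used here, the last real host1 event falls in the `[...809000, ...812000)` window, so the closing record only needs a timestamp at or past that window's end. Illustrative values (the actual dummy records live in `test.py`):

```python
# Payload timestamps are nanoseconds; main.py converts them to milliseconds.
# Anything at or beyond the ...812000 ms boundary forces the final window to emit.
closing_record = {"host": "host1", "time": 1_577_836_815_000 * 1_000_000}
```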

**Partitioning:** In Kafka/Quix Streams:
- Messages with different keys may be processed in different partitions
- Order is guaranteed only within a single key/partition
- Tests must group by key before asserting order

**Unique Consumer Groups:** Use `uuid.uuid4()` for consumer groups and source names in tests to ensure each test run is isolated.
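For example:

```python
import uuid

consumer_group = f"test-consumer-{uuid.uuid4()}"  # fresh consumer group: offsets start from scratch
source_name = f"test-source-{uuid.uuid4()}"       # fresh source name: no topics or state reused
```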

**Kafka Requirement:** Quix Streams uses Kafka internally for state management (RocksDB state stores are backed by changelog topics). The provided `compose.test.yaml` starts a local Redpanda broker (Kafka API-compatible) and the Redpanda Console for debugging:
- Kafka broker: `localhost:19092`
- Redpanda Console: `http://localhost:8080` (optional web UI for inspecting topics)
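Once the tests have run, those internal topics are visible on the local broker, either in the Redpanda Console UI or from the command line:

```bash
# List the topics created by a test run (this should include changelog topics for the windowed state)
docker-compose -f compose.test.yaml exec kafka_broker rpk topic list
```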

## Running Tests

### Prerequisites

Tests require a running Kafka broker. Use the provided Docker Compose file to start Kafka locally:

```bash
# Start Kafka in the background
docker-compose -f compose.test.yaml up -d

# Wait a few seconds for Kafka to be ready
```
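If you prefer to confirm readiness instead of waiting, `rpk` ships inside the Redpanda image:

```bash
# Optional: check that the broker reports a healthy cluster before running tests
docker-compose -f compose.test.yaml exec kafka_broker rpk cluster health
```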

### Run the Tests

```bash
# Run the test suite
python test.py
```

Expected output:
```
✓ Received 7 messages

--- Verifying messages for key: host1 ---
Message 1 for key 'host1':
  Expected: start=1577836800000, end=1577836803000, value=1
  Actual:   start=1577836800000, end=1577836803000, value=1
  ✓ Match!
...
✓ All assertions passed!
✓ All 7 messages matched expected output across 2 keys
```

### Cleanup

```bash
# Stop Kafka when done
docker-compose -f compose.test.yaml down -v
```

## Running in Production

```bash
# Run the production application
python main.py
```

The production application requires environment variables:
- `input`: Name of the input Kafka topic
- `output`: Name of the output Kafka topic

## How to Use This Template

1. **Modify the pipeline**: Update `define_pipeline()` in `main.py` with your transformation logic (see the sketch after this list)
2. **Update test data**: Modify `TestDataSource.memory_allocation_data` to match your input schema
3. **Update expected output**: Change `expected_rows` in `test.py` to match your pipeline's expected results
4. **Run tests**: Execute `python test.py` to verify your pipeline works correctly
5. **Deploy**: Deploy `main.py` to production
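As a sketch of step 1 (hypothetical field names; any chain of `StreamingDataFrame` operations works):

```python
from quixstreams.dataframe import StreamingDataFrame


def define_pipeline(sdf: StreamingDataFrame) -> StreamingDataFrame:
    # Example: keep only positive readings and add a derived field
    sdf = sdf.filter(lambda row: row["value"] > 0)
    sdf = sdf.apply(lambda row: {**row, "value_doubled": row["value"] * 2})
    return sdf
```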

## Example: Understanding the Test

The template includes a tumbling window aggregation that counts events per 3-second window:

```python
sdf = sdf.tumbling_window(3000).count().final()
```

**Input events** (7 events across 2 hosts; the first event is at epoch second 1577836800 — 2020-01-01 00:00:00 UTC — and only the trailing seconds are shown below):
- host1: 800, 803, 806, 810
- host2: 801, 804, 808

**Window boundaries** (3-second tumbling windows aligned to the epoch, same abbreviation):
- [800, 803): host1=1, host2=1
- [803, 806): host1=1, host2=1
- [806, 809): host1=1, host2=1
- [809, 812): host1=1

**Output**: 7 windowed count results (one per host per window)
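The listed boundaries follow from the fixed epoch alignment. A quick way to check where any event lands (timestamps in milliseconds):

```python
def tumbling_window_start(timestamp_ms: int, window_ms: int = 3000) -> int:
    # Tumbling windows align to fixed multiples of the window size, counted from the epoch
    return timestamp_ms - (timestamp_ms % window_ms)


# host1's last event (...810000 ms) falls in the [...809000, ...812000) window
assert tumbling_window_start(1_577_836_810_000) == 1_577_836_809_000
```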

## Best Practices

1. **Keep pipeline logic pure**: The `define_pipeline()` function should only transform data, not handle I/O
2. **Use realistic test data**: Mirror production data structures and edge cases
3. **Test temporal operations**: Pay special attention to windowing, time-based joins, and stateful operations
4. **Isolate test runs**: Use unique consumer groups to prevent state pollution between test runs
5. **Document window mechanics**: Clearly explain how windowing aligns and when windows close

## Learn More

- [Quix Streams Documentation](https://quix.io/docs/quix-streams/introduction.html)
- [StreamingDataFrame Operations](https://quix.io/docs/quix-streams/processing.html)
- [Windowing Guide](https://quix.io/docs/quix-streams/windowing.html)
- [Testing Stateful Applications](https://quix.io/docs/quix-streams/testing.html)
15 changes: 15 additions & 0 deletions python/transformations/tested-transformation/app.yaml
name: Tested transformation
language: python
variables:
  - name: input
    inputType: InputTopic
    description: Name of the input topic to listen to.
    defaultValue: raw
  - name: output
    inputType: OutputTopic
    description: Name of the output topic to write to.
    defaultValue: transform
dockerfile: dockerfile
runEntryPoint: main.py
defaultFile: main.py
libraryItemId: starter-transformation
32 changes: 32 additions & 0 deletions python/transformations/tested-transformation/compose.test.yaml
services:
  kafka_broker:
    image: docker.redpanda.com/redpandadata/redpanda:v24.1.1
    command: |
      redpanda start
      --smp 1
      --overprovisioned
      --node-id 0
      --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      --advertise-kafka-addr internal://kafka_broker:9092,external://localhost:19092
      --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      --advertise-pandaproxy-addr internal://kafka_broker:8082,external://localhost:18082
      --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      --rpc-addr kafka_broker:33145
      --advertise-rpc-addr kafka_broker:33145
      --mode dev-container
      --set auto_create_topics_enabled=true
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644
  console:
    image: docker.redpanda.com/redpandadata/console:v2.5.2
    entrypoint: /bin/sh
    command: |-
      -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    ports:
      - 8080:8080
    environment:
      CONFIG_FILEPATH: '/tmp/config.yml'
      CONSOLE_CONFIG_FILE: "kafka:\n  brokers: [\"kafka_broker:9092\"]\n  schemaRegistry:\n    enabled: true\n    urls: [\"http://kafka_broker:8081\"]\nredpanda:\n  adminApi:\n    enabled: true\n    urls: [\"http://kafka_broker:9644\"]\nconnect:\n  enabled: true\n  clusters:\n    - name: local-connect-cluster\n      url: http://connect:8083\n"
28 changes: 28 additions & 0 deletions python/transformations/tested-transformation/dockerfile
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8 \
PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
34 changes: 34 additions & 0 deletions python/transformations/tested-transformation/library.json
{
  "libraryItemId": "tested-transformation",
  "name": "Tested transformation template",
  "language": "Python",
  "IsHighlighted": false,
  "DisplayOrder": 3,
  "tags": {
    "Complexity": ["Easy"],
    "Technology": ["Quix Streams"],
    "Pipeline Stage": ["Transformation"],
    "Popular Subjects": ["Quick Start"],
    "Type": ["Basic templates"]
  },
  "shortDescription": "A testing template that applies a simple transformation and publishes the result to an output topic",
  "DefaultFile": "main.py",
  "EntryPoint": "dockerfile",
  "RunEntryPoint": "main.py",
  "Variables": [
    {
      "Name": "input",
      "Type": "EnvironmentVariable",
      "InputType": "InputTopic",
      "Description": "Name of the input topic to listen to.",
      "DefaultValue": "csv-data"
    },
    {
      "Name": "output",
      "Type": "EnvironmentVariable",
      "InputType": "OutputTopic",
      "Description": "Name of the output topic to write to.",
      "DefaultValue": "transform"
    }
  ]
}
56 changes: 56 additions & 0 deletions python/transformations/tested-transformation/main.py
# import the Quix Streams modules for interacting with Kafka.
# For general info, see https://quix.io/docs/quix-streams/introduction.html
from quixstreams import Application
from quixstreams.dataframe import StreamingDataFrame

import os
from datetime import datetime

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()


def define_pipeline(sdf: StreamingDataFrame) -> StreamingDataFrame:

    # Do StreamingDataFrame operations/transformations here
    # (the apply/filter below are no-op placeholders)
    sdf = sdf.apply(lambda row: row).filter(lambda row: True)

    # Set the row timestamp from the payload: "time" is epoch nanoseconds,
    # converted to the milliseconds Quix Streams expects
    sdf = sdf.set_timestamp(lambda row, *_: int(row["time"] / 1E6))

    # Replace the nanosecond "time" field with a human-readable datetime string
    sdf["time"] = sdf["time"].apply(lambda epoch: str(datetime.fromtimestamp(epoch / 1E9)))

    # Count events per key in 3-second tumbling windows; emit only when a window closes
    sdf = sdf.tumbling_window(3000).count().final()

    # Optional printing for debugging.
    # sdf = sdf.print(metadata=True)

    return sdf


def main():

    # Setup necessary objects
    app = Application(
        consumer_group="my_transformation",
        auto_create_topics=True,
        auto_offset_reset="earliest"
    )
    input_topic = app.topic(name=os.environ["input"])
    output_topic = app.topic(name=os.environ["output"])
    sdf = app.dataframe(topic=input_topic)

    # Apply the pipeline transformations
    sdf = define_pipeline(sdf)

    # Finish off by writing the final result to the output topic
    sdf.to_topic(output_topic)

    # With our pipeline defined, now run the Application
    app.run()


# It is recommended to execute Applications under a conditional main
if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions python/transformations/tested-transformation/requirements.txt
quixstreams==3.23.1
python-dotenv