Merged
21 changes: 18 additions & 3 deletions Jenkinsfile
@@ -16,7 +16,19 @@ pipeline {
}

stages {

stage('Prepare Environment') {
when {
anyOf {
allOf {
branch 'master'
triggeredBy 'UserIdCause' // Manual trigger on master
}
allOf {
branch 'latest'
}
}
}
steps {
script {
sh 'git config --global --add safe.directory $(pwd)'
@@ -41,8 +53,11 @@ pipeline {

stage('Beta Release') {
when {
-        branch 'master'
-      }
+        allOf {
+          branch 'master'
+          triggeredBy 'UserIdCause' // Manual "Build Now"
+        }
+      }
steps {
sh '''
sed -i -E 's/^(name *= *")superstream-clients(")/\\1superstream-clients-beta\\2/' pyproject.toml
@@ -106,7 +121,7 @@ pipeline {
}
withCredentials([string(credentialsId: 'gh_token', variable: 'GH_TOKEN')]) {
sh """
-gh release create $versionTag dist/superstream_confluent_kafka-${env.versionTag}.tar.gz --generate-notes
+gh release create $versionTag dist/superstream_clients-${env.versionTag}.tar.gz --generate-notes
"""
}
}
66 changes: 1 addition & 65 deletions README.md
@@ -15,8 +15,6 @@ Superstream Clients works as a Python import hook that intercepts Kafka producer

## Supported Libraries

Works with any Python library that implements Kafka producers, including:

- kafka-python
- aiokafka
- confluent-kafka
@@ -31,51 +29,6 @@ Works with any Python library that implements Kafka producers, including:
- **Dynamic configuration**: Applies optimized settings based on topic-specific recommendations
- **Intelligent optimization**: Identifies the most impactful topics to optimize
- **Graceful fallback**: Falls back to default settings if optimization fails
- **Minimal overhead**: Uses a single lightweight background thread (or async coroutine for aiokafka)

## Important: Producer Configuration Requirements

When initializing your Kafka producers, please ensure you pass the configuration as a mutable object. The Superstream library needs to modify the producer configuration to apply optimizations. The following initialization patterns are supported:

✅ **Supported (Recommended)**:
```python
# Using kafka-python
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
compression_type='snappy',
batch_size=16384
)

# Using aiokafka
from aiokafka import AIOKafkaProducer
producer = AIOKafkaProducer(
bootstrap_servers='localhost:9092',
compression_type='snappy',
batch_size=16384
)

# Using confluent-kafka
from confluent_kafka import Producer
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'compression.type': 'snappy',
'batch.size': 16384
})
```

❌ **Not Supported**:
```python
# Using frozen dictionaries or immutable configurations
from types import MappingProxyType
config = MappingProxyType({
'bootstrap.servers': 'localhost:9092'
})
producer = KafkaProducer(**config)
```

### Why This Matters
The Superstream library needs to modify your producer's configuration to apply optimizations based on your cluster's characteristics. This includes adjusting settings like compression, batch size, and other performance parameters. When the configuration is immutable, these optimizations cannot be applied.

## Installation

@@ -87,24 +40,7 @@ That's it! Superclient will now automatically load and optimize all Kafka produc

## Usage

After installation, superclient works automatically. Just use your Kafka clients as usual:

```python
# kafka-python
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Automatically optimized!

# confluent-kafka
from confluent_kafka import Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
# Automatically optimized!

# aiokafka
from aiokafka import AIOKafkaProducer
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
# Automatically optimized!
```
After installation, superclient works automatically. Just use your Kafka clients as usual.
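
For reference, the pattern is identical to the snippets removed above — no superclient-specific calls are involved (a minimal kafka-python sketch):

```python
# No superclient import is needed; installing the package is enough.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Automatically optimized!
```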

### Docker Integration

2 changes: 1 addition & 1 deletion examples/requirements.txt
@@ -1,4 +1,4 @@
kafka-python==2.2.14
-confluent-kafka==2.3.0
+confluent-kafka==2.11.0
aiokafka==0.10.0
aws-msk-iam-sasl-signer-python==1.0.2
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "superstream-clients"
version = "0.1.5"
version = "1.0.0"
description = "Superstream optimisation library for Kafka producers"
authors = [{name = "Superstream Labs", email = "support@superstream.ai"}]
license = "Apache-2.0"
116 changes: 113 additions & 3 deletions superclient/agent/interceptor.py
@@ -1,10 +1,13 @@
"""Producer interception functionality."""

import os
import uuid
from typing import Any, Dict
import importlib

from ..util.logger import get_logger
from ..util.config import get_topics_list, is_disabled
from ..util.metrics import configure_confluent_stats_callback
from .metadata import fetch_metadata_sync, optimal_cfg, _DEFAULTS
from ..core.reporter import send_clients_msg
from ..core.manager import normalize_bootstrap
@@ -243,8 +246,102 @@ async def stop_patch(*a, **kw):
await original_stop(*a, **kw)

self.stop = stop_patch

# Patch the SendProduceReqHandler.create_request method for this specific producer
sender_mod = importlib.import_module("aiokafka.producer.sender")

# Only patch once globally
if not hasattr(sender_mod.SendProduceReqHandler, '_superstream_patched'):
orig_create_request = sender_mod.SendProduceReqHandler.create_request

def create_request_with_metrics(self_handler):
# Collect metrics from the pending batches first, then delegate to the original create_request
# self_handler._batches: Dict[TopicPartition, MessageBatch]

# Quick check: if sender has no tracker, it's an internal producer - skip metrics
if not hasattr(self_handler._sender, '_superstream_tracker'):
return orig_create_request(self_handler)

tracker = self_handler._sender._superstream_tracker

# Additional check: skip internal producers by client_id
if tracker is None or tracker.client_id.startswith(_SUPERLIB_PREFIX):
return orig_create_request(self_handler)

# Per-producer totals
total_uncompressed = 0
total_compressed = 0
total_records = 0
topic_stats = {}
for tp, batch in self_handler._batches.items():
# Get record count from the batch
record_count = batch.record_count

# Get compressed size from the batch buffer
compressed = 0
try:
compressed = len(batch.get_data_buffer())
except Exception:
pass

# Estimate uncompressed size based on record count
# Since we can't easily access the original message data at this point,
# we'll use a reasonable estimate based on the batch size and record count
if record_count > 0:
# Estimate uncompressed size based on compressed size and typical compression ratios
# This is an approximation since we can't access the original message data
estimated_compression_ratio = 0.7 # Assume 30% compression
uncompressed = int(compressed / estimated_compression_ratio)
else:
uncompressed = 0

total_uncompressed += uncompressed
total_compressed += compressed
total_records += record_count
# Per-topic
if tp.topic not in topic_stats:
topic_stats[tp.topic] = {'uncompressed': 0, 'compressed': 0, 'records': 0}
topic_stats[tp.topic]['uncompressed'] += uncompressed
topic_stats[tp.topic]['compressed'] += compressed
topic_stats[tp.topic]['records'] += record_count
# Update tracker
if total_records > 0:
tracker._superstream_metrics = getattr(tracker, '_superstream_metrics', {})
m = tracker._superstream_metrics
# Accumulate totals (aggregative counters)
m['outgoing-byte-total'] = m.get('outgoing-byte-total', 0) + total_compressed
m['record-send-total'] = m.get('record-send-total', 0) + total_records
m['uncompressed-byte-total'] = m.get('uncompressed-byte-total', 0) + total_uncompressed

# Calculate rates from aggregated totals
m['compression-rate-avg'] = (m['outgoing-byte-total'] / m['uncompressed-byte-total']) if m['uncompressed-byte-total'] else 1.0
m['record-size-avg'] = (m['uncompressed-byte-total'] / m['record-send-total']) if m['record-send-total'] else 0

# Per-topic
m['topics'] = m.get('topics', {})
for topic, stats in topic_stats.items():
t = m['topics'].setdefault(topic, {'byte-total': 0, 'record-send-total': 0, 'uncompressed-total': 0})
# Accumulate totals (aggregative counters)
t['byte-total'] = t.get('byte-total', 0) + stats['compressed']
t['record-send-total'] = t.get('record-send-total', 0) + stats['records']
t['uncompressed-total'] = t.get('uncompressed-total', 0) + stats['uncompressed']

# Calculate compression rate from aggregated totals
t['compression-rate'] = (t['byte-total'] / t['uncompressed-total']) if t['uncompressed-total'] else 1.0

tracker._superstream_metrics = m
return orig_create_request(self_handler)

sender_mod.SendProduceReqHandler.create_request = create_request_with_metrics
sender_mod.SendProduceReqHandler._superstream_patched = True

self._superstream_patch = True
orig_init(self, *args, **kwargs)

# Store tracker reference in the sender for metrics collection
if hasattr(self, '_sender'):
self._sender._superstream_tracker = tr

send_clients_msg(tr, error_msg)

# Log success message based on whether defaults were used
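
To make the estimation heuristic above concrete: with the assumed 0.7 compression ratio, the derived averages work out as follows (a worked sketch with hypothetical numbers, not output from the library):

```python
# Hypothetical batch: 7,000 compressed bytes across 100 records.
compressed = 7_000
estimated_compression_ratio = 0.7  # assume ~30% compression, as in the patch
uncompressed = int(compressed / estimated_compression_ratio)  # 10,000 bytes

compression_rate_avg = compressed / uncompressed  # 0.7 (outgoing / uncompressed)
record_size_avg = uncompressed / 100              # 100 bytes per record
```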
@@ -345,10 +442,18 @@ def __init__(self, conf: Dict[str, Any], *args, **kwargs):
logger.debug("Overriding configuration: {} ((not set) -> {})", k, v)
conf[k] = v


# Generate UUID for this producer
tracker_uuid = str(uuid.uuid4())

# Configure stats callback for metrics collection
conf = configure_confluent_stats_callback(conf, tracker_uuid)

# Create the producer with optimized configuration
self._producer = Producer(conf, *args, **kwargs)

report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS
# Create tracker with the generated UUID
self._tracker = ProducerTracker(
lib="confluent",
producer=self._producer,
@@ -357,10 +462,12 @@ def __init__(self, conf: Dict[str, Any], *args, **kwargs):
orig_cfg=orig_cfg,
opt_cfg=opt_cfg,
report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS),
-error=error_msg, # Store error message in tracker
+error=error_msg,
metadata=metadata,
topics_env=topics_env,
uuid=tracker_uuid, # Use the generated UUID
)

Heartbeat.register_tracker(self._tracker)

send_clients_msg(self._tracker, error_msg)
@@ -397,12 +504,15 @@ def __del__(self):
self._superstream_closed = True
self._tracker.close()
Heartbeat.unregister_tracker(self._tracker.uuid)

# Remove metrics extractor from registry
from ..util.metrics import remove_producer_metrics_extractor
remove_producer_metrics_extractor(self._tracker.uuid)

logger.debug("Superstream tracking stopped for confluent-kafka producer with client_id: {}",
getattr(self._tracker, 'client_id', 'unknown'))
except Exception as e:
logger.error("Error during automatic cleanup: {}", e)
else:
logger.debug("Producer already cleaned up or no tracker found")

def __getattr__(self, name):
"""Delegate all other attributes to the underlying producer."""