Streaming Data Loader is a high-performance, async-powered microservice for processing data streams from Kafka and bulk-loading them into Elasticsearch. It combines modern Python tooling with observability best practices to provide reliable, scalable, and debuggable data pipelines.
It’s fast, resilient, and production-ready — ideal for those who need lightweight alternatives to complex ETL systems.
- Asynchronous processing with `asyncio`, `aiokafka`, and `aiohttp`
- Batch insertions for throughput efficiency
- Retry & fault-tolerant logic for Kafka and Elasticsearch
- Configurable via `.env` and `pydantic-settings`
- Docker & Kubernetes ready
- Prometheus + Grafana monitoring included
- Tested with `pytest`, including integration scenarios
To spin up the full stack locally:

```bash
docker-compose up --build
```
- http://localhost:9090 → Prometheus
- http://localhost:3000 → Grafana (admin / admin)
- http://localhost:8080 → Kafka UI
For Kubernetes:

```bash
./k8s-deploy.sh   # deploy all manifests
./k8s-clean.sh    # tear everything down
```
This project uses Hexagonal Architecture (Ports and Adapters), ensuring modularity, extensibility, and clean separation of concerns.
```
Kafka ──▶ KafkaConsumerService ──▶ EventService ──▶ ElasticsearchClientService ──▶ Elasticsearch
                                        │
                                        └──▶ Metrics + Logging (Prometheus + JSON logs)
```
- Input Ports: Kafka Consumer (aiokafka), deserialization, batching
- Application Core: Event transformation, validation, retry logic
- Output Ports: Async Elasticsearch client, bulk insert, failure handling (see the port sketch after this list)
- Infrastructure: Docker, Kubernetes, logging, metrics, monitoring
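As a rough illustration of the ports-and-adapters split, an output port could be declared like this. The names are illustrative, not the repository's actual interfaces:

```python
from typing import Any, Protocol

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

class EventSink(Protocol):
    """Output port: the application core depends on this interface,
    never on Elasticsearch directly."""

    async def save_batch(self, events: list[dict[str, Any]]) -> None: ...

class ElasticsearchEventSink:
    """Adapter: satisfies the port using the async Elasticsearch client."""

    def __init__(self, client: AsyncElasticsearch, index: str) -> None:
        self._client = client
        self._index = index

    async def save_batch(self, events: list[dict[str, Any]]) -> None:
        # Translate domain events into bulk actions for this backend
        actions = ({"_index": self._index, "_source": e} for e in events)
        await async_bulk(self._client, actions)
```

Because the core only sees `EventSink`, the Elasticsearch adapter can be swapped out (or faked in tests) without touching the event-processing logic.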
- ✅ True async data pipeline — lower latency, better throughput
- ✅ No heavyweight config DSL — Python code, `pyproject.toml`, and `.env`
- ✅ Built-in retries & fault handling — robust out of the box
- ✅ JSON logging and metric labeling for full observability
- ✅ Open-source & customizable — perfect for modern data teams
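The built-in retry behavior can be pictured as a small exponential-backoff helper. This is a hedged sketch, not the project's exact implementation:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def with_retries(coro_factory, attempts: int = 5, base_delay: float = 0.5):
    """Retry an async operation with exponential backoff between attempts."""
    for attempt in range(1, attempts + 1):
        try:
            return await coro_factory()
        except Exception as exc:  # in practice, catch Kafka/ES-specific errors
            if attempt == attempts:
                raise  # out of attempts: surface the failure to the caller
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning(
                "attempt %d/%d failed: %s; retrying in %.1fs",
                attempt, attempts, exc, delay,
            )
            await asyncio.sleep(delay)
```

A call site would then wrap a fragile operation, e.g. `await with_retries(lambda: async_bulk(es, docs))`.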
Prometheus scrapes metrics from the `/metrics` endpoint (port `8000`). Dashboards are automatically provisioned in Grafana.
| Metric | Description |
|--------|-------------|
| `messages_processed_total` | Total number of processed messages |
| `errors_total` | Total errors during processing |
| `consume_duration_seconds` | Time spent reading from Kafka |
| `response_duration_seconds` | Time to insert into Elasticsearch |
| `transform_duration_seconds` | Time spent transforming messages |
| `batch_processing_duration_seconds` | Full batch processing time |
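Using `prometheus_client`, definitions matching the table above would look roughly like this (the Python variable names are assumptions):

```python
from prometheus_client import Counter, Histogram, start_http_server

# prometheus_client appends the _total suffix to counters automatically,
# so these are exported as messages_processed_total and errors_total
MESSAGES_PROCESSED = Counter("messages_processed", "Total number of processed messages")
ERRORS = Counter("errors", "Total errors during processing")

CONSUME_DURATION = Histogram("consume_duration_seconds", "Time spent reading from Kafka")
RESPONSE_DURATION = Histogram("response_duration_seconds", "Time to insert into Elasticsearch")
TRANSFORM_DURATION = Histogram("transform_duration_seconds", "Time spent transforming messages")
BATCH_DURATION = Histogram("batch_processing_duration_seconds", "Full batch processing time")

start_http_server(8000)  # expose /metrics on port 8000 for Prometheus to scrape
```

Durations can then be recorded with `Histogram.time()`, which works as a context manager or decorator, and counts with `MESSAGES_PROCESSED.inc(len(batch))`.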
Run the test suite with:

```bash
pytest -v
```
Includes:
- ✅ Unit tests
- ✅ Integration tests (Kafka → ES)
- ✅ Metrics verification
- ✅ Config validation
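For flavor, a unit test might look like the following. The `to_document` transform and its import path are hypothetical, shown only to illustrate the style:

```python
import pytest

# Hypothetical import path and function, for illustration only
from src.services.transform import to_document

def test_to_document_preserves_id():
    event = {"id": 1, "payload": "hello"}
    assert to_document(event)["id"] == 1

def test_to_document_rejects_missing_id():
    with pytest.raises(ValueError):
        to_document({"payload": "no id"})
```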
- Python 3.12 + `asyncio`
- Kafka + `aiokafka`
- Elasticsearch Bulk API
- Pydantic + `dotenv`
- `poetry`
- Prometheus + Grafana
- Docker + `docker-compose`, Kubernetes-ready
- JSON logging (`python-json-logger`)
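The JSON logging setup with `python-json-logger` is typically just a formatter swap; a minimal sketch of what `logger.py` might do:

```python
import logging

from pythonjsonlogger import jsonlogger

def setup_logger(name: str = "streaming-data-loader") -> logging.Logger:
    handler = logging.StreamHandler()
    # Each record is emitted as a single JSON object, easy to ship and query
    handler.setFormatter(
        jsonlogger.JsonFormatter("%(asctime)s %(levelname)s %(name)s %(message)s")
    )
    logger = logging.getLogger(name)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```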
```
streaming-data-loader/
├── configs/              # Prometheus / Grafana
├── src/                  # Main source code
│   ├── domain/
│   ├── ports/
│   ├── services/         # Event processing logic
│   ├── config.py         # Settings & env config
│   ├── logger.py         # JSON logger setup
│   └── metrics.py        # Prometheus metrics
├── tests/                # Unit & integration tests
├── k8s/                  # Kubernetes manifests
├── docker-compose.yml
├── Dockerfile
├── deploy.sh / clean.sh
└── pyproject.toml
```
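`config.py` presumably follows the standard `pydantic-settings` pattern; a minimal sketch, with placeholder field names:

```python
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Values are read from the environment, with .env as a fallback
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    # Placeholder field names -- the real settings may differ
    kafka_bootstrap_servers: str = "localhost:9092"
    kafka_topic: str = "events"
    elasticsearch_url: str = "http://localhost:9200"
    batch_size: int = 500

settings = Settings()
```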
If you find this project useful, give it a star, fork it, or mention it in your next data project!
Anatoly Dudko
GitHub @aDudko • LinkedIn