
Streaming Data Loader



Overview

Streaming Data Loader is a high-performance, async-powered microservice for processing data streams from Kafka and bulk-loading them into Elasticsearch. It combines modern Python tooling with observability best practices to provide reliable, scalable, and debuggable data pipelines.

It is fast, resilient, and production-ready, and well suited as a lightweight alternative to heavier ETL systems.


Key Features

  • Asynchronous processing with asyncio, aiokafka, and aiohttp
  • Batch insertions for throughput efficiency
  • Retry & fault-tolerant logic for Kafka and Elasticsearch
  • Configurable via .env and pydantic-settings (see the sketch after this list)
  • Docker & Kubernetes ready
  • Prometheus + Grafana monitoring included
  • Tested with pytest, including integration scenarios
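
As a taste of the configuration bullet above, here is a minimal pydantic-settings sketch that reads .env; the field names are illustrative, not the project's actual settings (those live in src/config.py):

# Illustrative only; the real settings class lives in src/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    # All field names and defaults below are hypothetical examples
    kafka_bootstrap_servers: str = "localhost:9092"
    kafka_topic: str = "events"
    elasticsearch_url: str = "http://localhost:9200"
    batch_size: int = 500

settings = Settings()  # values from .env override the defaults above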

Quick Start

🐳 Docker Compose

docker-compose up --build
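
Once the stack is up, a quick sanity check is to hit the loader's metrics endpoint (port 8000, per the Observability section below):

curl http://localhost:8000/metrics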

☸️ Kubernetes

Step 1 — Deploy

./k8s-deploy.sh
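
To watch the pods come up (plain kubectl; the namespace and labels depend on the manifests in k8s/):

kubectl get pods -w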

Step 2 — Cleanup

./k8s-clean.sh

Architecture

This project uses Hexagonal Architecture (Ports and Adapters), ensuring modularity, extensibility, and clean separation of concerns.

Kafka → KafkaConsumerService → EventService → ElasticsearchClientService → Elasticsearch
                 │                  │
                 └──────────────────┴──→ Metrics + Logging (Prometheus + JSON logs)
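
A minimal sketch of the consume → transform → load loop the diagram describes; the names, defaults, and JSON deserialization below are assumptions, not the repository's actual code:

# Hypothetical consume loop; the real one lives in src/services/
import json

from aiokafka import AIOKafkaConsumer

async def run(sink, topic: str = "events") -> None:
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers="localhost:9092",          # assumed default
        value_deserializer=lambda v: json.loads(v),  # JSON payloads assumed
    )
    await consumer.start()
    try:
        while True:
            # getmany() gathers up to max_records across partitions per poll
            records = await consumer.getmany(timeout_ms=1000, max_records=500)
            events = [msg.value for msgs in records.values() for msg in msgs]
            if events:
                await sink.bulk_insert(events)  # output port, sketched under Layers
    finally:
        await consumer.stop()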

Layers

  • Input Ports: Kafka Consumer (aiokafka), deserialization, batching
  • Application Core: Event transformation, validation, retry logic
  • Output Ports: Async Elasticsearch client, bulk insert, failure handling
  • Infrastructure: Docker, Kubernetes, logging, metrics, monitoring
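
To make the port/adapter split concrete, here is a hedged sketch of an output port and an Elasticsearch bulk adapter (in the repository this role belongs to ElasticsearchClientService; the names and signatures below are illustrative):

# Illustrative shape of the output-port side; the real interfaces live in
# src/ports/ and src/services/, and these names and signatures are assumptions.
import json
from typing import Protocol

import aiohttp

class EventSink(Protocol):
    """Output port: the application core depends on this, not on Elasticsearch."""

    async def bulk_insert(self, events: list[dict]) -> None: ...

class BulkElasticsearchSink:
    """Adapter: implements the port via the Elasticsearch Bulk API over aiohttp."""

    def __init__(self, session: aiohttp.ClientSession, url: str, index: str) -> None:
        self._session = session
        self._url = url
        self._index = index

    async def bulk_insert(self, events: list[dict]) -> None:
        # The Bulk API takes newline-delimited JSON: one action line per document,
        # and the payload must end with a trailing newline.
        lines = []
        for event in events:
            lines.append(json.dumps({"index": {"_index": self._index}}))
            lines.append(json.dumps(event))
        body = "\n".join(lines) + "\n"
        async with self._session.post(
            f"{self._url}/_bulk",
            data=body,
            headers={"Content-Type": "application/x-ndjson"},
        ) as resp:
            resp.raise_for_status()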

🔍 Why Choose This Over Logstash, Flume, etc.?

  • True async data pipeline — lower latency, better throughput
  • No heavyweight config DSL — Python code, pyproject.toml, .env
  • Built-in retries & fault handling — robust out of the box
  • JSON logging and metric labeling for full observability
  • Open-source & customizable — perfect for modern data teams

Observability

Prometheus scrapes metrics on /metrics (port 8000). Dashboards are automatically provisioned in Grafana.

Metric                               Description
messages_processed_total             Total number of processed messages
errors_total                         Total errors during processing
consume_duration_seconds             Time spent reading from Kafka
response_duration_seconds            Time to insert into Elasticsearch
transform_duration_seconds           Time spent transforming messages
batch_processing_duration_seconds    Full batch processing time
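
A minimal sketch of how these metrics could be declared and exposed with prometheus_client; the metric names match the table, while module layout and usage are assumptions:

# Sketch in the spirit of src/metrics.py; only the metric names are from the table
from prometheus_client import Counter, Histogram, start_http_server

messages_processed_total = Counter(
    "messages_processed_total", "Total number of processed messages"
)
errors_total = Counter("errors_total", "Total errors during processing")
consume_duration_seconds = Histogram(
    "consume_duration_seconds", "Time spent reading from Kafka"
)
batch_processing_duration_seconds = Histogram(
    "batch_processing_duration_seconds", "Full batch processing time"
)

# Expose /metrics on port 8000 for Prometheus to scrape
start_http_server(8000)

# Usage: time a batch with the histogram's context manager, e.g.
# with batch_processing_duration_seconds.time():
#     process(batch)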

Testing

pytest -v

Includes:

  • ✅ Unit tests
  • ✅ Integration tests (Kafka → ES)
  • ✅ Metrics verification (example after this list)
  • ✅ Config validation
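
As a flavor of the metrics-verification tests, a self-contained sketch using a throwaway registry (an assumption about style, not the suite's actual code):

# Illustrative metrics test; the real suite lives in tests/
from prometheus_client import CollectorRegistry, Counter

def test_counter_increments() -> None:
    registry = CollectorRegistry()
    processed = Counter(
        "messages_processed_total",
        "Total number of processed messages",
        registry=registry,
    )
    processed.inc(3)
    # get_sample_value reads the exported sample straight from the registry
    assert registry.get_sample_value("messages_processed_total") == 3.0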

Technologies

  • Python 3.12 + asyncio
  • Kafka + aiokafka
  • Elasticsearch Bulk API
  • Pydantic (pydantic-settings) + dotenv, managed with Poetry
  • Prometheus + Grafana
  • Docker + docker-compose
  • Kubernetes-ready
  • JSON logging (python-json-logger; sketch after this list)
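
For the JSON logging entry, a minimal python-json-logger setup of the kind src/logger.py presumably contains; the logger name and format fields are assumptions:

# Minimal JSON logging setup; illustrative, not the project's src/logger.py
import logging

from pythonjsonlogger import jsonlogger

handler = logging.StreamHandler()
handler.setFormatter(jsonlogger.JsonFormatter("%(asctime)s %(levelname)s %(name)s %(message)s"))

logger = logging.getLogger("streaming-data-loader")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("batch flushed", extra={"batch_size": 500})  # extras become JSON fields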

Project Structure

streaming-data-loader/
├── configs/                     # Prometheus / Grafana
├── src/                         # Main source code
│   ├── domain/
│   ├── ports/
│   ├── services/                # Event processing logic
│   ├── config.py                # Settings & env config
│   ├── logger.py                # JSON logger setup
│   └── metrics.py               # Prometheus metrics
├── tests/                       # Unit & integration tests
├── k8s/                         # Kubernetes manifests
├── docker-compose.yml
├── Dockerfile
├── deploy.sh / clean.sh
└── pyproject.toml

If you find this useful...

...give it a star, fork it, or mention it in your next data project!

Author

Anatoly Dudko
GitHub: @aDudko · LinkedIn