A high-throughput, memory-efficient data distribution engine that reads records from flat-file sources and propagates them to multiple downstream systems with built-in resilience, rate limiting, and observability.
- Overview
- Architecture
- Features
- Quick Start
- Configuration
- Design Decisions
- Performance
- Non-Functional Features
- Extending the Engine
- Testing
- Project Structure
- License
Modern data architectures often require propagating data from a single source to multiple specialized systems—search indexes, analytics databases, caches, and third-party APIs. This engine solves that problem by providing:
- Multi-format ingestion: CSV, JSONL, and Fixed-width file support
- Parallel distribution: Simultaneous delivery to REST, gRPC, Message Queue, and Database sinks
- Production-grade resilience: Rate limiting, circuit breakers, retries, and dead letter queues
- Memory efficiency: Process files of any size with constant memory usage
+-----------------------------------------------------------------------------+
|                               FAN-OUT ENGINE                                |
+-----------------------------------------------------------------------------+
|                                                                             |
|  +-------------+    +------------------+    +---------------------------+   |
|  |   SOURCE    |    |   ORCHESTRATOR   |    |           SINKS           |   |
|  |             |    |                  |    |                           |   |
|  | CSV File    |--->| Async Pipeline   |--->| REST API (JSON)           |   |
|  | JSONL File  |    | + Backpressure   |    | gRPC (Protobuf)           |   |
|  | Fixed-Width |    | + Rate Limiting  |    | Message Queue (XML)       |   |
|  |             |    |                  |    | Wide-Column DB (Avro)     |   |
|  +-------------+    +---------+--------+    +---------------------------+   |
|                               |                                             |
|                               v                                             |
|                    +----------------------+                                 |
|                    |      RESILIENCE      |                                 |
|                    | - Circuit Breaker    |                                 |
|                    | - Retry Handler      |                                 |
|                    | - Dead Letter Queue  |                                 |
|                    +----------------------+                                 |
|                                                                             |
+-----------------------------------------------------------------------------+
For detailed architecture diagrams including data flow, threading model, and class relationships, see docs/ARCHITECTURE.md.
| Format | Description | Use Case |
|---|---|---|
| CSV | Configurable delimiter, quote handling, header detection | Standard data exports |
| JSONL | Line-delimited JSON with nested object support | API exports, logs |
| Fixed-Width | Position-based column extraction | Mainframe/legacy systems |
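Position-based column extraction for fixed-width input can be sketched as follows. This is a minimal illustration, not the engine's reader: the `FixedWidthSketch` class, the `Column` record, and the sample layout are hypothetical names introduced here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: FixedWidthSketch and Column are hypothetical names,
// not classes from this project.
final class FixedWidthSketch {

    // A column is a name plus a [start, end) character range within the line.
    record Column(String name, int start, int end) {}

    // Slices one line into named, trimmed fields.
    // Lines shorter than the layout yield empty strings for the missing columns.
    static Map<String, String> parseLine(String line, List<Column> layout) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (Column c : layout) {
            int start = Math.min(c.start(), line.length());
            int end = Math.min(c.end(), line.length());
            fields.put(c.name(), line.substring(start, end).trim());
        }
        return fields;
    }
}
```

With a layout of `id` at positions 0 to 5 and `name` at positions 5 to 15, the line `00042Alice     ` would parse to `id=00042` and `name=Alice`.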
| Output Format | Target System | Serialization |
|---|---|---|
| JSON | REST APIs | Jackson |
| Protocol Buffers | gRPC Services | google.protobuf.Struct |
| XML | Legacy Message Queues | Jackson XML |
| Avro | Wide-Column Databases | Apache Avro (dynamic schema) |
| Sink | Simulated Behavior |
|---|---|
| REST API | HTTP/2 POST with configurable latency, 4xx/5xx errors |
| gRPC | Unary streaming with UNAVAILABLE/DEADLINE_EXCEEDED errors |
| Message Queue | Kafka-like publishing with leader election scenarios |
| Wide-Column DB | Cassandra-style writes with consistency levels |
| Feature | Implementation | Purpose |
|---|---|---|
| Rate Limiting | Guava Token Bucket | Prevent overwhelming downstream services |
| Circuit Breaker | 3-state (CLOSED/OPEN/HALF_OPEN) | Fail fast on unhealthy services |
| Retry with Backoff | Exponential + jitter | Handle transient failures |
| Dead Letter Queue | File-based JSONL | Preserve failed records for replay |
| Backpressure | BlockingQueue | Prevent memory overflow |
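The three-state circuit breaker in the table can be illustrated with a small state machine. This is a sketch under assumptions, not the project's implementation: `CircuitBreakerSketch` and its method names are hypothetical, the clock is injected so the behavior can be checked without sleeping, and the half-open state here admits every caller rather than a single probe.

```java
import java.util.function.LongSupplier;

// Illustrative sketch only: a minimal three-state breaker with hypothetical names.
final class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;   // consecutive failures before opening
    private final long resetTimeoutMs;    // time to stay OPEN before probing again
    private final LongSupplier clockMs;   // injectable time source in milliseconds
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAtMs;

    CircuitBreakerSketch(int failureThreshold, long resetTimeoutMs, LongSupplier clockMs) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMs = resetTimeoutMs;
        this.clockMs = clockMs;
    }

    // Returns true if a call may proceed. An OPEN breaker moves to HALF_OPEN
    // once the reset timeout has elapsed.
    synchronized boolean allowRequest() {
        if (state == State.OPEN && clockMs.getAsLong() - openedAtMs >= resetTimeoutMs) {
            state = State.HALF_OPEN;
        }
        return state != State.OPEN;
    }

    // Any success closes the breaker and clears the failure count.
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    // A failure while HALF_OPEN, or reaching the threshold, (re)opens the breaker.
    synchronized void recordFailure() {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAtMs = clockMs.getAsLong();
        }
    }

    synchronized State state() {
        return state;
    }
}
```

A caller would check `allowRequest()` before each sink call and report the outcome with `recordSuccess()` or `recordFailure()`. For real use the clock could be `System::currentTimeMillis`.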
Real-time metrics reported at configurable intervals:
╔══════════════════════════════════════════════════════════════════════════════╗
║                                METRICS REPORT                                ║
╠══════════════════════════════════════════════════════════════════════════════╣
║  Elapsed: 00:00:05  │  Total Processed: 250  │  Throughput: 50.0/sec         ║
╠══════════════════════════════════════════════════════════════════════════════╣
║  SINK                   │  SUCCESS  │  FAILURE  │  RETRIES  │  AVG LATENCY   ║
╠──────────────────────────────────────────────────────────────────────────────╣
║  REST API Sink          │      248  │        2  │        4  │     15.32 ms   ║
║  gRPC Sink              │      250  │        0  │        1  │      8.45 ms   ║
║  Message Queue Sink     │      249  │        1  │        2  │      5.21 ms   ║
║  Wide-Column DB Sink    │      250  │        0  │        0  │      3.87 ms   ║
╚══════════════════════════════════════════════════════════════════════════════╝
- Java 17 or higher
- Maven 3.9 or higher
mvn clean package -DskipTests

# Using default configuration
java -jar target/distributed-data-fanout-engine-1.0.0.jar
# With custom configuration
java -jar target/distributed-data-fanout-engine-1.0.0.jar path/to/config.yaml
# Memory-constrained environment (constant memory usage)
java -Xmx512m -jar target/distributed-data-fanout-engine-1.0.0.jar

mvn test

Configuration is managed through YAML files. See src/main/resources/application.yaml for the complete reference.
# Ingestion
ingestion:
  source-file: "data/sample_input.csv"
  file-format: "CSV" # CSV, JSONL, FIXED_WIDTH
  batch-size: 1000
# Sinks with per-sink rate limits
sinks:
  rest-api:
    enabled: true
    rate-limit: 50 # requests per second
    max-retries: 3
  grpc:
    enabled: true
    rate-limit: 200
  message-queue:
    enabled: true
    rate-limit: 500
  wide-column-db:
    enabled: true
    rate-limit: 1000
# Orchestration
orchestration:
  use-virtual-threads: false # Enable for Java 21+
  queue-capacity: 10000 # Backpressure threshold
# Resilience
resilience:
  circuit-breaker:
    failure-threshold: 5
    reset-timeout-seconds: 30
  dlq:
    path: "data/dlq"
    max-size: 100000
# Observability
observability:
  status-interval: 5 # seconds

Problem: Processing files up to 100GB without running out of memory.
Solution: Line-by-line streaming using BufferedReader with the Iterator pattern. Only one record exists in memory at any time, enabling constant memory usage regardless of file size.
// Pre-fetch pattern for efficient streaming
nextLine = reader.readLine(); // Single line in memory

Problem: High-throughput parallel processing across multiple sinks.
Solution: ForkJoinPool with work-stealing provides efficient concurrency for independent sink operations. CompletableFuture.allOf() coordinates parallel delivery to all sinks.
// Parallel sink dispatch
CompletableFuture.allOf(sinkFutures.toArray(new CompletableFuture[0]))
    .thenAccept(this::aggregateResults);

Problem: Different downstream systems have different capacity limits.
Solution: Guava's RateLimiter implements the token bucket algorithm, providing smooth rate limiting with burst handling. Each sink has its own configurable rate.
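The token bucket idea can be shown with a small standalone sketch. This is not Guava's implementation and not the engine's code: `TokenBucketSketch` is a hypothetical, simplified, non-blocking version with an injected nanosecond clock, written only to make the refill-and-spend logic concrete.

```java
import java.util.function.LongSupplier;

// Illustrative sketch only: a simplified token bucket with hypothetical names.
final class TokenBucketSketch {
    private final double ratePerSecond;   // tokens added per second
    private final double capacity;        // maximum burst size
    private final LongSupplier nanoClock; // injectable time source in nanoseconds
    private double tokens;
    private long lastRefillNanos;

    TokenBucketSketch(double ratePerSecond, double capacity, LongSupplier nanoClock) {
        this.ratePerSecond = ratePerSecond;
        this.capacity = capacity;
        this.nanoClock = nanoClock;
        this.tokens = capacity;           // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    // Refills according to elapsed time, then spends one token if available.
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        double refill = (now - lastRefillNanos) / 1_000_000_000.0 * ratePerSecond;
        tokens = Math.min(capacity, tokens + refill);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

With Guava itself, a per-sink limiter is created with `RateLimiter.create(permitsPerSecond)`, and `acquire()` blocks until a permit is available, so a sink configured with `rate-limit: 50` would be paced at roughly 50 requests per second.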
Problem: Retrying failed requests can cause thundering herd problems.
Solution: Exponential backoff (delay doubles each retry) combined with random jitter (±20%) distributes retry attempts over time.
double delay = initialDelayMs * Math.pow(backoffMultiplier, attempt);
double jitter = delay * jitterFactor * (Math.random() * 2 - 1);

Problem: Failed records must be preserved without external dependencies.
Solution: JSONL-formatted file storage provides durability, easy inspection, and straightforward replay capabilities.
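A file-based JSONL dead letter queue can be as simple as appending one JSON object per line. The sketch below is an assumption-laden illustration, not the engine's DLQ: the `DeadLetterQueueSketch` class, its field names (`sink`, `error`, `record`), and the minimal escaping are all hypothetical, and the record is assumed to be already-serialized JSON.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Illustrative sketch only: hypothetical class and JSON field names.
final class DeadLetterQueueSketch {
    private final Path file;

    DeadLetterQueueSketch(Path file) {
        this.file = file;
    }

    // Appends one failed record as a single JSON line (JSONL).
    // recordJson is assumed to be valid, already-serialized JSON.
    synchronized void append(String sink, String error, String recordJson) {
        String line = "{\"sink\":\"" + escape(sink) + "\",\"error\":\"" + escape(error)
                + "\",\"record\":" + recordJson + "}\n";
        try {
            Files.writeString(file, line, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads every stored line back, for inspection or replay.
    List<String> readAll() {
        try {
            return Files.exists(file)
                    ? Files.readAllLines(file, StandardCharsets.UTF_8)
                    : List.of();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Minimal escaping for the two string fields written by this sketch.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }
}
```

Because each line is an independent JSON document, a replay tool can stream the file line by line and resubmit each `record` to the sink named in the same line.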
| Metric | Value | Notes |
|---|---|---|
| Throughput | High (thousands of records/sec) | Depends on sink latency and hardware |
| Memory | Constant ~512MB | Streaming architecture - independent of file size |
| CPU Scaling | Multi-core | ForkJoinPool work-stealing utilizes available cores |
| Startup Time | < 2 seconds | Lazy initialization |
The engine maintains constant memory usage by:
- Streaming input files line-by-line
- Processing records in configurable batches
- Clearing completed futures after each batch
- Using primitive wrappers (AtomicLong) for counters
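The line-by-line streaming in the first bullet can be sketched as an iterator that pre-fetches exactly one line. The class name `LineIteratorSketch` is hypothetical and the engine's actual readers may differ; the point is that only the current line is held, whatever the file size.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative sketch only: hypothetical class name, not the project's reader.
final class LineIteratorSketch implements Iterator<String>, AutoCloseable {
    private final BufferedReader reader;
    private String nextLine;   // the single pre-fetched line held in memory

    LineIteratorSketch(BufferedReader reader) {
        this.reader = reader;
        advance();             // pre-fetch so hasNext() needs no I/O
    }

    private void advance() {
        try {
            nextLine = reader.readLine();   // null signals end of input
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return nextLine != null;
    }

    @Override
    public String next() {
        if (nextLine == null) {
            throw new NoSuchElementException();
        }
        String current = nextLine;
        advance();
        return current;
    }

    @Override
    public void close() {
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Used in a try-with-resources block around `Files.newBufferedReader(path)`, the iterator releases the file handle even if processing fails midway.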
This engine implements production-grade non-functional requirements to ensure reliability, performance, and maintainability in real-world deployments.
| Feature | Implementation | Benefit |
|---|---|---|
| Input Validation | Null and empty checks for all transformers | Prevents malformed data from corrupting the pipeline |
| Safe File Handling | Proper resource cleanup and stream closure | Prevents resource leaks and file descriptor exhaustion |
| No Hardcoded Credentials | All sensitive data in configuration files | Supports secure credential management (env vars, vaults) |
| Dependency Security | Up-to-date dependencies with no known CVEs | Minimizes security vulnerabilities |
| Feature | Implementation | Benefit |
|---|---|---|
| Circuit Breaker | 3-state pattern (CLOSED/OPEN/HALF_OPEN) | Prevents cascading failures, fails fast on unhealthy services |
| Retry with Backoff | Exponential backoff + jitter | Handles transient failures without overwhelming services |
| Dead Letter Queue | File-based persistence | Zero data loss - failed records can be replayed |
| Graceful Shutdown | Shutdown hooks, resource cleanup | Ensures in-flight records are processed before exit |
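The retry delay with exponential backoff and jitter can be computed as in the sketch below. It follows the formula given in the design notes, but the `BackoffSketch` class, the method signature, and the injected random source are assumptions added here so the calculation can be checked deterministically.

```java
import java.util.function.DoubleSupplier;

// Illustrative sketch only: hypothetical helper around the documented formula.
final class BackoffSketch {

    // attempt is zero-based; random must return values in [0, 1).
    // The result is the base delay shifted by up to +/- jitterFactor of itself.
    static long delayMs(long initialDelayMs, double multiplier, double jitterFactor,
                        int attempt, DoubleSupplier random) {
        double base = initialDelayMs * Math.pow(multiplier, attempt);
        double jitter = base * jitterFactor * (random.getAsDouble() * 2 - 1);
        return Math.max(0L, Math.round(base + jitter));
    }
}
```

For example, with an initial delay of 100 ms, a multiplier of 2, and a jitter factor of 0.2, the base delays for attempts 0 through 3 are 100, 200, 400, and 800 ms, and the fourth retry lands somewhere between 640 and 960 ms. In production code the random source would typically be `ThreadLocalRandom.current()::nextDouble`.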
| Feature | Implementation | Benefit |
|---|---|---|
| Streaming Architecture | BufferedReader with Iterator pattern | Constant memory usage regardless of file size |
| Async Processing | CompletableFuture with ForkJoinPool | High throughput through parallel sink dispatch |
| Rate Limiting | Token bucket algorithm (Guava RateLimiter) | Prevents overwhelming downstream services |
| Backpressure | BlockingQueue with configurable capacity | Prevents memory overflow under high load |
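Parallel sink dispatch with `CompletableFuture.allOf()` can be sketched as follows. Since `allOf()` yields a `CompletableFuture<Void>`, the sketch joins the individual futures afterwards to collect per-sink results. `FanOutSketch`, the simplified `SinkResult` record, and modelling a sink as a `Function` are assumptions made for illustration; they are not the engine's types.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative sketch only: hypothetical names and a simplified result type.
final class FanOutSketch {

    // Stand-in for the engine's result type; the real one may carry more detail.
    record SinkResult(String sink, boolean success) {}

    // Sends one record to every sink concurrently and completes when all sinks
    // have answered. Results are returned in the same order as the sinks.
    static CompletableFuture<List<SinkResult>> dispatch(
            String record, List<Function<String, CompletableFuture<SinkResult>>> sinks) {
        List<CompletableFuture<SinkResult>> futures =
                sinks.stream().map(sink -> sink.apply(record)).toList();
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                // Every future is complete here, so join() does not block.
                .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toList());
    }
}
```

In this sketch a sink that completes exceptionally fails the combined future, so each sink would need to convert its own errors into a `SinkResult` with `success` set to `false` if one failing sink should not hide the results of the others.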
Performance Characteristics:
- Throughput: High throughput (thousands of records/sec, depends on sink latency and hardware)
- Memory: Constant ~512MB (streaming architecture handles files of any size)
- CPU Scaling: Multi-core utilization via ForkJoinPool work-stealing
- Startup Time: < 2 seconds
| Feature | Implementation | Benefit |
|---|---|---|
| Horizontal Scalability | Stateless design, file-based coordination | Multiple instances can process different files |
| Vertical Scalability | ForkJoinPool work-stealing | Efficiently utilizes all available CPU cores |
| Configurable Concurrency | Thread pool sizing via configuration | Tune for different hardware profiles |
| Feature | Implementation | Benefit |
|---|---|---|
| Real-time Metrics | Periodic status reports (configurable interval) | Monitor throughput, latency, error rates |
| Per-Sink Statistics | Success/failure counts, retry attempts, avg latency | Identify problematic sinks quickly |
| Structured Logging | SLF4J + Logback with configurable levels | Easy integration with log aggregation tools |
| Metrics Export | Console output with formatted tables | Ready for integration with monitoring systems |
| Feature | Implementation | Benefit |
|---|---|---|
| Design Patterns | Strategy, Factory patterns | Clean separation of concerns, easy to extend |
| Configuration-Driven | YAML-based configuration | No code changes for operational adjustments |
| Comprehensive Testing | 115 unit + integration tests | High confidence in code changes |
| Documentation | README, Architecture docs, inline comments | Easy onboarding for new developers |
Adding a new sink (e.g., Elasticsearch) requires no changes to the orchestrator:
public class ElasticsearchSink extends AbstractSink {
    @Override
    public CompletableFuture<SinkResult> send(TransformedRecord record) {
        // Implementation
    }
}

case "elasticsearch" -> new ElasticsearchSink();

SINK_FORMAT_MAPPING.put("elasticsearch", OutputFormat.JSON);

sinks:
  elasticsearch:
    enabled: true
    rate-limit: 200

The project includes comprehensive unit and integration tests.
Tests: 115 passing
Coverage: Transformers, File Readers, Resilience, Orchestrator
Framework: JUnit 5 + Mockito + AssertJ
| Category | Tests | Coverage |
|---|---|---|
| Transformers | 18 | JSON, XML, Protobuf, Avro serialization |
| File Readers | 37 | CSV, JSONL, Fixed-Width parsing |
| Resilience | 55 | Circuit breaker, retry, DLQ, rate limiting |
| Integration | 5 | End-to-end orchestrator scenarios |
# All tests
mvn test
# Specific test class
mvn test -Dtest=JsonTransformerTest
# With coverage report
mvn test jacoco:report

├── src/
│   ├── main/
│   │   ├── java/com/fanout/
│   │   │   ├── config/            # Configuration management
│   │   │   ├── ingestion/         # File readers (CSV, JSONL, Fixed-Width)
│   │   │   ├── transformation/    # Format transformers
│   │   │   ├── sink/              # Sink implementations
│   │   │   ├── resilience/        # Rate limiting, circuit breaker, DLQ
│   │   │   ├── orchestrator/      # Main processing logic
│   │   │   ├── observability/     # Metrics reporting
│   │   │   ├── model/             # Data models
│   │   │   └── FanOutEngineApplication.java
│   │   └── resources/
│   │       ├── application.yaml   # Default configuration
│   │       └── logback.xml        # Logging configuration
│   └── test/                      # Unit and integration tests
├── data/                          # Sample input files
├── docs/                          # Architecture documentation
├── pom.xml                        # Maven build configuration
└── README.md
| Dependency | Version | Purpose |
|---|---|---|
| Jackson | 2.16.1 | JSON/XML/CSV processing |
| Protocol Buffers | 3.25.2 | Protobuf serialization |
| Apache Avro | 1.11.3 | Avro serialization |
| Guava | 33.0.0 | Rate limiting |
| SLF4J + Logback | 2.0.11 | Logging |
| JUnit 5 | 5.10.1 | Testing |
| Mockito | 5.8.0 | Mocking |
- Encoding: All input files are UTF-8 encoded
- Data Types: Numbers and booleans are auto-detected in CSV/Fixed-Width
- Network Simulation: Mock sinks simulate realistic latency (2-30ms) and failure rates (1-2%)
- Schema: Dynamic schema generation for maximum flexibility
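The auto-detection of numbers and booleans mentioned above could work along the lines of the sketch below. The detection order and the `TypeDetectionSketch` name are assumptions made for illustration; the engine's actual rules (for example, how it treats empty fields or values such as `NaN`) are not documented here and may differ.

```java
// Illustrative sketch only: hypothetical helper with an assumed detection order.
final class TypeDetectionSketch {

    // Tries boolean, then integer, then decimal; falls back to the raw string.
    static Object detect(String raw) {
        String s = raw.trim();
        if (s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false")) {
            return Boolean.parseBoolean(s);
        }
        try {
            return Long.parseLong(s);
        } catch (NumberFormatException notAnInteger) {
            // fall through to the decimal check
        }
        try {
            return Double.parseDouble(s);
        } catch (NumberFormatException notANumber) {
            // fall through to the string fallback
        }
        return raw;
    }
}
```

Note that `Double.parseDouble` also accepts inputs such as `NaN`, `Infinity`, and `1.5f`, so a stricter reader might validate against a numeric pattern first.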
This project is licensed under the MIT License. See LICENSE for details.
Kumbham Ajay Goud