
Distributed Data Fan-Out Engine


A high-throughput, memory-efficient data distribution engine that reads records from flat-file sources and propagates them to multiple downstream systems with built-in resilience, rate limiting, and observability.


Overview

Modern data architectures often require propagating data from a single source to multiple specialized systems—search indexes, analytics databases, caches, and third-party APIs. This engine solves that problem by providing:

  • Multi-format ingestion: CSV, JSONL, and Fixed-width file support
  • Parallel distribution: Simultaneous delivery to REST, gRPC, Message Queue, and Database sinks
  • Production-grade resilience: Rate limiting, circuit breakers, retries, and dead letter queues
  • Memory efficiency: Process files of any size with constant memory usage

Architecture

+-----------------------------------------------------------------------------+
|                              FAN-OUT ENGINE                                 |
+-----------------------------------------------------------------------------+
|                                                                             |
|  +-------------+    +------------------+    +---------------------------+   |
|  |   SOURCE    |    |   ORCHESTRATOR   |    |          SINKS            |   |
|  |             |    |                  |    |                           |   |
|  |  CSV File   |--->|  Async Pipeline  |--->|  REST API    (JSON)       |   |
|  |  JSONL File |    |  + Backpressure  |    |  gRPC        (Protobuf)   |   |
|  |  Fixed-Width|    |  + Rate Limiting |    |  Message Queue (XML)      |   |
|  |             |    |                  |    |  Wide-Column DB (Avro)    |   |
|  +-------------+    +---------+--------+    +---------------------------+   |
|                               |                                             |
|                               v                                             |
|                    +----------------------+                                 |
|                    |   RESILIENCE         |                                 |
|                    |  - Circuit Breaker   |                                 |
|                    |  - Retry Handler     |                                 |
|                    |  - Dead Letter Queue |                                 |
|                    +----------------------+                                 |
|                                                                             |
+-----------------------------------------------------------------------------+

For detailed architecture diagrams including data flow, threading model, and class relationships, see docs/ARCHITECTURE.md.


Features

Ingestion Layer

| Format | Description | Use Case |
|---|---|---|
| CSV | Configurable delimiter, quote handling, header detection | Standard data exports |
| JSONL | Line-delimited JSON with nested object support | API exports, logs |
| Fixed-Width | Position-based column extraction | Mainframe/legacy systems |

Transformation Layer

| Output Format | Target System | Serialization |
|---|---|---|
| JSON | REST APIs | Jackson |
| Protocol Buffers | gRPC Services | google.protobuf.Struct |
| XML | Legacy Message Queues | Jackson XML |
| Avro | Wide-Column Databases | Apache Avro (dynamic schema) |

Distribution Layer

| Sink | Simulated Behavior |
|---|---|
| REST API | HTTP/2 POST with configurable latency, 4xx/5xx errors |
| gRPC | Unary calls with UNAVAILABLE/DEADLINE_EXCEEDED errors |
| Message Queue | Kafka-like publishing with leader election scenarios |
| Wide-Column DB | Cassandra-style writes with consistency levels |

Resilience

| Feature | Implementation | Purpose |
|---|---|---|
| Rate Limiting | Guava token bucket | Prevent overwhelming downstream services |
| Circuit Breaker | 3-state (CLOSED/OPEN/HALF_OPEN), sketched below | Fail fast on unhealthy services |
| Retry with Backoff | Exponential + jitter | Handle transient failures |
| Dead Letter Queue | File-based JSONL | Preserve failed records for replay |
| Backpressure | BlockingQueue | Prevent memory overflow |
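
The circuit breaker follows the classic 3-state pattern. A minimal sketch, assuming the failure-threshold and reset-timeout values from the sample configuration; the class and member names are illustrative, not the engine's actual code:

// Illustrative 3-state circuit breaker; not the engine's actual implementation.
public class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAtMillis = 0;
    private final int failureThreshold;      // e.g. 5, as in application.yaml
    private final long resetTimeoutMillis;   // e.g. 30_000

    public SimpleCircuitBreaker(int failureThreshold, long resetTimeoutMillis) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMillis = resetTimeoutMillis;
    }

    public synchronized boolean allowRequest() {
        if (state == State.OPEN
                && System.currentTimeMillis() - openedAtMillis >= resetTimeoutMillis) {
            state = State.HALF_OPEN;          // let one probe through
        }
        return state != State.OPEN;           // fail fast while OPEN
    }

    public synchronized void recordSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    public synchronized void recordFailure() {
        if (state == State.HALF_OPEN || ++consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAtMillis = System.currentTimeMillis();
        }
    }
}

While OPEN, calls are rejected immediately; once the reset timeout elapses, a single HALF_OPEN probe decides whether the breaker closes again.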

Observability

Real-time metrics reported at configurable intervals:

╔══════════════════════════════════════════════════════════════════════════════╗
║                           METRICS REPORT                                     ║
╠══════════════════════════════════════════════════════════════════════════════╣
║  Elapsed: 00:00:05  │  Total Processed: 250        │  Throughput: 50.0/sec   ║
╠══════════════════════════════════════════════════════════════════════════════╣
║  SINK                  │  SUCCESS  │  FAILURE  │  RETRIES  │  AVG LATENCY    ║
╟──────────────────────────────────────────────────────────────────────────────╢
║  REST API Sink         │      248  │        2  │        4  │     15.32 ms    ║
║  gRPC Sink             │      250  │        0  │        1  │      8.45 ms    ║
║  Message Queue Sink    │      249  │        1  │        2  │      5.21 ms    ║
║  Wide-Column DB Sink   │      250  │        0  │        0  │      3.87 ms    ║
╚══════════════════════════════════════════════════════════════════════════════╝

Quick Start

Prerequisites

  • Java 17 or higher
  • Maven 3.9 or higher

Build

mvn clean package -DskipTests

Run

# Using default configuration
java -jar target/distributed-data-fanout-engine-1.0.0.jar

# With custom configuration
java -jar target/distributed-data-fanout-engine-1.0.0.jar path/to/config.yaml

# Memory-constrained environment (constant memory usage)
java -Xmx512m -jar target/distributed-data-fanout-engine-1.0.0.jar

Run Tests

mvn test

Configuration

Configuration is managed through YAML files. See src/main/resources/application.yaml for the complete reference.

Key Configuration Sections

# Ingestion
ingestion:
  source-file: "data/sample_input.csv"
  file-format: "CSV"  # CSV, JSONL, FIXED_WIDTH
  batch-size: 1000

# Sinks with per-sink rate limits
sinks:
  rest-api:
    enabled: true
    rate-limit: 50      # requests per second
    max-retries: 3
  grpc:
    enabled: true
    rate-limit: 200
  message-queue:
    enabled: true
    rate-limit: 500
  wide-column-db:
    enabled: true
    rate-limit: 1000

# Orchestration
orchestration:
  use-virtual-threads: false  # Enable for Java 21+
  queue-capacity: 10000       # Backpressure threshold

# Resilience
resilience:
  circuit-breaker:
    failure-threshold: 5
    reset-timeout-seconds: 30
  dlq:
    path: "data/dlq"
    max-size: 100000

# Observability
observability:
  status-interval: 5  # seconds

Design Decisions

1. Streaming with BufferedReader

Problem: Processing files up to 100GB without running out of memory.

Solution: Line-by-line streaming using BufferedReader with the Iterator pattern. Only one record exists in memory at any time, enabling constant memory usage regardless of file size.

// Pre-fetch pattern for efficient streaming
nextLine = reader.readLine();  // Single line in memory
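
A fuller sketch of the idiom, assuming a plain line-per-record reader (the class name is illustrative):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative streaming reader: only the one pre-fetched line lives in memory.
public class LineRecordIterator implements Iterator<String> {
    private final BufferedReader reader;
    private String nextLine;   // the single pre-fetched line

    public LineRecordIterator(BufferedReader reader) throws IOException {
        this.reader = reader;
        this.nextLine = reader.readLine();   // pre-fetch the first line
    }

    @Override
    public boolean hasNext() {
        return nextLine != null;
    }

    @Override
    public String next() {
        if (nextLine == null) throw new NoSuchElementException();
        String current = nextLine;
        try {
            nextLine = reader.readLine();    // pre-fetch the following line
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return current;
    }
}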

2. ForkJoinPool for Concurrency

Problem: High-throughput parallel processing across multiple sinks.

Solution: ForkJoinPool with work-stealing provides efficient concurrency for independent sink operations. CompletableFuture.allOf() coordinates parallel delivery to all sinks.

// Parallel sink dispatch
CompletableFuture.allOf(sinkFutures.toArray(new CompletableFuture[0]))
        .thenAccept(this::aggregateResults);
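
Expanded into a sketch, per-record dispatch might look like the following; Sink, SinkResult, TransformedRecord, and aggregate() are assumptions modeled on the other snippets in this README:

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative parallel dispatch of one record to every enabled sink.
void dispatch(TransformedRecord record, List<Sink> sinks) {
    List<CompletableFuture<SinkResult>> futures = sinks.stream()
            .map(sink -> sink.send(record))   // each sink proceeds independently
            .toList();

    // Wait for all sinks, then fold the individual results together.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> futures.forEach(f -> aggregate(f.join())));
}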

3. Token Bucket Rate Limiting

Problem: Different downstream systems have different capacity limits.

Solution: Guava's RateLimiter implements the token bucket algorithm, providing smooth rate limiting with burst handling. Each sink has its own configurable rate.
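
For illustration, a per-sink limiter built on Guava might look like this; the 50 permits/second matches the rest-api rate-limit in the sample configuration, and the wrapper class is hypothetical:

import com.google.common.util.concurrent.RateLimiter;

// Hypothetical wrapper: one RateLimiter per sink, created from configuration.
public class RateLimitedSender {
    private final RateLimiter limiter = RateLimiter.create(50.0); // tokens/sec

    public void send(Runnable delivery) {
        limiter.acquire();   // blocks until a token is available
        delivery.run();
    }
}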

4. Exponential Backoff with Jitter

Problem: Retrying failed requests can cause thundering herd problems.

Solution: Exponential backoff (delay doubles each retry) combined with random jitter (±20%) distributes retry attempts over time.

double delay = initialDelayMs * Math.pow(backoffMultiplier, attempt);
double jitter = delay * jitterFactor * (Math.random() * 2 - 1);
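
Wrapped in a retry loop, the formula might be used as follows; the method shape and the defaults (3 retries, 100 ms initial delay, 2.0 multiplier, 0.2 jitter) are assumptions consistent with the configuration shown earlier:

// Hypothetical retry loop around the backoff formula above.
<T> T retryWithBackoff(java.util.concurrent.Callable<T> call) throws Exception {
    final int maxRetries = 3;
    final double initialDelayMs = 100, backoffMultiplier = 2.0, jitterFactor = 0.2;
    for (int attempt = 0; ; attempt++) {
        try {
            return call.call();
        } catch (Exception e) {
            if (attempt >= maxRetries) throw e;  // exhausted: caller routes to the DLQ
            double delay = initialDelayMs * Math.pow(backoffMultiplier, attempt);
            double jitter = delay * jitterFactor * (Math.random() * 2 - 1); // +/-20%
            Thread.sleep((long) (delay + jitter));
        }
    }
}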

5. File-Based Dead Letter Queue

Problem: Failed records must be preserved without external dependencies.

Solution: JSONL-formatted file storage provides durability, easy inspection, and straightforward replay capabilities.
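
A minimal sketch of such a DLQ write, assuming records are already serialized to JSON; the file name is illustrative, and only the data/dlq path comes from the sample configuration:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// One JSON object per line: appended records stay inspectable with
// ordinary text tools and can be re-ingested for replay.
void writeToDlq(String recordAsJson) throws IOException {
    Path dlqFile = Path.of("data/dlq", "failed-records.jsonl"); // illustrative name
    Files.createDirectories(dlqFile.getParent());
    Files.writeString(dlqFile, recordAsJson + System.lineSeparator(),
            StandardCharsets.UTF_8,
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}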


Performance

| Metric | Value | Notes |
|---|---|---|
| Throughput | High (thousands of records/sec) | Depends on sink latency and hardware |
| Memory | Constant, ~512 MB | Streaming architecture; independent of file size |
| CPU Scaling | Multi-core | ForkJoinPool work-stealing utilizes available cores |
| Startup Time | < 2 seconds | Lazy initialization |

Memory Characteristics

The engine maintains constant memory usage by:

  1. Streaming input files line-by-line
  2. Processing records in configurable batches
  3. Clearing completed futures after each batch
  4. Tracking metrics in fixed-size atomic counters (AtomicLong) rather than accumulating per-record data

Non-Functional Features

This engine implements production-grade non-functional requirements to ensure reliability, performance, and maintainability in real-world deployments.

Security

| Feature | Implementation | Benefit |
|---|---|---|
| Input Validation | Null and empty checks in all transformers | Prevents malformed data from corrupting the pipeline |
| Safe File Handling | Proper resource cleanup and stream closure | Prevents resource leaks and file-descriptor exhaustion |
| No Hardcoded Credentials | All sensitive data in configuration files | Supports secure credential management (env vars, vaults) |
| Dependency Security | Up-to-date dependencies with no known CVEs | Minimizes security vulnerabilities |

Reliability

| Feature | Implementation | Benefit |
|---|---|---|
| Circuit Breaker | 3-state pattern (CLOSED/OPEN/HALF_OPEN) | Prevents cascading failures; fails fast on unhealthy services |
| Retry with Backoff | Exponential backoff + jitter | Handles transient failures without overwhelming services |
| Dead Letter Queue | File-based persistence | Zero data loss; failed records can be replayed |
| Graceful Shutdown | Shutdown hooks, resource cleanup | Ensures in-flight records are processed before exit |

Performance

| Feature | Implementation | Benefit |
|---|---|---|
| Streaming Architecture | BufferedReader with Iterator pattern | Constant memory usage regardless of file size |
| Async Processing | CompletableFuture with ForkJoinPool | High throughput through parallel sink dispatch |
| Rate Limiting | Token bucket (Guava RateLimiter) | Prevents overwhelming downstream services |
| Backpressure | BlockingQueue with configurable capacity | Prevents memory overflow under high load (see the sketch below) |
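
As a sketch of the backpressure mechanism, a bounded queue between the reader and the dispatcher could look like this; the class is illustrative, and the capacity matches the queue-capacity setting in the sample configuration:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative backpressure channel between file reader and dispatcher.
public class BackpressureChannel {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);

    // Producer side (file reader): blocks once the queue is full,
    // pausing ingestion instead of growing memory.
    public void publish(String record) throws InterruptedException {
        queue.put(record);
    }

    // Consumer side (orchestrator): blocks while the queue is empty.
    public String nextRecord() throws InterruptedException {
        return queue.take();
    }
}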


Scalability

| Feature | Implementation | Benefit |
|---|---|---|
| Horizontal Scalability | Stateless design, file-based coordination | Multiple instances can process different files |
| Vertical Scalability | ForkJoinPool work-stealing | Efficiently utilizes all available CPU cores |
| Configurable Concurrency | Thread-pool sizing via configuration | Tune for different hardware profiles |

Observability

| Feature | Implementation | Benefit |
|---|---|---|
| Real-time Metrics | Periodic status reports (configurable interval) | Monitor throughput, latency, and error rates |
| Per-Sink Statistics | Success/failure counts, retry attempts, average latency (counters sketched below) | Identify problematic sinks quickly |
| Structured Logging | SLF4J + Logback with configurable levels | Easy integration with log-aggregation tools |
| Metrics Export | Console output with formatted tables | Ready for integration with monitoring systems |
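
For illustration, the per-sink statistics could be backed by counters like these; the class and field names are assumptions, not the engine's actual code:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical per-sink counters behind the metrics report.
public class SinkStats {
    private final AtomicLong successes = new AtomicLong();
    private final AtomicLong failures  = new AtomicLong();
    private final AtomicLong retries   = new AtomicLong();
    private final AtomicLong totalLatencyNanos = new AtomicLong();

    public void recordSuccess(long latencyNanos) {
        successes.incrementAndGet();
        totalLatencyNanos.addAndGet(latencyNanos);
    }

    public void recordFailure() { failures.incrementAndGet(); }
    public void recordRetry()   { retries.incrementAndGet(); }

    public double avgLatencyMillis() {
        long n = successes.get();
        return n == 0 ? 0.0 : totalLatencyNanos.get() / (n * 1_000_000.0);
    }
}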

Maintainability

| Feature | Implementation | Benefit |
|---|---|---|
| Design Patterns | Strategy and Factory patterns | Clean separation of concerns; easy to extend |
| Configuration-Driven | YAML-based configuration | No code changes for operational adjustments |
| Comprehensive Testing | 115 unit + integration tests | High confidence in code changes |
| Documentation | README, architecture docs, inline comments | Easy onboarding for new developers |

Extending the Engine

Adding a new sink (e.g., Elasticsearch) requires no changes to the orchestrator:

1. Create the Sink Class

public class ElasticsearchSink extends AbstractSink {
    @Override
    public CompletableFuture<SinkResult> send(TransformedRecord record) {
        // Implementation
    }
}
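
For the sake of the example, a send() body could be sketched with the JDK HTTP client; the endpoint, the payloadAsJson() accessor, and the SinkResult factory methods are all hypothetical:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

// Hypothetical implementation: POST each record to an Elasticsearch index.
public CompletableFuture<SinkResult> send(TransformedRecord record) {
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:9200/records/_doc")) // illustrative endpoint
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(record.payloadAsJson()))
            .build();
    return HttpClient.newHttpClient()   // in practice, reuse a single client
            .sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(response -> response.statusCode() < 300
                    ? SinkResult.success()
                    : SinkResult.failure(response.body()));
}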

2. Register in SinkFactory

case "elasticsearch" -> new ElasticsearchSink();

3. Add Format Mapping

SINK_FORMAT_MAPPING.put("elasticsearch", OutputFormat.JSON);

4. Add Configuration

sinks:
  elasticsearch:
    enabled: true
    rate-limit: 200

Testing

The project includes comprehensive unit and integration tests.

Test Summary

Tests:     115 passing
Coverage:  Transformers, File Readers, Resilience, Orchestrator
Framework: JUnit 5 + Mockito + AssertJ

Test Categories

| Category | Tests | Coverage |
|---|---|---|
| Transformers | 18 | JSON, XML, Protobuf, Avro serialization |
| File Readers | 37 | CSV, JSONL, Fixed-Width parsing |
| Resilience | 55 | Circuit breaker, retry, DLQ, rate limiting |
| Integration | 5 | End-to-end orchestrator scenarios |

Running Specific Tests

# All tests
mvn test

# Specific test class
mvn test -Dtest=JsonTransformerTest

# With coverage report
mvn test jacoco:report

Project Structure

├── src/
│   ├── main/
│   │   ├── java/com/fanout/
│   │   │   ├── config/           # Configuration management
│   │   │   ├── ingestion/        # File readers (CSV, JSONL, Fixed-Width)
│   │   │   ├── transformation/   # Format transformers
│   │   │   ├── sink/             # Sink implementations
│   │   │   ├── resilience/       # Rate limiting, circuit breaker, DLQ
│   │   │   ├── orchestrator/     # Main processing logic
│   │   │   ├── observability/    # Metrics reporting
│   │   │   ├── model/            # Data models
│   │   │   └── FanOutEngineApplication.java
│   │   └── resources/
│   │       ├── application.yaml  # Default configuration
│   │       └── logback.xml       # Logging configuration
│   └── test/                     # Unit and integration tests
├── data/                         # Sample input files
├── docs/                         # Architecture documentation
├── pom.xml                       # Maven build configuration
└── README.md

Dependencies

| Dependency | Version | Purpose |
|---|---|---|
| Jackson | 2.16.1 | JSON/XML/CSV processing |
| Protocol Buffers | 3.25.2 | Protobuf serialization |
| Apache Avro | 1.11.3 | Avro serialization |
| Guava | 33.0.0 | Rate limiting |
| SLF4J + Logback | 2.0.11 | Logging |
| JUnit 5 | 5.10.1 | Testing |
| Mockito | 5.8.0 | Mocking |

Assumptions

  1. Encoding: All input files are UTF-8 encoded
  2. Data Types: Numbers and booleans are auto-detected in CSV/Fixed-Width
  3. Network Simulation: Mock sinks simulate realistic latency (2-30ms) and failure rates (1-2%)
  4. Schema: Dynamic schema generation for maximum flexibility

License

This project is licensed under the MIT License. See LICENSE for details.


Author

Kumbham Ajay Goud


