
πŸš€ Kafka CSV Loader


A robust, production-ready Kotlin CLI tool for loading CSV data into Apache Kafka with Avro schema validation, Schema Registry integration, and configurable batching.


πŸ“‹ Overview

Kafka CSV Loader bridges the gap between traditional CSV data formats and modern event streaming platforms. It provides a seamless, type-safe way to migrate CSV data into Kafka topics with full schema and data validation.

Use Cases:

  • Data Migration: Moving legacy CSV data into Kafka-based systems
  • Batch Loading: Periodic bulk imports from CSV exports with configurable batching
  • Data Integration: Connecting CSV-based systems to event-driven architectures
  • Testing & Development: Quickly populating Kafka topics with test data
  • Data Validation: Dry-run mode to validate CSV data before production loads

✨ Features

βœ… CSV Parsing - Intelligent CSV parsing with header validation
βœ… Avro Schema Validation - Type-safe data validation against Avro schemas
βœ… Schema Registry Integration - Automatic schema registration and versioning
βœ… Dry Run Mode - Validate CSV and schema without sending to Kafka
βœ… Configurable Batching - Batch records for improved performance
βœ… Async/Sync Modes - Choose between sync (safe) or async (fast) sending
βœ… Error Handling - Detailed validation errors with row-level reporting
βœ… Flexible Key Selection - Choose any CSV column as Kafka message key
βœ… Colorful CLI - Beautiful terminal output with progress indicators
βœ… Production Ready - 80%+ test coverage with unit and integration tests
βœ… Code Quality - Ktlint formatting, JaCoCo coverage reporting


πŸ—οΈ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   CSV File      β”‚
β”‚  (users.csv)    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  CSV Parser     β”‚  ← Validates headers
β”‚  (kotlin-csv)   β”‚    Parses rows
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Avro Schema     β”‚  ← Loads .avsc file
β”‚ Loader          β”‚    Validates structure
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Avro Record     β”‚  ← Maps CSV β†’ Avro
β”‚ Mapper          β”‚    Type conversion
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜    Validation
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Dry Run Mode?   β”‚  ← Optional validation
β”‚                 β”‚    (skip Kafka)
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Batching        β”‚  ← Configurable batch size
β”‚ (optional)      β”‚    Sync or Async mode
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Kafka Producer  β”‚  ← Sends to Kafka
β”‚ (Avro Serial.)  β”‚    Schema Registry
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Kafka Topic    β”‚
β”‚ (with Schema)   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ› οΈ Technologies

  • Language: Kotlin 1.9.22 (JVM 21)
  • Build Tool: Gradle 8.14 with Kotlin DSL
  • CLI Framework: Clikt 4.2.1 (command-line parsing)
  • Terminal UI: Mordant 2.2.0 (colored output, progress indicators)
  • CSV Parsing: kotlin-csv-jvm 1.9.2
  • Avro: Apache Avro 1.11.3
  • Kafka: kafka-clients 3.6.1
  • Schema Registry: Confluent Schema Registry 7.5.3
  • Testing: JUnit 5, Kotest, Testcontainers, Mockk
  • Code Quality: Ktlint 1.0.1, JaCoCo 0.8.11
  • Containerization: Docker/Colima support with Testcontainers

πŸ“¦ Installation

Prerequisites

  • Java 21+ (JDK)
  • Docker or Colima (for running Kafka locally or integration tests)
  • Kafka & Schema Registry (running instances for production use)

Build from Source

# Clone the repository
git clone https://github.com/drag0sd0g/kafka-csv-loader.git
cd kafka-csv-loader

# Build the project (includes tests, code coverage, linting)
./gradlew build

# Build fat JAR
./gradlew jar

# The executable JAR will be at:
# build/libs/kafka-csv-loader-*.jar

Run Tests

# Run all tests
./gradlew test

# Run tests with coverage report
./gradlew test jacocoTestReport

# View coverage report
open build/reports/jacoco/test/html/index.html

# Run only unit tests
./gradlew test --tests "*.csv.*" --tests "*.avro.*"

# Run integration tests (requires Docker/Colima)
./gradlew test --tests "*IntegrationTest"

πŸš€ Quick Start

1. Prepare Your Data

Example CSV (users.csv):

id,name,email,age,active
1,Alice,alice@example.com,30,true
2,Bob,bob@example.com,25,false
3,Charlie,charlie@example.com,35,true

Example Avro Schema (user-schema.avsc):

{
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "name", "type": "string" },
        { "name": "email", "type": "string" },
        { "name": "age", "type": "int" },
        { "name": "active", "type": "boolean" }
    ]
}
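Conceptually, the loader converts each CSV cell (always a string) into the type declared by the matching Avro field. The following Python sketch illustrates that conversion step; it is an illustrative stand-in, not the tool's actual Kotlin implementation, and the helper names (`CONVERTERS`, `convert_row`) are hypothetical:

```python
import json

# Illustrative sketch: convert one CSV row (all strings) into typed values
# according to an Avro schema's primitive field types. Hypothetical helper,
# not the project's Kotlin code.
CONVERTERS = {
    "int": int,
    "long": int,
    "float": float,
    "double": float,
    "string": str,
    "boolean": lambda s: {"true": True, "false": False}[s.lower()],
}

def convert_row(row: dict, schema: dict) -> dict:
    record = {}
    for field in schema["fields"]:
        name, ftype = field["name"], field["type"]
        try:
            record[name] = CONVERTERS[ftype](row[name])
        except (KeyError, ValueError) as exc:
            # Row-level error, in the spirit of the tool's validation messages
            raise ValueError(
                f"Field '{name}': cannot convert {row.get(name)!r} to {ftype}"
            ) from exc
    return record

schema = json.loads("""{"type": "record", "name": "User",
  "fields": [{"name": "id", "type": "int"},
             {"name": "name", "type": "string"},
             {"name": "active", "type": "boolean"}]}""")

print(convert_row({"id": "1", "name": "Alice", "active": "true"}, schema))
# → {'id': 1, 'name': 'Alice', 'active': True}
```

A row like `1,Alice,true` becomes a typed record, while a value such as `active=maybe` fails with a field-level error instead of silently producing bad data.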

2. Start Kafka & Schema Registry

# Using Docker Compose (example)
docker-compose up -d kafka schema-registry

# Or using Confluent Platform
confluent local services start

3. Validate Data (Dry Run)

Before loading to production, validate your CSV:

java -jar build/libs/kafka-csv-loader-*.jar \
  --csv users.csv \
  --schema user-schema.avsc \
  --topic users \
  --dry-run

4. Load Data to Kafka

# Basic loading (row-by-row)
java -jar build/libs/kafka-csv-loader-*.jar \
  --csv users.csv \
  --schema user-schema.avsc \
  --topic users \
  --bootstrap-servers localhost:9092 \
  --schema-registry http://localhost:8081 \
  --key-field id

# With batching for better performance
java -jar build/libs/kafka-csv-loader-*.jar \
  --csv users.csv \
  --schema user-schema.avsc \
  --topic users \
  --batch-size 100

5. Verify Data in Kafka

# Using kafka-avro-console-consumer
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic users \
  --from-beginning

πŸ“– Usage

Command-Line Options

Usage: kafka-csv-loader [OPTIONS]

  Load CSV data into Kafka with Avro schema validation

Options:
  -c, --csv TEXT              Path to CSV file (required)
  -s, --schema TEXT           Path to Avro schema file (.avsc) (required)
  -t, --topic TEXT            Kafka topic name (required)
  -b, --bootstrap-servers     Kafka bootstrap servers (default: localhost:9092)
  -r, --schema-registry       Schema Registry URL (default: http://localhost:8081)
  -k, --key-field TEXT        CSV column to use as Kafka message key (optional)
  -d, --dry-run               Validate CSV and schema without sending to Kafka
  --batch-size INT            Number of records to batch (default: 1 = no batching)
  --async                     Send batches asynchronously (faster but less safe)
  --version                   Show version and exit
  -h, --help                  Show this message and exit

Examples

Basic Usage (Row-by-Row)

java -jar kafka-csv-loader.jar \
  --csv data.csv \
  --schema schema.avsc \
  --topic my-topic

With Custom Kafka Configuration

java -jar kafka-csv-loader.jar \
  --csv data.csv \
  --schema schema.avsc \
  --topic my-topic \
  --bootstrap-servers kafka1:9092,kafka2:9092 \
  --schema-registry http://schema-registry:8081

Using a Specific Column as Message Key

java -jar kafka-csv-loader.jar \
  --csv orders.csv \
  --schema order-schema.avsc \
  --topic orders \
  --key-field order_id

With Batching (Recommended for Large Files)

# Synchronous batching (safe, recommended)
java -jar kafka-csv-loader.jar \
  --csv large-file.csv \
  --schema schema.avsc \
  --topic my-topic \
  --batch-size 100

# Asynchronous batching (maximum performance)
java -jar kafka-csv-loader.jar \
  --csv large-file.csv \
  --schema schema.avsc \
  --topic my-topic \
  --batch-size 100 \
  --async

Dry Run Mode (Validation Only)

java -jar kafka-csv-loader.jar \
  --csv users.csv \
  --schema user-schema.avsc \
  --topic users \
  --dry-run

πŸ” Dry Run Mode

Use the --dry-run flag to validate your CSV file and Avro schema without sending any data to Kafka.

What it does:

  • βœ… Loads and validates the Avro schema
  • βœ… Parses the CSV file
  • βœ… Validates CSV headers match schema fields
  • βœ… Validates all rows can be mapped to Avro records
  • βœ… Reports validation errors with row numbers
  • ❌ Does NOT connect to Kafka
  • ❌ Does NOT send any data

Use cases:

  • Test your CSV data before loading to production
  • Validate schema compatibility
  • Find data quality issues early
  • CI/CD pipeline validation
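The first dry-run check, header validation, boils down to a set comparison between the CSV header row and the schema's field names. A minimal sketch of that idea (the function name is hypothetical; the tool's actual Kotlin code may differ):

```python
def missing_fields(csv_headers, schema_fields):
    # Fields required by the schema but absent from the CSV header row.
    # Sorted so error messages are deterministic.
    return sorted(set(schema_fields) - set(csv_headers))

# A CSV missing 'email' and 'age' against the User schema:
print(missing_fields(["id", "name"], ["id", "name", "email", "age"]))
# → ['age', 'email']
```

A non-empty result maps directly to the "Missing required fields" error shown in the Error Handling section below.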

Example output:

πŸš€ Kafka CSV Loader
   DRY RUN MODE - No data will be sent to Kafka

πŸ“‹ Loading Avro schema... βœ“
   Schema: com.example.User
   Fields: id, name, email, age

πŸ“„ Parsing CSV file... βœ“
   Headers: id, name, email, age
   Rows: 1000

πŸ” Validating CSV headers against schema... βœ“

πŸ” Validating all rows (dry run)...
   βœ“ Validated 50 rows...
   βœ“ Validated 100 rows...
   ...

πŸ“Š Dry Run Summary
   βœ“ Valid rows: 1000
   βœ— Invalid rows: 0

βœ… All rows validated successfully! Ready to load to Kafka.

⚑ Batching & Performance

For large CSV files, batching can significantly improve performance by reducing network roundtrips and improving throughput.

Batch Options

  • --batch-size N - Number of records to batch before sending (default: 1 = no batching)
  • --async - Send batches asynchronously (faster, but requires monitoring)
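The batching idea is plain chunking: accumulate up to N records, send the chunk, then start the next one, with any trailing partial batch sent last. A language-agnostic sketch (illustrative only; the tool itself is Kotlin):

```python
def batches(records, batch_size):
    """Yield successive chunks of at most batch_size records."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # trailing partial batch
        yield batch

# 10 records at batch size 4 → two full batches plus a partial one
print([len(b) for b in batches(list(range(10)), 4)])  # → [4, 4, 2]
```

With --batch-size 1 (the default) every "batch" is a single record, which is why the default behaves as row-by-row sending.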

Performance Comparison

| Mode        | Batch Size | 1K rows | 10K rows | 100K rows | Notes                       |
|-------------|-----------:|--------:|---------:|----------:|-----------------------------|
| Row-by-row  | 1          | ~3s     | ~30s     | ~5min     | Slowest, most reliable      |
| Sync batch  | 50         | ~1s     | ~10s     | ~100s     | Good balance                |
| Sync batch  | 100        | ~0.8s   | ~8s      | ~80s      | Recommended for production  |
| Async batch | 100        | ~0.5s   | ~5s      | ~50s      | Fastest, requires monitoring |

Batching Examples

Small Files (<1K rows)

java -jar kafka-csv-loader.jar \
  --csv small.csv \
  --schema schema.avsc \
  --topic my-topic

Medium Files (1K-10K rows)

Use sync batching with batch size 50:

java -jar kafka-csv-loader.jar \
  --csv medium.csv \
  --schema schema.avsc \
  --topic my-topic \
  --batch-size 50

Large Files (>10K rows)

Use sync batching with batch size 100:

java -jar kafka-csv-loader.jar \
  --csv large.csv \
  --schema schema.avsc \
  --topic my-topic \
  --batch-size 100

Maximum Performance (async)

Use async batching for maximum throughput:

java -jar kafka-csv-loader.jar \
  --csv huge.csv \
  --schema schema.avsc \
  --topic my-topic \
  --batch-size 100 \
  --async

Recommendations

  • Development/Testing: Use default (no batching) for easier debugging
  • Small files (<1K rows): Use default (no batching)
  • Medium files (1K-10K rows): Use --batch-size 50
  • Large files (>10K rows): Use --batch-size 100
  • Production: Start with sync batching, test thoroughly before using async
  • Async mode: Only use after testing; monitor for errors carefully
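The sync/async trade-off follows the standard producer pattern: each send completes asynchronously, and sync mode blocks on the outstanding acknowledgments after every batch, while async mode defers that wait until the end. A sketch of the control flow with a hypothetical `send` stub (not the Kafka client API):

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def send(record):
    # Stand-in for a producer send that completes asynchronously
    return pool.submit(lambda: f"ack:{record}")

def load(records, sync=True):
    pending, acks = [], []
    for record in records:
        pending.append(send(record))
        if sync:
            # Sync mode: wait for acknowledgments before continuing,
            # so a failure is detected at the batch that caused it
            acks.extend(f.result() for f in pending)
            pending = []
    # Async mode: collect all acknowledgments only at the end
    acks.extend(f.result() for f in pending)
    return acks

print(load(["a", "b"], sync=False))  # → ['ack:a', 'ack:b']
```

This is why async is faster (no per-batch wait) but less safe: errors surface late, after many more records may already be in flight.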

Batching Output Example

πŸš€ Kafka CSV Loader

πŸ“‹ Loading Avro schema... βœ“
   Schema: com.example.User
   Fields: id, name, email, age

πŸ“„ Parsing CSV file... βœ“
   Headers: id, name, email, age
   Rows: 10000

πŸ” Validating CSV headers against schema... βœ“

πŸ”Œ Connecting to Kafka...
   Bootstrap servers: localhost:9092
   Schema Registry: http://localhost:8081
   Topic: users

πŸ“€ Sending records to Kafka...
   Batch size: 100, Mode: sync

   βœ“ Processed 50 rows...
   βœ“ Processed 100 rows...
   ...
   βœ“ Processed 10000 rows...


πŸ“Š Summary
   βœ“ Success: 10000
   βœ— Failures: 0

βœ… All records successfully loaded!

🏭 Project Structure

kafka-csv-loader/
β”œβ”€β”€ src/
β”‚   β”œβ”€β”€ main/kotlin/com/dragos/kafkacsvloader/
β”‚   β”‚   β”œβ”€β”€ cli/
β”‚   β”‚   β”‚   └── LoadCommand.kt          # CLI entry point & command handler
β”‚   β”‚   β”œβ”€β”€ csv/
β”‚   β”‚   β”‚   └── CsvParser.kt            # CSV parsing and validation
β”‚   β”‚   β”œβ”€β”€ avro/
β”‚   β”‚   β”‚   β”œβ”€β”€ AvroSchemaLoader.kt     # Schema loading from .avsc files
β”‚   β”‚   β”‚   └── AvroRecordMapper.kt     # CSV β†’ Avro mapping & type conversion
β”‚   β”‚   └── kafka/
β”‚   β”‚       └── KafkaProducerClient.kt  # Kafka producer with batching support
β”‚   └── test/kotlin/com/dragos/kafkacsvloader/
β”‚       β”œβ”€β”€ cli/
β”‚       β”‚   └── DryRunTest.kt           # Dry-run mode tests
β”‚       β”œβ”€β”€ csv/
β”‚       β”‚   └── CsvParserTest.kt        # CSV parsing tests
β”‚       β”œβ”€β”€ avro/
β”‚       β”‚   β”œβ”€β”€ AvroSchemaLoaderTest.kt # Schema loading tests
β”‚       β”‚   └── AvroRecordMapperTest.kt # Avro mapping tests
β”‚       β”œβ”€β”€ kafka/
β”‚       β”‚   └── KafkaProducerBatchTest.kt # Batching tests
β”‚       └── integration/
β”‚           └── KafkaIntegrationTest.kt # End-to-end Testcontainers tests
β”œβ”€β”€ build.gradle.kts                     # Build configuration with plugins
β”œβ”€β”€ .github/
β”‚   └── workflows/
β”‚       └── release.yml                  # CI/CD and release automation
β”œβ”€β”€ .axion.yml                           # Semantic versioning configuration
└── README.md

πŸ› Error Handling

The tool provides detailed error messages at every stage:

Schema Validation Errors

❌ Error: Schema validation failed
   Row 5: Field 'age' - Type conversion error: Cannot convert 'invalid' to int
   Row 7: Field 'email' - Missing value for required field

Missing CSV Headers

❌ Error: CSV validation failed
   Missing required fields: age, email

Kafka Connection Errors

❌ Error: Failed to connect to Kafka
   Caused by: Connection refused: localhost:9092

Batch Send Errors

πŸ“Š Summary
   βœ“ Success: 9950
   βœ— Failures: 50

   Invalid rows:
     Row 100: Kafka batch error: Timeout waiting for acknowledgment
     Row 200: Kafka batch error: Timeout waiting for acknowledgment
     ...

Dry Run Validation Errors

πŸ“Š Dry Run Summary
   βœ“ Valid rows: 998
   βœ— Invalid rows: 2

   Invalid rows:
     Row 5: Field 'age' conversion error: For input string: "invalid"
     Row 42: Missing value for required field 'email'

πŸ§ͺ Testing

Test Coverage

  • βœ… Unit Tests: CSV parsing, Avro mapping, validation logic, batching
  • βœ… Integration Tests: End-to-end with Testcontainers (Kafka + Schema Registry)
  • βœ… CLI Tests: Dry-run mode validation
  • πŸ“Š Coverage: 80%+ code coverage (measured by JaCoCo)

Running Tests

# All tests
./gradlew test

# Unit tests only
./gradlew test --tests "*.csv.*" --tests "*.avro.*"

# Integration tests (requires Docker/Colima)
./gradlew test --tests "*IntegrationTest"

# Batching tests
./gradlew test --tests "*BatchTest"

# Generate coverage report
./gradlew jacocoTestReport
open build/reports/jacoco/test/html/index.html

Code Quality

# Run ktlint checks
./gradlew ktlintCheck

# Auto-format code
./gradlew ktlintFormat

# Full quality check (lint + coverage)
./gradlew check

πŸ”§ Configuration for Colima (macOS)

If you're using Colima instead of Docker Desktop for integration tests:

# Start Colima
colima start

# Set environment variables
export DOCKER_HOST="unix://$HOME/.colima/default/docker.sock"
export TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE="$HOME/.colima/default/docker.sock"

# Add to ~/.zshrc for persistence
echo 'export DOCKER_HOST="unix://$HOME/.colima/default/docker.sock"' >> ~/.zshrc
echo 'export TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE="$HOME/.colima/default/docker.sock"' >> ~/.zshrc

πŸ“¦ Releases

This project uses semantic versioning with automatic releases on every commit to main:

  • Format: v0.0.1, v0.0.2, etc.
  • Automation: GitHub Actions automatically tags and creates releases
  • Artifacts: JAR files are attached to each release

View releases: https://github.com/drag0sd0g/kafka-csv-loader/releases


🀝 Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Run tests and linting (./gradlew build)
  4. Commit your changes (git commit -m 'Add amazing feature')
  5. Push to the branch (git push origin feature/amazing-feature)
  6. Open a Pull Request

Code Standards:

  • Follow Kotlin coding conventions
  • Maintain 80%+ test coverage
  • Pass all ktlint checks
  • Add tests for new features

πŸ“ License

This project is licensed under the MIT License - see the LICENSE file for details.


πŸ™ Acknowledgments


πŸ“§ Contact

Dragos - @drag0sd0g

Project Link: https://github.com/drag0sd0g/kafka-csv-loader


Made with ❀️ and β˜• by Dragos
