A robust, production-ready Kotlin CLI tool for loading CSV data into Apache Kafka with Avro schema validation, Schema Registry integration, and configurable batching.
Kafka CSV Loader bridges the gap between traditional CSV data formats and modern event streaming platforms. It provides a seamless, type-safe way to migrate CSV data into Kafka topics with full schema and data validation.
Use Cases:
- Data Migration: Moving legacy CSV data into Kafka-based systems
- Batch Loading: Periodic bulk imports from CSV exports with configurable batching
- Data Integration: Connecting CSV-based systems to event-driven architectures
- Testing & Development: Quickly populating Kafka topics with test data
- Data Validation: Dry-run mode to validate CSV data before production loads
- ✅ CSV Parsing - Intelligent CSV parsing with header validation
- ✅ Avro Schema Validation - Type-safe data validation against Avro schemas
- ✅ Schema Registry Integration - Automatic schema registration and versioning
- ✅ Dry Run Mode - Validate CSV and schema without sending to Kafka
- ✅ Configurable Batching - Batch records for improved performance
- ✅ Async/Sync Modes - Choose between sync (safe) or async (fast) sending
- ✅ Error Handling - Detailed validation errors with row-level reporting
- ✅ Flexible Key Selection - Choose any CSV column as Kafka message key
- ✅ Colorful CLI - Beautiful terminal output with progress indicators
- ✅ Production Ready - 80%+ test coverage with unit and integration tests
- ✅ Code Quality - Ktlint formatting, JaCoCo coverage reporting
┌─────────────────┐
│    CSV File     │
│   (users.csv)   │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   CSV Parser    │  ← Validates headers
│  (kotlin-csv)   │    Parses rows
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Avro Schema    │  ← Loads .avsc file
│     Loader      │    Validates structure
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Avro Record    │  ← Maps CSV → Avro
│     Mapper      │    Type conversion
└────────┬────────┘    Validation
         │
         ▼
┌─────────────────┐
│  Dry Run Mode?  │  ← Optional validation
│                 │    (skip Kafka)
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│    Batching     │  ← Configurable batch size
│   (optional)    │    Sync or Async mode
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Kafka Producer  │  ← Sends to Kafka
│ (Avro Serial.)  │    Schema Registry
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   Kafka Topic   │
│  (with Schema)  │
└─────────────────┘
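The flow above can be condensed into a short sketch: parse CSV text, check the headers against the schema's fields, then convert each cell to its declared type. This is illustrative Java using only the standard library (the project itself is Kotlin); the class name, the inline schema map, and the supported type subset are invented for the example, not taken from the tool's code.

```java
import java.util.*;

// Illustrative pipeline: CSV text -> header validation -> typed records.
public class PipelineSketch {
    // Stand-in for the Avro schema: field name -> primitive type
    static final Map<String, String> SCHEMA = Map.of(
        "id", "int", "name", "string", "active", "boolean");

    static List<Map<String, Object>> load(String csv) {
        String[] lines = csv.strip().split("\n");
        List<String> headers = List.of(lines[0].split(","));
        // Header validation: every schema field must appear in the CSV
        for (String field : SCHEMA.keySet()) {
            if (!headers.contains(field)) {
                throw new IllegalArgumentException("Missing required field: " + field);
            }
        }
        List<Map<String, Object>> records = new ArrayList<>();
        for (int row = 1; row < lines.length; row++) {
            String[] cells = lines[row].split(",");
            Map<String, Object> record = new LinkedHashMap<>();
            for (int col = 0; col < headers.size(); col++) {
                String name = headers.get(col);
                record.put(name, convert(cells[col], SCHEMA.get(name)));
            }
            records.add(record);
        }
        return records;
    }

    // Type conversion for a subset of Avro primitive types
    static Object convert(String raw, String type) {
        return switch (type) {
            case "int" -> Integer.parseInt(raw);
            case "boolean" -> Boolean.parseBoolean(raw);
            default -> raw; // strings pass through unchanged
        };
    }

    public static void main(String[] args) {
        System.out.println(load("id,name,active\n1,Alice,true\n2,Bob,false"));
    }
}
```

A row that fails conversion (e.g. `age = "invalid"` against an `int` field) would surface here as a `NumberFormatException`, which is what the tool reports as a row-level validation error.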
- Language: Kotlin 1.9.22 (JVM 21)
- Build Tool: Gradle 8.14 with Kotlin DSL
- CLI Framework: Clikt 4.2.1 (command-line parsing)
- Terminal UI: Mordant 2.2.0 (colored output, progress indicators)
- CSV Parsing: kotlin-csv-jvm 1.9.2
- Avro: Apache Avro 1.11.3
- Kafka: kafka-clients 3.6.1
- Schema Registry: Confluent Schema Registry 7.5.3
- Testing: JUnit 5, Kotest, Testcontainers, Mockk
- Code Quality: Ktlint 1.0.1, JaCoCo 0.8.11
- Containerization: Docker/Colima support with Testcontainers
- Java 21+ (JDK)
- Docker or Colima (for running Kafka locally or integration tests)
- Kafka & Schema Registry (running instances for production use)
# Clone the repository
git clone https://github.com/drag0sd0g/kafka-csv-loader.git
cd kafka-csv-loader
# Build the project (includes tests, code coverage, linting)
./gradlew build
# Build fat JAR
./gradlew jar
# The executable JAR will be at:
# build/libs/kafka-csv-loader-*.jar

# Run all tests
./gradlew test
# Run tests with coverage report
./gradlew test jacocoTestReport
# View coverage report
open build/reports/jacoco/test/html/index.html
# Run only unit tests
./gradlew test --tests "*.csv.*" --tests "*.avro.*"
# Run integration tests (requires Docker/Colima)
./gradlew test --tests "*IntegrationTest"

Example CSV (users.csv):
id,name,email,age,active
1,Alice,alice@example.com,30,true
2,Bob,bob@example.com,25,false
3,Charlie,charlie@example.com,35,true
Example Avro Schema (user-schema.avsc):
{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "name", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "age", "type": "int" },
{ "name": "active", "type": "boolean" }
]
}

# Using Docker Compose (example)
docker-compose up -d kafka schema-registry
# Or using Confluent Platform
confluent local services start

Before loading to production, validate your CSV:
java -jar build/libs/kafka-csv-loader-*.jar \
--csv users.csv \
--schema user-schema.avsc \
--topic users \
  --dry-run

# Basic loading (row-by-row)
java -jar build/libs/kafka-csv-loader-*.jar \
--csv users.csv \
--schema user-schema.avsc \
--topic users \
--bootstrap-servers localhost:9092 \
--schema-registry http://localhost:8081 \
--key-field id
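The --key-field option makes the named CSV column the Kafka message key; since Kafka's default partitioner hashes the key, all records sharing a key land on the same partition, preserving per-key ordering. A minimal Java sketch of that selection logic (illustrative only, not the tool's Kotlin code):

```java
import java.util.Map;

// Illustrative key selection: pick the configured CSV column as the message key.
public class KeySelection {
    static String keyFor(Map<String, String> row, String keyField) {
        if (keyField == null) return null; // no --key-field: null key, partitioner spreads records
        String key = row.get(keyField);
        if (key == null) {
            throw new IllegalArgumentException("Key field not found in CSV: " + keyField);
        }
        return key;
    }

    public static void main(String[] args) {
        Map<String, String> row = Map.of("id", "1", "name", "Alice");
        System.out.println(keyFor(row, "id")); // key taken from the 'id' column
        System.out.println(keyFor(row, null)); // no key field configured
    }
}
```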
# With batching for better performance
java -jar build/libs/kafka-csv-loader-*.jar \
--csv users.csv \
--schema user-schema.avsc \
--topic users \
  --batch-size 100

# Using kafka-avro-console-consumer
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic users \
  --from-beginning

Usage: kafka-csv-loader [OPTIONS]
Load CSV data into Kafka with Avro schema validation
Options:
-c, --csv TEXT Path to CSV file (required)
-s, --schema TEXT Path to Avro schema file (.avsc) (required)
-t, --topic TEXT Kafka topic name (required)
-b, --bootstrap-servers Kafka bootstrap servers (default: localhost:9092)
-r, --schema-registry Schema Registry URL (default: http://localhost:8081)
-k, --key-field TEXT CSV column to use as Kafka message key (optional)
-d, --dry-run Validate CSV and schema without sending to Kafka
--batch-size INT Number of records to batch (default: 1 = no batching)
--async Send batches asynchronously (faster but less safe)
--version Show version and exit
-h, --help Show this message and exit
java -jar kafka-csv-loader.jar \
--csv data.csv \
--schema schema.avsc \
  --topic my-topic

java -jar kafka-csv-loader.jar \
--csv data.csv \
--schema schema.avsc \
--topic my-topic \
--bootstrap-servers kafka1:9092,kafka2:9092 \
  --schema-registry http://schema-registry:8081

java -jar kafka-csv-loader.jar \
--csv orders.csv \
--schema order-schema.avsc \
--topic orders \
  --key-field order_id

# Synchronous batching (safe, recommended)
java -jar kafka-csv-loader.jar \
--csv large-file.csv \
--schema schema.avsc \
--topic my-topic \
--batch-size 100
# Asynchronous batching (maximum performance)
java -jar kafka-csv-loader.jar \
--csv large-file.csv \
--schema schema.avsc \
--topic my-topic \
--batch-size 100 \
  --async

java -jar kafka-csv-loader.jar \
--csv users.csv \
--schema user-schema.avsc \
--topic users \
  --dry-run

Validate your CSV and schema without actually sending data to Kafka using the --dry-run flag.
What it does:
- ✅ Loads and validates the Avro schema
- ✅ Parses the CSV file
- ✅ Validates CSV headers match schema fields
- ✅ Validates all rows can be mapped to Avro records
- ✅ Reports validation errors with row numbers
- ❌ Does NOT connect to Kafka
- ❌ Does NOT send any data
Use cases:
- Test your CSV data before loading to production
- Validate schema compatibility
- Find data quality issues early
- CI/CD pipeline validation
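The dry-run pass boils down to: attempt to map every row, collect row-level errors, and never open a Kafka connection. A hedged Java sketch of that loop (names and the single-field check are invented for illustration; the tool validates all schema fields):

```java
import java.util.*;

// Illustrative dry-run loop: validate rows, count successes, report errors.
public class DryRunSketch {
    record Result(int valid, List<String> errors) {}

    static Result validate(List<String[]> rows) {
        int valid = 0;
        List<String> errors = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            String[] row = rows.get(i); // row layout here: [id, age]
            try {
                Integer.parseInt(row[1]); // 'age' must be an int per the schema
                valid++;
            } catch (NumberFormatException e) {
                errors.add("Row " + (i + 1) + ": Field 'age' conversion error: " + e.getMessage());
            }
        }
        return new Result(valid, errors); // no Kafka producer involved anywhere
    }

    public static void main(String[] args) {
        Result r = validate(List.of(
            new String[]{"1", "30"},
            new String[]{"2", "invalid"}));
        System.out.println("Valid rows: " + r.valid());
        r.errors().forEach(System.out::println);
    }
}
```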
Example output:
🚀 Kafka CSV Loader
DRY RUN MODE - No data will be sent to Kafka
📋 Loading Avro schema... ✓
   Schema: com.example.User
   Fields: id, name, email, age
📋 Parsing CSV file... ✓
   Headers: id, name, email, age
   Rows: 1000
📋 Validating CSV headers against schema... ✓
📋 Validating all rows (dry run)...
   ✓ Validated 50 rows...
   ✓ Validated 100 rows...
   ...
📊 Dry Run Summary
   ✅ Valid rows: 1000
   ❌ Invalid rows: 0
✅ All rows validated successfully! Ready to load to Kafka.
For large CSV files, batching can significantly improve performance by reducing network roundtrips and improving throughput.
- --batch-size N - Number of records to batch before sending (default: 1 = no batching)
- --async - Send batches asynchronously (faster, but requires monitoring)
| Mode | Batch Size | 1K rows | 10K rows | 100K rows | Notes |
|---|---|---|---|---|---|
| Row-by-row | 1 | ~3s | ~30s | ~5min | Slowest, most reliable |
| Sync batch | 50 | ~1s | ~10s | ~100s | Good balance |
| Sync batch | 100 | ~0.8s | ~8s | ~80s | Recommended for production |
| Async batch | 100 | ~0.5s | ~5s | ~50s | Fastest, requires monitoring |
java -jar kafka-csv-loader.jar \
--csv small.csv \
--schema schema.avsc \
  --topic my-topic

Use sync batching with batch size 50:
java -jar kafka-csv-loader.jar \
--csv medium.csv \
--schema schema.avsc \
--topic my-topic \
  --batch-size 50

Use sync batching with batch size 100:
java -jar kafka-csv-loader.jar \
--csv large.csv \
--schema schema.avsc \
--topic my-topic \
  --batch-size 100

Use async batching for maximum throughput:
java -jar kafka-csv-loader.jar \
--csv huge.csv \
--schema schema.avsc \
--topic my-topic \
--batch-size 100 \
  --async

- Development/Testing: Use default (no batching) for easier debugging
- Small files (<1K rows): Use default (no batching)
- Medium files (1K-10K rows): Use --batch-size 50
- Large files (>10K rows): Use --batch-size 100
- Production: Start with sync batching, test thoroughly before using async
- Async mode: Only use after testing; monitor for errors carefully
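The sync/async trade-off behind these recommendations can be sketched with `CompletableFuture` standing in for the producer's per-record send futures (kafka-clients really does return a `Future` per `send()`). Sync mode blocks on each batch, so a failure stops the load immediately; async mode fires all batches and only surfaces failures at the end. Illustrative only, not the project's actual code:

```java
import java.util.*;
import java.util.concurrent.*;

// Illustrative batching: compare blocking per batch vs. collecting futures.
public class BatchModes {
    // Pretend network send that completes asynchronously
    static CompletableFuture<Void> sendBatch(List<String> batch) {
        return CompletableFuture.runAsync(() -> { /* real code would produce to Kafka */ });
    }

    static int sendAll(List<String> records, int batchSize, boolean async) {
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        for (int i = 0; i < records.size(); i += batchSize) {
            List<String> batch = records.subList(i, Math.min(i + batchSize, records.size()));
            CompletableFuture<Void> future = sendBatch(batch);
            if (async) {
                inFlight.add(future);  // keep going; outcomes checked later
            } else {
                future.join();         // block until this batch completes
            }
        }
        inFlight.forEach(CompletableFuture::join); // async mode: surface failures now
        return records.size();
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 250; i++) records.add("record-" + i);
        System.out.println("Sent (sync):  " + sendAll(records, 100, false));
        System.out.println("Sent (async): " + sendAll(records, 100, true));
    }
}
```

The async variant is faster because batches overlap in flight, but an error is only observed after everything has been submitted, which is why the guidance above says to monitor async loads carefully.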
Batching Output Example
🚀 Kafka CSV Loader
📋 Loading Avro schema... ✓
   Schema: com.example.User
   Fields: id, name, email, age
📋 Parsing CSV file... ✓
   Headers: id, name, email, age
   Rows: 10000
📋 Validating CSV headers against schema... ✓
🔌 Connecting to Kafka...
   Bootstrap servers: localhost:9092
   Schema Registry: http://localhost:8081
   Topic: users
📤 Sending records to Kafka...
   Batch size: 100, Mode: sync
   ✓ Processed 50 rows...
   ✓ Processed 100 rows...
   ...
   ✓ Processed 10000 rows...
📊 Summary
   ✅ Success: 10000
   ❌ Failures: 0
✅ All records successfully loaded!
kafka-csv-loader/
├── src/
│   ├── main/kotlin/com/dragos/kafkacsvloader/
│   │   ├── cli/
│   │   │   └── LoadCommand.kt            # CLI entry point & command handler
│   │   ├── csv/
│   │   │   └── CsvParser.kt              # CSV parsing and validation
│   │   ├── avro/
│   │   │   ├── AvroSchemaLoader.kt       # Schema loading from .avsc files
│   │   │   └── AvroRecordMapper.kt       # CSV → Avro mapping & type conversion
│   │   └── kafka/
│   │       └── KafkaProducerClient.kt    # Kafka producer with batching support
│   └── test/kotlin/com/dragos/kafkacsvloader/
│       ├── cli/
│       │   └── DryRunTest.kt             # Dry-run mode tests
│       ├── csv/
│       │   └── CsvParserTest.kt          # CSV parsing tests
│       ├── avro/
│       │   ├── AvroSchemaLoaderTest.kt   # Schema loading tests
│       │   └── AvroRecordMapperTest.kt   # Avro mapping tests
│       ├── kafka/
│       │   └── KafkaProducerBatchTest.kt # Batching tests
│       └── integration/
│           └── KafkaIntegrationTest.kt   # End-to-end Testcontainers tests
├── build.gradle.kts                      # Build configuration with plugins
├── .github/
│   └── workflows/
│       └── release.yml                   # CI/CD and release automation
├── .axion.yml                            # Semantic versioning configuration
└── README.md
Provides detailed error messages at every stage:
Schema Validation Errors
❌ Error: Schema validation failed
Row 5: Field 'age' - Type conversion error: Cannot convert 'invalid' to int
Row 7: Field 'email' - Missing value for required field
Missing CSV Headers
❌ Error: CSV validation failed
Missing required fields: age, email
Kafka Connection Errors
❌ Error: Failed to connect to Kafka
Caused by: Connection refused: localhost:9092
Batch Send Errors
📊 Summary
   ✅ Success: 9950
   ❌ Failures: 50
Invalid rows:
Row 100: Kafka batch error: Timeout waiting for acknowledgment
Row 200: Kafka batch error: Timeout waiting for acknowledgment
...
Dry Run Validation Errors
📊 Dry Run Summary
   ✅ Valid rows: 998
   ❌ Invalid rows: 2
Invalid rows:
Row 5: Field 'age' conversion error: For input string: "invalid"
Row 42: Missing value for required field 'email'
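The two row-level error kinds shown above, a missing value for a required field and a type-conversion failure, can be sketched as one check per row. Illustrative Java with invented helper names; note that `Integer.parseInt("invalid")` really does throw a `NumberFormatException` whose message is `For input string: "invalid"`, which is where the wording in the example output comes from:

```java
import java.util.Map;
import java.util.Optional;

// Illustrative row check producing the two error messages shown above.
public class RowErrors {
    static Optional<String> checkRow(int rowNum, Map<String, String> row) {
        String email = row.get("email");
        if (email == null || email.isBlank()) {
            return Optional.of("Row " + rowNum + ": Missing value for required field 'email'");
        }
        try {
            Integer.parseInt(row.get("age")); // 'age' is an int in the schema
        } catch (NumberFormatException e) {
            return Optional.of("Row " + rowNum + ": Field 'age' conversion error: " + e.getMessage());
        }
        return Optional.empty(); // row is valid
    }

    public static void main(String[] args) {
        System.out.println(checkRow(5, Map.of("email", "a@b.com", "age", "invalid")).orElse("ok"));
        System.out.println(checkRow(42, Map.of("email", "", "age", "30")).orElse("ok"));
        System.out.println(checkRow(1, Map.of("email", "a@b.com", "age", "30")).orElse("ok"));
    }
}
```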
- ✅ Unit Tests: CSV parsing, Avro mapping, validation logic, batching
- ✅ Integration Tests: End-to-end with Testcontainers (Kafka + Schema Registry)
- ✅ CLI Tests: Dry-run mode validation
- 📊 Coverage: 80%+ code coverage (measured by JaCoCo)
# All tests
./gradlew test
# Unit tests only
./gradlew test --tests "*.csv.*" --tests "*.avro.*"
# Integration tests (requires Docker/Colima)
./gradlew test --tests "*IntegrationTest"
# Batching tests
./gradlew test --tests "*BatchTest"
# Generate coverage report
./gradlew jacocoTestReport
open build/reports/jacoco/test/html/index.html

# Run ktlint checks
./gradlew ktlintCheck
# Auto-format code
./gradlew ktlintFormat
# Full quality check (lint + coverage)
./gradlew check

If you're using Colima instead of Docker Desktop for integration tests:
# Start Colima
colima start
# Set environment variables
export DOCKER_HOST="unix://$HOME/.colima/default/docker.sock"
export TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE="$HOME/.colima/default/docker.sock"
# Add to ~/.zshrc for persistence
echo 'export DOCKER_HOST="unix://$HOME/.colima/default/docker.sock"' >> ~/.zshrc
echo 'export TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE="$HOME/.colima/default/docker.sock"' >> ~/.zshrc

This project uses semantic versioning with automatic releases on every commit to main:
- Format: v0.0.1, v0.0.2, etc.
- Automation: GitHub Actions automatically tags and creates releases
- Artifacts: JAR files are attached to each release
View releases: https://github.com/drag0sd0g/kafka-csv-loader/releases
Contributions are welcome! Please follow these steps:
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Run tests and linting (./gradlew build)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Code Standards:
- Follow Kotlin coding conventions
- Maintain 80%+ test coverage
- Pass all ktlint checks
- Add tests for new features
This project is licensed under the MIT License - see the LICENSE file for details.
- Built with Clikt for elegant CLI parsing
- Terminal UI powered by Mordant
- CSV parsing by kotlin-csv
- Integration testing with Testcontainers
- Code quality with Ktlint
- Coverage reporting with JaCoCo
Dragos - @drag0sd0g
Project Link: https://github.com/drag0sd0g/kafka-csv-loader
Made with ❤️ and ☕ by Dragos