
Commit 87d8b90: dry run testing
1 parent 2fd2278
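The commit adds a --dry-run / -d flag to the loader using Clikt's boolean-flag API. For reference, a minimal standalone sketch of the option(...).flag(default = false) pattern used in the diff below; the command name, help text, and echoed messages here are illustrative, not taken from the project:

import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.parameters.options.flag
import com.github.ajalt.clikt.parameters.options.option

// flag() turns the option into a Boolean: false by default,
// true whenever --dry-run or -d appears on the command line.
class Demo : CliktCommand() {
    private val dryRun by option("--dry-run", "-d", help = "Validate only").flag(default = false)

    override fun run() {
        echo(if (dryRun) "dry run: nothing will be sent" else "live run")
    }
}

fun main(args: Array<String>) = Demo().main(args)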

File tree

2 files changed: +282 -35 lines


src/main/kotlin/com/dragos/kafkacsvloader/cli/LoadCommand.kt

Lines changed: 90 additions & 35 deletions
@@ -7,6 +7,7 @@ import com.dragos.kafkacsvloader.csv.CsvParser
 import com.dragos.kafkacsvloader.kafka.KafkaProducerClient
 import com.github.ajalt.clikt.core.CliktCommand
 import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.flag
 import com.github.ajalt.clikt.parameters.options.option
 import com.github.ajalt.clikt.parameters.options.required
 import com.github.ajalt.clikt.parameters.options.versionOption
@@ -40,6 +41,11 @@ class KafkaCsvLoaderCommand : CliktCommand(
         "-k",
         help = "CSV column to use as Kafka message key (optional)",
     )
+    private val dryRun by option(
+        "--dry-run",
+        "-d",
+        help = "Validate CSV and schema without sending to Kafka",
+    ).flag(default = false)
 
     init {
         versionOption(getVersion())
@@ -49,6 +55,9 @@ class KafkaCsvLoaderCommand : CliktCommand(
 
     override fun run() {
         terminal.println(bold(cyan("🚀 Kafka CSV Loader")))
+        if (dryRun) {
+            terminal.println(yellow(" DRY RUN MODE - No data will be sent to Kafka"))
+        }
         terminal.println()
 
         try {
@@ -80,21 +89,13 @@ class KafkaCsvLoaderCommand : CliktCommand(
             terminal.println(green(""))
             terminal.println()
 
-            // Step 4: Connect to Kafka
-            terminal.print(yellow("🔌 Connecting to Kafka... "))
-            terminal.println()
-            terminal.println(" Bootstrap servers: $bootstrapServers")
-            terminal.println(" Schema Registry: $schemaRegistry")
-            terminal.println(" Topic: $topic")
-            terminal.println()
-
-            // Step 5: Process and send records
-            KafkaProducerClient(bootstrapServers, schemaRegistry).use { producer ->
-                terminal.println(yellow("📤 Sending records to Kafka..."))
+            if (dryRun) {
+                // Dry run: validate all rows can be mapped
+                terminal.println(yellow("🔍 Validating all rows (dry run)..."))
                 terminal.println()
 
-                var successCount = 0
-                var failureCount = 0
+                var validCount = 0
+                var invalidCount = 0
                 val failures = mutableListOf<Pair<Int, String>>()
 
                 csvData.rows.forEachIndexed { index, row ->
@@ -103,37 +104,26 @@ class KafkaCsvLoaderCommand : CliktCommand(
 
                     when (result) {
                         is RowMappingResult.Success -> {
-                            val key = keyField?.let { row[it] }
-                            try {
-                                producer.sendSync(topic, key, result.record)
-                                successCount++
-                                terminal.print(green(""))
-                                if (rowNumber % 50 == 0) {
-                                    terminal.println(" $rowNumber")
-                                }
-                            } catch (e: Exception) {
-                                failureCount++
-                                failures.add(rowNumber to "Kafka error: ${e.message}")
-                                terminal.print(red(""))
+                            validCount++
+                            if (rowNumber % 50 == 0) {
+                                terminal.println(green(" ✓ Validated $rowNumber rows..."))
                             }
                         }
                         is RowMappingResult.Failure -> {
-                            failureCount++
+                            invalidCount++
                             failures.add(rowNumber to result.errors.joinToString("; "))
-                            terminal.print(red(""))
                         }
                     }
                 }
+
                 terminal.println()
-                terminal.println()
+                terminal.println(bold(cyan("📊 Dry Run Summary")))
+                terminal.println(green(" ✓ Valid rows: $validCount"))
+                terminal.println(red(" ✗ Invalid rows: $invalidCount"))
 
-                // Step 6: Summary
-                terminal.println(bold(cyan("📊 Summary")))
-                terminal.println(green(" ✓ Success: $successCount"))
-                if (failureCount > 0) {
-                    terminal.println(red(" ✗ Failures: $failureCount"))
+                if (failures.isNotEmpty()) {
                     terminal.println()
-                    terminal.println(yellow(" Failed rows:"))
+                    terminal.println(yellow(" Invalid rows:"))
                     failures.take(10).forEach { (rowNum, error) ->
                         terminal.println(red(" Row $rowNum: $error"))
                     }
@@ -142,8 +132,73 @@ class KafkaCsvLoaderCommand : CliktCommand(
                     }
                     exitProcess(1)
                 }
+
+                terminal.println()
+                terminal.println(bold(green("✅ All rows validated successfully! Ready to load to Kafka.")))
+            } else {
+                // Normal run: connect to Kafka and send
+                terminal.print(yellow("🔌 Connecting to Kafka... "))
+                terminal.println()
+                terminal.println(" Bootstrap servers: $bootstrapServers")
+                terminal.println(" Schema Registry: $schemaRegistry")
+                terminal.println(" Topic: $topic")
                 terminal.println()
-            terminal.println(bold(green("✅ All records successfully loaded!")))
+
+                KafkaProducerClient(bootstrapServers, schemaRegistry).use { producer ->
+                    terminal.println(yellow("📤 Sending records to Kafka..."))
+                    terminal.println()
+
+                    var successCount = 0
+                    var failureCount = 0
+                    val failures = mutableListOf<Pair<Int, String>>()
+
+                    csvData.rows.forEachIndexed { index, row ->
+                        val rowNumber = index + 1
+                        val result = AvroRecordMapper.mapRow(schema, row)
+
+                        when (result) {
+                            is RowMappingResult.Success -> {
+                                val key = keyField?.let { row[it] }
+                                try {
+                                    producer.sendSync(topic, key, result.record)
+                                    successCount++
+                                    terminal.print(green(""))
+                                    if (rowNumber % 50 == 0) {
+                                        terminal.println(" $rowNumber")
+                                    }
+                                } catch (e: Exception) {
+                                    failureCount++
+                                    failures.add(rowNumber to "Kafka error: ${e.message}")
+                                    terminal.print(red(""))
+                                }
+                            }
+                            is RowMappingResult.Failure -> {
+                                failureCount++
+                                failures.add(rowNumber to result.errors.joinToString("; "))
+                                terminal.print(red(""))
+                            }
+                        }
+                    }
+                    terminal.println()
+                    terminal.println()
+
+                    terminal.println(bold(cyan("📊 Summary")))
+                    terminal.println(green(" ✓ Success: $successCount"))
+                    if (failureCount > 0) {
+                        terminal.println(red(" ✗ Failures: $failureCount"))
+                        terminal.println()
+                        terminal.println(yellow(" Failed rows:"))
+                        failures.take(10).forEach { (rowNum, error) ->
+                            terminal.println(red(" Row $rowNum: $error"))
+                        }
+                        if (failures.size > 10) {
+                            terminal.println(yellow(" ... and ${failures.size - 10} more"))
+                        }
+                        exitProcess(1)
+                    }
+                    terminal.println()
+                    terminal.println(bold(green("✅ All records successfully loaded!")))
+                }
             }
         } catch (e: Exception) {
             terminal.println()
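Pulled out of the diff above, the dry-run branch reduces to a plain validate-and-report loop. A self-contained sketch of that pattern, with the project's RowMappingResult and AvroRecordMapper.mapRow replaced by stand-ins (the real types live in com.dragos.kafkacsvloader.avro):

// Stand-in for the project's sealed result type; the shapes mirror the diff.
sealed interface RowMappingResult {
    data class Success(val record: Any) : RowMappingResult
    data class Failure(val errors: List<String>) : RowMappingResult
}

// Dry-run core: count rows that map cleanly, collect (rowNumber, error)
// pairs for the rest. Row numbers are 1-based, matching the CLI output.
fun validateRows(
    rows: List<Map<String, String>>,
    mapRow: (Map<String, String>) -> RowMappingResult,
): Pair<Int, List<Pair<Int, String>>> {
    var validCount = 0
    val failures = mutableListOf<Pair<Int, String>>()
    rows.forEachIndexed { index, row ->
        when (val result = mapRow(row)) {
            is RowMappingResult.Success -> validCount++
            is RowMappingResult.Failure ->
                failures.add(index + 1 to result.errors.joinToString("; "))
        }
    }
    return validCount to failures
}

The CLI wrapper then differs only in what it does with the two outputs: print a summary, and exit 1 when failures is non-empty.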
src/test/kotlin/com/dragos/kafkacsvloader/cli/DryRunTest.kt (new file)

Lines changed: 192 additions & 0 deletions
@@ -0,0 +1,192 @@
+package com.dragos.kafkacsvloader.cli
+
+import com.dragos.kafkacsvloader.avro.AvroRecordMapper
+import com.dragos.kafkacsvloader.avro.AvroSchemaLoader
+import com.dragos.kafkacsvloader.avro.RowMappingResult
+import com.dragos.kafkacsvloader.csv.CsvParser
+import io.kotest.core.spec.style.FunSpec
+import io.kotest.matchers.collections.shouldBeEmpty
+import io.kotest.matchers.collections.shouldHaveSize
+import io.kotest.matchers.shouldBe
+import java.io.File
+import java.nio.file.Files
+
+class DryRunTest : FunSpec({
+
+    test("dry run validates CSV and schema without errors for valid data") {
+        val tempDir = Files.createTempDirectory("dry-run-test").toFile()
+        try {
+            // Create test CSV
+            val csvFile = File(tempDir, "test.csv")
+            csvFile.writeText(
+                """
+                id,name,email,age
+                1,Alice,alice@example.com,30
+                2,Bob,bob@example.com,25
+                """.trimIndent(),
+            )
+
+            // Create test schema
+            val schemaFile = File(tempDir, "schema.avsc")
+            schemaFile.writeText(
+                """
+                {
+                  "type": "record",
+                  "name": "User",
+                  "namespace": "com.example",
+                  "fields": [
+                    {"name": "id", "type": "string"},
+                    {"name": "name", "type": "string"},
+                    {"name": "email", "type": "string"},
+                    {"name": "age", "type": "int"}
+                  ]
+                }
+                """.trimIndent(),
+            )
+
+            // Load schema
+            val schema = AvroSchemaLoader.loadFromFile(schemaFile.absolutePath)
+
+            // Parse CSV
+            val csvData = CsvParser.parse(csvFile.absolutePath)
+
+            // Validate headers
+            val schemaFields = schema.fields.map { it.name() }
+            val headersValid = CsvParser.validateHeaders(csvData.headers, schemaFields)
+            headersValid shouldBe true
+
+            // Validate all rows can be mapped (dry run logic)
+            val failures = mutableListOf<Pair<Int, String>>()
+            var validCount = 0
+
+            csvData.rows.forEachIndexed { index, row ->
+                val rowNumber = index + 1
+                when (val result = AvroRecordMapper.mapRow(schema, row)) {
+                    is RowMappingResult.Success -> validCount++
+                    is RowMappingResult.Failure -> {
+                        failures.add(rowNumber to result.errors.joinToString("; "))
+                    }
+                }
+            }
+
+            // Assert no failures
+            validCount shouldBe 2
+            failures.shouldBeEmpty()
+        } finally {
+            tempDir.deleteRecursively()
+        }
+    }
+
+    test("dry run detects invalid rows") {
+        val tempDir = Files.createTempDirectory("dry-run-test").toFile()
+        try {
+            // Create test CSV with invalid data
+            val csvFile = File(tempDir, "test.csv")
+            csvFile.writeText(
+                """
+                id,name,email,age
+                1,Alice,alice@example.com,30
+                2,Bob,bob@example.com,invalid_age
+                """.trimIndent(),
+            )
+
+            // Create test schema
+            val schemaFile = File(tempDir, "schema.avsc")
+            schemaFile.writeText(
+                """
+                {
+                  "type": "record",
+                  "name": "User",
+                  "namespace": "com.example",
+                  "fields": [
+                    {"name": "id", "type": "string"},
+                    {"name": "name", "type": "string"},
+                    {"name": "email", "type": "string"},
+                    {"name": "age", "type": "int"}
+                  ]
+                }
+                """.trimIndent(),
+            )
+
+            // Load schema
+            val schema = AvroSchemaLoader.loadFromFile(schemaFile.absolutePath)
+
+            // Parse CSV
+            val csvData = CsvParser.parse(csvFile.absolutePath)
+
+            // Validate all rows (dry run logic)
+            val failures = mutableListOf<Pair<Int, String>>()
+            var validCount = 0
+
+            csvData.rows.forEachIndexed { index, row ->
+                val rowNumber = index + 1
+                when (val result = AvroRecordMapper.mapRow(schema, row)) {
+                    is RowMappingResult.Success -> validCount++
+                    is RowMappingResult.Failure -> {
+                        failures.add(rowNumber to result.errors.joinToString("; "))
+                    }
+                }
+            }
+
+            // Assert there's one failure
+            validCount shouldBe 1
+            failures shouldHaveSize 1
+            failures[0].first shouldBe 2
+            failures[0].second shouldBe "Field 'age' conversion error: For input string: \"invalid_age\""
+        } finally {
+            tempDir.deleteRecursively()
+        }
+    }
+
+    test("dry run detects missing required fields") {
+        val tempDir = Files.createTempDirectory("dry-run-test").toFile()
+        try {
+            // Create test CSV missing 'age' column
+            val csvFile = File(tempDir, "test.csv")
+            csvFile.writeText(
+                """
+                id,name,email
+                1,Alice,alice@example.com
+                2,Bob,bob@example.com
+                """.trimIndent(),
+            )
+
+            // Create test schema
+            val schemaFile = File(tempDir, "schema.avsc")
+            schemaFile.writeText(
+                """
+                {
+                  "type": "record",
+                  "name": "User",
+                  "namespace": "com.example",
+                  "fields": [
+                    {"name": "id", "type": "string"},
+                    {"name": "name", "type": "string"},
+                    {"name": "email", "type": "string"},
+                    {"name": "age", "type": "int"}
+                  ]
+                }
+                """.trimIndent(),
+            )
+
+            // Load schema
+            val schema = AvroSchemaLoader.loadFromFile(schemaFile.absolutePath)
+
+            // Parse CSV
+            val csvData = CsvParser.parse(csvFile.absolutePath)
+
+            // Validate headers
+            val schemaFields = schema.fields.map { it.name() }
+            val headersValid = CsvParser.validateHeaders(csvData.headers, schemaFields)
+
+            // Should fail validation
+            headersValid shouldBe false
+
+            // Check missing fields
+            val missingFields = CsvParser.getMissingFields(csvData.headers, schemaFields)
+            missingFields shouldBe listOf("age")
+        } finally {
+            tempDir.deleteRecursively()
+        }
+    }
+})
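The last test relies on two CsvParser helpers whose contract the assertions imply but the diff does not show. Stand-in one-liners that mirror what the tests assume; the real implementations live in the project's CsvParser:

// Hypothetical stand-ins, not the project's code. Headers pass validation
// when every schema field appears somewhere in the CSV header row.
fun validateHeaders(headers: List<String>, schemaFields: List<String>): Boolean =
    schemaFields.all { it in headers }

// Missing fields are the schema fields absent from the CSV header row.
fun getMissingFields(headers: List<String>, schemaFields: List<String>): List<String> =
    schemaFields.filterNot { it in headers }

Under this reading, extra CSV columns are tolerated; only schema fields absent from the header cause the dry run to fail, which matches the third test's assertions.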
