Skip to content

Commit 52b4489

Browse files
committed
Refactored LoadCommand.kt to reduce duplication
1 parent 9c3c682 commit 52b4489

File tree

1 file changed

+168
-128
lines changed

1 file changed

+168
-128
lines changed

src/main/kotlin/com/dragos/kafkacsvloader/cli/LoadCommand.kt

Lines changed: 168 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.dragos.kafkacsvloader
33
import com.dragos.kafkacsvloader.avro.AvroRecordMapper
44
import com.dragos.kafkacsvloader.avro.AvroSchemaLoader
55
import com.dragos.kafkacsvloader.avro.RowMappingResult
6+
import com.dragos.kafkacsvloader.csv.CsvData
67
import com.dragos.kafkacsvloader.csv.CsvParser
78
import com.dragos.kafkacsvloader.kafka.KafkaProducerClient
89
import com.github.ajalt.clikt.core.CliktCommand
@@ -17,6 +18,7 @@ import com.github.ajalt.mordant.rendering.TextColors.red
1718
import com.github.ajalt.mordant.rendering.TextColors.yellow
1819
import com.github.ajalt.mordant.rendering.TextStyles.bold
1920
import com.github.ajalt.mordant.terminal.Terminal
21+
import org.apache.avro.Schema
2022
import kotlin.system.exitProcess
2123

2224
class KafkaCsvLoaderCommand : CliktCommand(
@@ -54,160 +56,198 @@ class KafkaCsvLoaderCommand : CliktCommand(
5456
private val terminal = Terminal()
5557

5658
override fun run() {
59+
printHeader()
60+
61+
try {
62+
val schema = loadSchema()
63+
val csvData = parseCsv()
64+
validateHeaders(schema, csvData)
65+
66+
if (dryRun) {
67+
performDryRun(schema, csvData)
68+
} else {
69+
performKafkaLoad(schema, csvData)
70+
}
71+
} catch (e: Exception) {
72+
printError(e)
73+
exitProcess(1)
74+
}
75+
}
76+
77+
private fun printHeader() {
5778
terminal.println(bold(cyan("🚀 Kafka CSV Loader")))
5879
if (dryRun) {
5980
terminal.println(yellow(" DRY RUN MODE - No data will be sent to Kafka"))
6081
}
6182
terminal.println()
83+
}
6284

63-
try {
64-
// Step 1: Load schema
65-
terminal.print(yellow("📋 Loading Avro schema... "))
66-
val schema = AvroSchemaLoader.loadFromFile(schemaFile)
67-
terminal.println(green(""))
68-
terminal.println(" Schema: ${schema.namespace}.${schema.name}")
69-
terminal.println(" Fields: ${schema.fields.joinToString(", ") { it.name() }}")
70-
terminal.println()
85+
private fun loadSchema(): Schema {
86+
terminal.print(yellow("📋 Loading Avro schema... "))
87+
val schema = AvroSchemaLoader.loadFromFile(schemaFile)
88+
terminal.println(green(""))
89+
terminal.println(" Schema: ${schema.namespace}.${schema.name}")
90+
terminal.println(" Fields: ${schema.fields.joinToString(", ") { it.name() }}")
91+
terminal.println()
92+
return schema
93+
}
7194

72-
// Step 2: Parse CSV
73-
terminal.print(yellow("📄 Parsing CSV file... "))
74-
val csvData = CsvParser.parse(csvFile)
75-
terminal.println(green(""))
76-
terminal.println(" Headers: ${csvData.headers.joinToString(", ")}")
77-
terminal.println(" Rows: ${csvData.rows.size}")
78-
terminal.println()
95+
private fun parseCsv(): CsvData {
96+
terminal.print(yellow("📄 Parsing CSV file... "))
97+
val csvData = CsvParser.parse(csvFile)
98+
terminal.println(green(""))
99+
terminal.println(" Headers: ${csvData.headers.joinToString(", ")}")
100+
terminal.println(" Rows: ${csvData.rows.size}")
101+
terminal.println()
102+
return csvData
103+
}
79104

80-
// Step 3: Validate headers
81-
terminal.print(yellow("🔍 Validating CSV headers against schema... "))
82-
val schemaFields = schema.fields.map { it.name() }
83-
if (!CsvParser.validateHeaders(csvData.headers, schemaFields)) {
84-
val missing = CsvParser.getMissingFields(csvData.headers, schemaFields)
85-
terminal.println(red(""))
86-
terminal.println(red(" Missing required fields: ${missing.joinToString(", ")}"))
87-
exitProcess(1)
88-
}
89-
terminal.println(green(""))
90-
terminal.println()
105+
private fun validateHeaders(
106+
schema: Schema,
107+
csvData: CsvData,
108+
) {
109+
terminal.print(yellow("🔍 Validating CSV headers against schema... "))
110+
val schemaFields = schema.fields.map { it.name() }
111+
if (!CsvParser.validateHeaders(csvData.headers, schemaFields)) {
112+
val missing = CsvParser.getMissingFields(csvData.headers, schemaFields)
113+
terminal.println(red(""))
114+
terminal.println(red(" Missing required fields: ${missing.joinToString(", ")}"))
115+
exitProcess(1)
116+
}
117+
terminal.println(green(""))
118+
terminal.println()
119+
}
91120

92-
if (dryRun) {
93-
// Dry run: validate all rows can be mapped
94-
terminal.println(yellow("🔍 Validating all rows (dry run)..."))
95-
terminal.println()
121+
private fun performDryRun(
122+
schema: Schema,
123+
csvData: CsvData,
124+
) {
125+
terminal.println(yellow("🔍 Validating all rows (dry run)..."))
126+
terminal.println()
96127

97-
var validCount = 0
98-
var invalidCount = 0
99-
val failures = mutableListOf<Pair<Int, String>>()
128+
var validCount = 0
129+
val failures = mutableListOf<Pair<Int, String>>()
100130

101-
csvData.rows.forEachIndexed { index, row ->
102-
val rowNumber = index + 1
103-
val result = AvroRecordMapper.mapRow(schema, row)
131+
csvData.rows.forEachIndexed { index, row ->
132+
val rowNumber = index + 1
133+
val result = AvroRecordMapper.mapRow(schema, row)
104134

105-
when (result) {
106-
is RowMappingResult.Success -> {
107-
validCount++
108-
if (rowNumber % 50 == 0) {
109-
terminal.println(green(" ✓ Validated $rowNumber rows..."))
110-
}
111-
}
112-
is RowMappingResult.Failure -> {
113-
invalidCount++
114-
failures.add(rowNumber to result.errors.joinToString("; "))
115-
}
135+
when (result) {
136+
is RowMappingResult.Success -> {
137+
validCount++
138+
if (rowNumber % 50 == 0) {
139+
terminal.println(green(" ✓ Validated $rowNumber rows..."))
116140
}
117141
}
142+
is RowMappingResult.Failure -> {
143+
failures.add(rowNumber to result.errors.joinToString("; "))
144+
}
145+
}
146+
}
118147

119-
terminal.println()
120-
terminal.println(bold(cyan("📊 Dry Run Summary")))
121-
terminal.println(green(" ✓ Valid rows: $validCount"))
122-
terminal.println(red(" ✗ Invalid rows: $invalidCount"))
148+
terminal.println()
149+
terminal.println(bold(cyan("📊 Dry Run Summary")))
150+
terminal.println(green(" ✓ Valid rows: $validCount"))
151+
terminal.println(red(" ✗ Invalid rows: ${failures.size}"))
123152

124-
if (failures.isNotEmpty()) {
125-
terminal.println()
126-
terminal.println(yellow(" Invalid rows:"))
127-
failures.take(10).forEach { (rowNum, error) ->
128-
terminal.println(red(" Row $rowNum: $error"))
129-
}
130-
if (failures.size > 10) {
131-
terminal.println(yellow(" ... and ${failures.size - 10} more"))
132-
}
133-
exitProcess(1)
134-
}
153+
if (failures.isNotEmpty()) {
154+
printFailures(failures)
155+
exitProcess(1)
156+
}
135157

136-
terminal.println()
137-
terminal.println(bold(green("✅ All rows validated successfully! Ready to load to Kafka.")))
138-
} else {
139-
// Normal run: connect to Kafka and send
140-
terminal.print(yellow("🔌 Connecting to Kafka... "))
141-
terminal.println()
142-
terminal.println(" Bootstrap servers: $bootstrapServers")
143-
terminal.println(" Schema Registry: $schemaRegistry")
144-
terminal.println(" Topic: $topic")
145-
terminal.println()
146-
147-
KafkaProducerClient(bootstrapServers, schemaRegistry).use { producer ->
148-
terminal.println(yellow("📤 Sending records to Kafka..."))
149-
terminal.println()
150-
151-
var successCount = 0
152-
var failureCount = 0
153-
val failures = mutableListOf<Pair<Int, String>>()
154-
155-
csvData.rows.forEachIndexed { index, row ->
156-
val rowNumber = index + 1
157-
val result = AvroRecordMapper.mapRow(schema, row)
158-
159-
when (result) {
160-
is RowMappingResult.Success -> {
161-
val key = keyField?.let { row[it] }
162-
try {
163-
producer.sendSync(topic, key, result.record)
164-
successCount++
165-
terminal.print(green(""))
166-
if (rowNumber % 50 == 0) {
167-
terminal.println(" $rowNumber")
168-
}
169-
} catch (e: Exception) {
170-
failureCount++
171-
failures.add(rowNumber to "Kafka error: ${e.message}")
172-
terminal.print(red(""))
173-
}
174-
}
175-
is RowMappingResult.Failure -> {
176-
failureCount++
177-
failures.add(rowNumber to result.errors.joinToString("; "))
178-
terminal.print(red(""))
158+
terminal.println()
159+
terminal.println(bold(green("✅ All rows validated successfully! Ready to load to Kafka.")))
160+
}
161+
162+
private fun performKafkaLoad(
163+
schema: Schema,
164+
csvData: CsvData,
165+
) {
166+
printKafkaConnection()
167+
168+
KafkaProducerClient(bootstrapServers, schemaRegistry).use { producer ->
169+
terminal.println(yellow("📤 Sending records to Kafka..."))
170+
terminal.println()
171+
172+
var successCount = 0
173+
val failures = mutableListOf<Pair<Int, String>>()
174+
175+
csvData.rows.forEachIndexed { index, row ->
176+
val rowNumber = index + 1
177+
val result = AvroRecordMapper.mapRow(schema, row)
178+
179+
when (result) {
180+
is RowMappingResult.Success -> {
181+
val key = keyField?.let { row[it] }
182+
try {
183+
producer.sendSync(topic, key, result.record)
184+
successCount++
185+
terminal.print(green(""))
186+
if (rowNumber % 50 == 0) {
187+
terminal.println(" $rowNumber")
179188
}
189+
} catch (e: Exception) {
190+
failures.add(rowNumber to "Kafka error: ${e.message}")
191+
terminal.print(red(""))
180192
}
181193
}
182-
terminal.println()
183-
terminal.println()
184-
185-
terminal.println(bold(cyan("📊 Summary")))
186-
terminal.println(green(" ✓ Success: $successCount"))
187-
if (failureCount > 0) {
188-
terminal.println(red(" ✗ Failures: $failureCount"))
189-
terminal.println()
190-
terminal.println(yellow(" Failed rows:"))
191-
failures.take(10).forEach { (rowNum, error) ->
192-
terminal.println(red(" Row $rowNum: $error"))
193-
}
194-
if (failures.size > 10) {
195-
terminal.println(yellow(" ... and ${failures.size - 10} more"))
196-
}
197-
exitProcess(1)
194+
is RowMappingResult.Failure -> {
195+
failures.add(rowNumber to result.errors.joinToString("; "))
196+
terminal.print(red(""))
198197
}
199-
terminal.println()
200-
terminal.println(bold(green("✅ All records successfully loaded!")))
201198
}
202199
}
203-
} catch (e: Exception) {
200+
204201
terminal.println()
205-
terminal.println(red(bold("❌ Error: ${e.message}")))
206-
if (e.cause != null) {
207-
terminal.println(red(" Caused by: ${e.cause?.message}"))
208-
}
202+
terminal.println()
203+
204+
printKafkaSummary(successCount, failures)
205+
}
206+
}
207+
208+
private fun printKafkaConnection() {
209+
terminal.print(yellow("🔌 Connecting to Kafka... "))
210+
terminal.println()
211+
terminal.println(" Bootstrap servers: $bootstrapServers")
212+
terminal.println(" Schema Registry: $schemaRegistry")
213+
terminal.println(" Topic: $topic")
214+
terminal.println()
215+
}
216+
217+
private fun printKafkaSummary(
218+
successCount: Int,
219+
failures: List<Pair<Int, String>>,
220+
) {
221+
terminal.println(bold(cyan("📊 Summary")))
222+
terminal.println(green(" ✓ Success: $successCount"))
223+
224+
if (failures.isNotEmpty()) {
225+
terminal.println(red(" ✗ Failures: ${failures.size}"))
226+
printFailures(failures)
209227
exitProcess(1)
210228
}
229+
230+
terminal.println()
231+
terminal.println(bold(green("✅ All records successfully loaded!")))
232+
}
233+
234+
private fun printFailures(failures: List<Pair<Int, String>>) {
235+
terminal.println()
236+
terminal.println(yellow(" Invalid rows:"))
237+
failures.take(10).forEach { (rowNum, error) ->
238+
terminal.println(red(" Row $rowNum: $error"))
239+
}
240+
if (failures.size > 10) {
241+
terminal.println(yellow(" ... and ${failures.size - 10} more"))
242+
}
243+
}
244+
245+
private fun printError(e: Exception) {
246+
terminal.println()
247+
terminal.println(red(bold("❌ Error: ${e.message}")))
248+
if (e.cause != null) {
249+
terminal.println(red(" Caused by: ${e.cause?.message}"))
250+
}
211251
}
212252

213253
private fun getVersion(): String {

0 commit comments

Comments
 (0)