@@ -2,18 +2,18 @@ package com.dragos.kafkacsvloader.integration
22
33import com.dragos.kafkacsvloader.avro.AvroRecordMapper
44import com.dragos.kafkacsvloader.avro.AvroSchemaLoader
5+ import com.dragos.kafkacsvloader.avro.RowMappingResult
56import com.dragos.kafkacsvloader.csv.CsvParser
67import com.dragos.kafkacsvloader.kafka.KafkaProducerClient
78import io.confluent.kafka.serializers.KafkaAvroDeserializer
89import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
910import io.kotest.matchers.shouldBe
11+ import io.kotest.matchers.shouldNotBe
1012import org.apache.avro.generic.GenericRecord
1113import org.apache.kafka.clients.consumer.ConsumerConfig
1214import org.apache.kafka.clients.consumer.KafkaConsumer
1315import org.apache.kafka.common.serialization.StringDeserializer
14- import org.junit.jupiter.api.AfterAll
15- import org.junit.jupiter.api.BeforeAll
16- import org.junit.jupiter.api.Test
16+ import org.junit.jupiter.api.*
1717import org.junit.jupiter.api.io.TempDir
1818import org.testcontainers.containers.GenericContainer
1919import org.testcontainers.containers.KafkaContainer
@@ -23,57 +23,66 @@ import java.io.File
2323import java.time.Duration
2424import java.util.Properties
2525
26+ @TestInstance(TestInstance .Lifecycle .PER_CLASS )
2627class KafkaIntegrationTest {
2728
28- companion object {
29- private lateinit var network: Network
30- private lateinit var kafka: KafkaContainer
31- private lateinit var schemaRegistry: GenericContainer <* >
29+ private lateinit var network: Network
30+ private lateinit var kafka: KafkaContainer
31+ private lateinit var schemaRegistry: GenericContainer <* >
32+
33+ private lateinit var bootstrapServers: String
34+ private lateinit var schemaRegistryUrl: String
35+
36+ @BeforeAll
37+ fun setup () {
38+ println (" Starting Testcontainers setup..." )
3239
33- private lateinit var bootstrapServers : String
34- private lateinit var schemaRegistryUrl : String
35-
36- @JvmStatic
37- @BeforeAll
38- fun setup () {
39- network = Network .newNetwork( )
40-
41- // Start Kafka
42- kafka = KafkaContainer ( DockerImageName .parse( " confluentinc/cp-kafka:7.5.3 " ) )
43- .withNetwork(network)
44- .withNetworkAliases( " kafka " )
45- kafka.start( )
46-
47- // Start Schema Registry
48- schemaRegistry = GenericContainer ( DockerImageName .parse( " confluentinc/cp- schema-registry:7.5.3 " ) )
49- .withNetwork(network )
50- .withExposedPorts( 8081 )
51- .withEnv( " SCHEMA_REGISTRY_HOST_NAME " , " schema-registry " )
52- .withEnv( " SCHEMA_REGISTRY_LISTENERS " , " http://0.0.0.0:8081 " )
53- .withEnv( " SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS " , " PLAINTEXT://kafka:9092 " )
54- schemaRegistry.start()
55-
56- bootstrapServers = kafka.bootstrapServers
57- schemaRegistryUrl = " http:// " + schemaRegistry.host + " : " + schemaRegistry.getMappedPort( 8081 ) + " "
58-
59- println (" Kafka started at : $bootstrapServers " )
60- println (" Schema Registry started at : $schemaRegistryUrl " )
61- }
40+ network = Network .newNetwork()
41+ println ( " ✓ Network created " )
42+
43+ // Start Kafka
44+ kafka = KafkaContainer ( DockerImageName .parse( " confluentinc/cp-kafka:7.5.3 " ))
45+ .withNetwork(network)
46+ .withNetworkAliases( " kafka " )
47+ println ( " Starting Kafka container... " )
48+ kafka.start()
49+ println ( " ✓ Kafka started " )
50+
51+ // Start Schema Registry
52+ schemaRegistry = GenericContainer ( DockerImageName .parse( " confluentinc/cp-schema-registry:7.5.3 " ) )
53+ .withNetwork(network)
54+ .withExposedPorts( 8081 )
55+ .withEnv( " SCHEMA_REGISTRY_HOST_NAME " , " schema-registry" )
56+ .withEnv( " SCHEMA_REGISTRY_LISTENERS " , " http://0.0.0.0:8081 " )
57+ .withEnv( " SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS " , " PLAINTEXT://kafka:9092 " )
58+ println ( " Starting Schema Registry container... " )
59+ schemaRegistry.start( )
60+ println ( " ✓ Schema Registry started " )
61+
62+ bootstrapServers = kafka.bootstrapServers
63+ schemaRegistryUrl = " http:// ${schemaRegistry.host} : ${schemaRegistry.getMappedPort( 8081 )} "
64+
65+ println ( " ✓✓ All containers ready! " )
66+ println (" Kafka : $bootstrapServers " )
67+ println (" Schema Registry : $schemaRegistryUrl " )
68+ }
6269
63- @JvmStatic
64- @AfterAll
65- fun teardown () {
66- schemaRegistry.stop()
67- kafka.stop()
68- network.close()
69- }
70+ @AfterAll
71+ fun teardown () {
72+ println ( " Stopping containers... " )
73+ schemaRegistry.stop()
74+ kafka.stop()
75+ network.close()
76+ println ( " ✓ Cleanup complete " )
7077 }
7178
7279 @TempDir
7380 lateinit var tempDir: File
7481
7582 @Test
7683 fun `should load CSV data into Kafka with Avro schema end-to-end` () {
84+ println (" \n === Running end-to-end test ===" )
85+
7786 // Given: Create test schema
7887 val schemaContent = """
7988 {
@@ -93,6 +102,7 @@ class KafkaIntegrationTest {
93102 writeText(schemaContent)
94103 }
95104 val schema = AvroSchemaLoader .loadFromFile(schemaFile.absolutePath)
105+ println (" ✓ Schema loaded" )
96106
97107 // Given: Create test CSV
98108 val csvContent = """
@@ -105,22 +115,27 @@ class KafkaIntegrationTest {
105115 writeText(csvContent)
106116 }
107117 val csvData = CsvParser .parse(csvFile.absolutePath)
118+ println (" ✓ CSV parsed: ${csvData.rows.size} rows" )
108119
109120 // Given: Kafka topic
110- val topic = " test-users-" + System .currentTimeMillis()
121+ val topic = " test-users-${System .currentTimeMillis()} "
122+ println (" ✓ Topic: $topic " )
111123
112124 // When: Send data to Kafka
125+ println (" Sending records to Kafka..." )
113126 KafkaProducerClient (bootstrapServers, schemaRegistryUrl).use { producer ->
114127 csvData.rows.forEach { row ->
115128 val result = AvroRecordMapper .mapRow(schema, row)
116- if (result is com.dragos.kafkacsvloader.avro. RowMappingResult .Success ) {
129+ if (result is RowMappingResult .Success ) {
117130 val key = row[" id" ]
118131 producer.sendSync(topic, key, result.record)
132+ println (" ✓ Sent record with key: $key " )
119133 }
120134 }
121135 }
122136
123137 // Then: Consume and verify
138+ println (" Consuming records from Kafka..." )
124139 val consumer = createConsumer()
125140 consumer.subscribe(listOf (topic))
126141
@@ -132,41 +147,53 @@ class KafkaIntegrationTest {
132147 val polled = consumer.poll(Duration .ofSeconds(2 ))
133148 polled.forEach { record ->
134149 records.add(record.value() as GenericRecord )
150+ println (" ✓ Consumed record: ${record.value()} " )
135151 }
136152 }
137153
138154 consumer.close()
139155
140156 // Verify we received all 3 records
157+ println (" Verifying ${records.size} records..." )
141158 records.size shouldBe 3
142159
160+ // Helper function to convert Avro Utf8 to String
161+ fun GenericRecord.getString (field : String ): String = this .get(field).toString()
162+
143163 // Verify first record
144- val alice = records.find { ( it.get (" name" ) as String ) == " Alice" }
145- alice shouldBe org.junit.jupiter.api. Assertions .assertNotNull(alice)
164+ val alice = records.find { it.getString (" name" ) == " Alice" }
165+ alice shouldNotBe null
146166 alice?.get(" id" ) shouldBe 1
147- alice?.get (" email" ) shouldBe " alice@example.com"
167+ alice?.getString (" email" ) shouldBe " alice@example.com"
148168 alice?.get(" age" ) shouldBe 30
149169 alice?.get(" active" ) shouldBe true
170+ println (" ✓ Alice verified" )
150171
151172 // Verify second record
152- val bob = records.find { ( it.get (" name" ) as String ) == " Bob" }
153- bob shouldBe org.junit.jupiter.api. Assertions .assertNotNull(bob)
173+ val bob = records.find { it.getString (" name" ) == " Bob" }
174+ bob shouldNotBe null
154175 bob?.get(" id" ) shouldBe 2
155- bob?.get (" email" ) shouldBe " bob@example.com"
176+ bob?.getString (" email" ) shouldBe " bob@example.com"
156177 bob?.get(" age" ) shouldBe 25
157178 bob?.get(" active" ) shouldBe false
179+ println (" ✓ Bob verified" )
158180
159181 // Verify third record
160- val charlie = records.find { ( it.get (" name" ) as String ) == " Charlie" }
161- charlie shouldBe org.junit.jupiter.api. Assertions .assertNotNull(charlie)
182+ val charlie = records.find { it.getString (" name" ) == " Charlie" }
183+ charlie shouldNotBe null
162184 charlie?.get(" id" ) shouldBe 3
163- charlie?.get (" email" ) shouldBe " charlie@example.com"
185+ charlie?.getString (" email" ) shouldBe " charlie@example.com"
164186 charlie?.get(" age" ) shouldBe 35
165187 charlie?.get(" active" ) shouldBe true
188+ println (" ✓ Charlie verified" )
189+
190+ println (" === End-to-end test PASSED ===\n " )
166191 }
167192
168193 @Test
169194 fun `should handle validation errors gracefully` () {
195+ println (" \n === Running validation error test ===" )
196+
170197 // Given: Create test schema
171198 val schemaContent = """
172199 {
@@ -198,16 +225,15 @@ class KafkaIntegrationTest {
198225 val result = AvroRecordMapper .mapRow(schema, csvData.rows.first())
199226
200227 // Then: Should fail with validation error
201- result shouldBe org.junit.jupiter.api.Assertions .assertInstanceOf(
202- com.dragos.kafkacsvloader.avro.RowMappingResult .Failure ::class .java,
203- result
204- )
228+ Assertions .assertTrue(result is RowMappingResult .Failure )
229+ println (" ✓ Validation error handled correctly" )
230+ println (" === Validation error test PASSED ===\n " )
205231 }
206232
207233 private fun createConsumer (): KafkaConsumer <String , GenericRecord > {
208234 val props = Properties ().apply {
209235 put(ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers)
210- put(ConsumerConfig .GROUP_ID_CONFIG , " test-consumer-" + System .currentTimeMillis())
236+ put(ConsumerConfig .GROUP_ID_CONFIG , " test-consumer-${ System .currentTimeMillis()} " )
211237 put(ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , " earliest" )
212238 put(ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer ::class .java.name)
213239 put(ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , KafkaAvroDeserializer ::class .java.name)
0 commit comments