Skip to content

Commit ef74638

Browse files
committed
Record Router Streams actual implementation.
* Tested with minimal tests. * Needs better testing.
1 parent 7cc74ec commit ef74638

File tree

2 files changed

+39
-3
lines changed
  • code/kafka-streams-module-demo/src/main/kotlin/com/isel/kafkastreamsmoduledemo/recordRouter

2 files changed

+39
-3
lines changed

code/kafka-streams-module-demo/src/main/kotlin/com/isel/kafkastreamsmoduledemo/recordRouter/RRTesting.kt

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,29 @@ class RRTesting(
3333
}
3434

3535
private val mapper = jacksonObjectMapper()
36-
private final val consumerExecutor: ExecutorService = Executors.newFixedThreadPool(5)
36+
private final val consumerExecutor: ExecutorService = Executors.newFixedThreadPool(20)
3737

3838
fun test() {
3939

4040

4141
val systemTopic: String = "SYSTEM_TOPIC"
4242
val inputTopicA: String = "input-topic-a"
43+
val inputTopicB: String = "input-topic-b"
4344

4445
runConsumer(systemTopic)
4546
runConsumer(inputTopicA)
47+
runConsumer(inputTopicB)
4648
runConsumer("gateway-01-clients-topic")
4749
runConsumer("gateway-01-keys-topic")
50+
runConsumer("gateway-02-clients-topic")
51+
runConsumer("gateway-02-keys-topic")
4852
val props = Properties()
4953
props["bootstrap.servers"] = bootstrapServers
5054
props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
5155
props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
5256
val producer: KafkaProducer<String, String> = KafkaProducer<String, String>(props)
5357

54-
val newGatewayKeyTopic = SystemGatewayKeyTopic("id", "gateway-01-keys-topic", "gateway-01-clients-topic")
58+
val newGatewayKeyTopic = SystemGatewayKeyTopic("id1", "gateway-01-keys-topic", "gateway-01-clients-topic")
5559
val newGatewayKeyTopicJson: String = mapper.writeValueAsString(newGatewayKeyTopic)
5660
utils.printRed("TEST newGatewayKeyTopicJson as string: \n $newGatewayKeyTopicJson")
5761

@@ -69,8 +73,29 @@ class RRTesting(
6973
producer.send(ProducerRecord(inputTopicA, "0", "testvalue 3"))
7074
producer.send(ProducerRecord(inputTopicA, "0", "testvalue 4"))
7175

76+
Thread.sleep(6000)
7277

78+
val newGatewayKeyTopic2 = SystemGatewayKeyTopic("id2", "gateway-02-keys-topic", "gateway-02-clients-topic")
79+
val newGatewayKeyTopicJson2: String = mapper.writeValueAsString(newGatewayKeyTopic2)
80+
producer.send(ProducerRecord(systemTopic, "new-gateway-key-topic", newGatewayKeyTopicJson2))
7381

82+
Thread.sleep(1000)
83+
84+
producer.send(ProducerRecord(newGatewayKeyTopic2.keysTopicName, inputTopicA, mapper.writeValueAsString(listOf("1", "2"))))
85+
producer.send(ProducerRecord(newGatewayKeyTopic2.keysTopicName, inputTopicB, mapper.writeValueAsString(listOf("5", "6"))))
86+
87+
Thread.sleep(2000)
88+
89+
producer.send(ProducerRecord(inputTopicA, "0", "testvalue 10"))
90+
producer.send(ProducerRecord(inputTopicA, "0", "testvalue 11"))
91+
producer.send(ProducerRecord(inputTopicA, "1", "testvalue 12"))
92+
producer.send(ProducerRecord(inputTopicA, "1", "testvalue 13"))
93+
producer.send(ProducerRecord(inputTopicA, "2", "testvalue 14"))
94+
95+
producer.send(ProducerRecord(inputTopicB, "1", "no 13"))
96+
producer.send(ProducerRecord(inputTopicB, "2", "no 14"))
97+
producer.send(ProducerRecord(inputTopicB, "5", "yes 13"))
98+
producer.send(ProducerRecord(inputTopicB, "6", "yes 14"))
7499

75100
}
76101

code/kafka-streams-module-demo/src/main/kotlin/com/isel/kafkastreamsmoduledemo/recordRouter/Streams.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,22 @@ class Streams(
4242

4343
val systemTopic: String = "SYSTEM_TOPIC"
4444
val inputTopicA: String = "input-topic-a"
45+
val inputTopicB: String = "input-topic-b"
4546

4647
rrTesting.deleteTopic(systemTopic)
4748
rrTesting.deleteTopic(inputTopicA)
49+
rrTesting.deleteTopic(inputTopicB)
4850
rrTesting.deleteTopic("gateway-01-clients-topic")
4951
rrTesting.deleteTopic("gateway-01-keys-topic")
52+
rrTesting.deleteTopic("gateway-02-clients-topic")
53+
rrTesting.deleteTopic("gateway-02-keys-topic")
5054
rrTesting.createTopic(systemTopic,3,3)
5155
rrTesting.createTopic(inputTopicA,3,3)
56+
rrTesting.createTopic(inputTopicB,3,3)
5257
rrTesting.createTopic("gateway-01-clients-topic",3,3)
5358
rrTesting.createTopic("gateway-01-keys-topic",3,3)
59+
rrTesting.createTopic("gateway-02-clients-topic",3,3)
60+
rrTesting.createTopic("gateway-02-keys-topic",3,3)
5461

5562
listenSystemTopic()
5663
}
@@ -75,7 +82,7 @@ class Streams(
7582
val systemGatewayKeyTopic: SystemGatewayKeyTopic =
7683
mapper.readValue<SystemGatewayKeyTopic>(record.value())
7784
updateGatewayKeysTopicsSubscriptions(systemGatewayKeyTopic)
78-
//streamsStorage.restartAllDefaultRoutingStreams()
85+
kClientsStorage.restartAllDefaultRoutingStreams()
7986
}
8087

8188
else -> {
@@ -171,8 +178,12 @@ class Streams(
171178
consumer = createGatewayKeysConsumer()
172179
kClientsStorage.setGatewayKeysConsumer(consumer) {createGatewayKeysConsumer()}
173180
} else {
181+
recordRouterUtils.printRed("updateGatewayKeysTopicsSubscriptions before wakeup")
174182
consumer.wakeup()
183+
consumer = createGatewayKeysConsumer() // TODO: Temporary
184+
kClientsStorage.setGatewayKeysConsumer(consumer) {createGatewayKeysConsumer()} // TODO: Temporary
175185
}
186+
recordRouterUtils.printRed("updateGatewayKeysTopicsSubscriptions after consumer==null if-else")
176187

177188

178189
gatewaysDetails.compute(systemGatewayKeyTopic.gatewayId) { gatewayId, details ->

0 commit comments

Comments
 (0)