-
Notifications
You must be signed in to change notification settings - Fork 631
Closed
Description
Apologies if this isn't the right forum to post this, as I'm not sure if it's a bug or (more likely) something I'm doing wrong. However I tried posting a question on StackOverflow and had no luck.
I have two methods annotated with @StreamListener. One takes in two KTables, and the other takes in a KStream. The Spring Boot app starts correctly, but after a few seconds this error is shown:
Exception in thread "spring-cloud-stream-kafka-12427e22-82cc-4368-b07c-09573ada6670-StreamThread-2" java.lang.IllegalArgumentException: Assigned partition t3-0 for non-subscribed topic regex pattern; subscription pattern is t1|t2
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)
The code is as follows (in Kotlin):
@SpringBootApplication
@EnableBinding(T1T2Binding::class, T3Binding::class)
class Application {
@StreamListener
fun t1Handler(@Input("t1") t1: KTable<String, String>, @Input("t2") t2: KTable<String, String>) {
}
@StreamListener
fun t2Handler(@Input("t3") t3: KStream<String, String>) {
t3.map { key, value ->
KeyValue(key, value)
}
}
}
interface T1T2Binding {
@Input
fun t1(): KTable<String, String>
@Input
fun t2(): KTable<String, String>
}
interface T3Binding {
@Input
fun t3(): KStream<String, String>
}
And the application.yml file is:
spring.cloud.stream:
bindings:
t1:
destination: t1
consumer:
headerMode: raw
t2:
destination: t2
consumer:
headerMode: raw
t3:
destination: t3
consumer:
headerMode: raw
kafka.streams:
binder:
brokers: localhost
zk-nodes: localhost
configuration.commit.interval.ms: 1000
application-id: "spring-cloud-stream-kafka"
A github repo where this problem can be replicated is available here.
Thanks in advance for any pointers on this.
Metadata
Metadata
Assignees
Labels
No labels