Description
Describe the bug
The Dev UI for Kafka Streams shows an empty topology (no image and no text description) when the source topic is specified as a Pattern
instead of static topic names.
package org.acme.consumer;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import java.util.regex.Pattern;

@ApplicationScoped
class NotificationTopologyBuilder {

    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationTopologyBuilder.class);

    @Produces
    public Topology topology() {
        final var streamsBuilder = new StreamsBuilder();
        streamsBuilder
                .stream(Pattern.compile("notification\\..+"),
                        Consumed.with(Serdes.String(), Serdes.String()))
                .foreach((key, notification) -> LOGGER.info(notification));
        final Topology topology = streamsBuilder.build();
        LOGGER.info("Topology: {}", topology.describe());
        return topology;
    }
}
Upon startup, this logs:
Topology: Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: notification\..+)
      --> KSTREAM-FOREACH-0000000001
    Processor: KSTREAM-FOREACH-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000
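To make the pattern subscription in the reproducer concrete, here is a minimal sketch that uses only java.util.regex to check which topic names the expression would select. It assumes that a topic is picked up only when its whole name matches the pattern. The class name, the helper method, and every topic name other than notification.repository are hypothetical and chosen for illustration only.

```java
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternCheck {

    // Same regular expression as in the topology above.
    public static final Pattern SOURCE_TOPICS = Pattern.compile("notification\\..+");

    // Assumption: the whole topic name has to match the pattern.
    public static boolean isSubscribed(String topicName) {
        return SOURCE_TOPICS.matcher(topicName).matches();
    }

    public static void main(String[] args) {
        // Hypothetical topic names, apart from notification.repository.
        List<String> topics = List.of(
                "notification.repository",
                "notification.",
                "notifications.repository",
                "notification-repository");
        for (String topic : topics) {
            System.out.println(topic + " -> " + isSubscribed(topic));
        }
    }
}
```

Under that assumption, only notification.repository is selected from the list: the expression requires the literal prefix "notification." followed by at least one more character.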
Expected behavior
The Dev UI should display the topology, even when the input topic is specified as a pattern.
Actual behavior
The Dev UI does not pick up the topology. It is available neither as an image nor in text format.
Changing the source topic to a static name causes the topology to be picked up correctly.
The log output in this case is:
Topology: Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [notification.repository])
      --> KSTREAM-FOREACH-0000000001
    Processor: KSTREAM-FOREACH-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000
How to Reproduce?
- Set up a Quarkus project with the kafka-streams extension
- Add the topology as described above
- Run the Quarkus app in dev mode
- In the Dev UI, navigate to the Kafka Streams Topology section
Output of uname -a or ver
Linux ctrl 5.15.0-53-generic #59-Ubuntu SMP Mon Oct 17 18:53:30 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux
Output of java -version
openjdk version "17.0.4.1" 2022-08-12
OpenJDK Runtime Environment Temurin-17.0.4.1+1 (build 17.0.4.1+1)
OpenJDK 64-Bit Server VM Temurin-17.0.4.1+1 (build 17.0.4.1+1, mixed mode, sharing)
GraalVM version (if different from Java)
No response
Quarkus version or git rev
2.14.1.Final
Build tool (i.e. output of mvnw --version or gradlew --version)
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Additional information
No response