Kafka Streams Business Logic Error Handling and Dead Letter Queue #44134
Unanswered
bn-avasquez
asked this question in Q&A
Replies: 2 comments 1 reply
/cc @alesj (kafka, kafka-streams), @cescoffier (kafka), @gunnarmorling (kafka-streams), @ozangunalp (kafka, kafka-streams), @rquinio (kafka-streams)
We only support a dead letter queue when using Quarkus Messaging (and its Kafka connector). I am not aware of an equivalent feature in Kafka Streams; you may have to implement it manually.
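For the manual route, one common sketch (all names here are illustrative, not a Quarkus or Kafka API) is to catch exceptions inside each step, wrap the outcome in a success/failure holder, and branch failures to a dead-letter topic:

```java
import java.util.function.Function;

// Success/failure holder for a processing step (illustrative sketch,
// not part of any Quarkus or Kafka API).
final class ProcessingResult<V> {
    final V value;          // non-null on success
    final Throwable error;  // non-null on failure

    private ProcessingResult(V value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    static <V> ProcessingResult<V> success(V value) {
        return new ProcessingResult<>(value, null);
    }

    static <V> ProcessingResult<V> failure(Throwable error) {
        return new ProcessingResult<>(null, error);
    }

    boolean failed() {
        return error != null;
    }

    // Run a step, converting any unchecked exception into a failure.
    static <I, O> ProcessingResult<O> attempt(I input, Function<I, O> step) {
        try {
            return success(step.apply(input));
        } catch (RuntimeException e) {
            return failure(e);
        }
    }
}
```

In a topology this could look like `stream.mapValues(v -> ProcessingResult.attempt(v, this::enrich))` followed by `split()`/`branch()` on `failed()`, sending the failure branch `.to("my-dlq-topic")` (topic name illustrative) and unwrapping `value` on the success branch.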
Hello Quarkus community!
I'm working on a proof of concept using Quarkus and Kafka Streams. My current step is implementing a dead letter queue for records that encounter errors during business logic processing in my mapValues, filter, etc. chains.
With plain (non-streaming) Kafka, configuring a dead letter queue for this use case is straightforward: https://quarkus.io/blog/kafka-failure-strategy/#the-dead-letter-topic-strategy.
Unfortunately, this dead letter topic strategy doesn't appear to work with Kafka Streams. In addition, none of the exception handlers we can provide, such as DeserializationExceptionHandler and ProductionExceptionHandler, appear capable of catching business logic stream processing errors.
From this Confluent post, https://forum.confluent.io/t/continue-processing-on-uncaught-exceptions-in-kafka-streams/10562, it seems that these are known limitations of Kafka Streams, and the suggestion is to "wrap your processing logic into try/catch blocks to detect and handle any error during processing".
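In its simplest form, that advice can be captured by a small guard around each step (a sketch; the names and the null-tombstone convention are my own, not from the Confluent post):

```java
import java.util.function.Function;

// Wrap a processing step so that any runtime exception yields null
// instead of crashing the stream thread (illustrative sketch).
final class SafeStep {
    static <I, O> Function<I, O> guarded(Function<I, O> step) {
        return input -> {
            try {
                return step.apply(input);
            } catch (RuntimeException e) {
                // A real topology would also record the failure here,
                // e.g. by sending the raw record to a dead-letter topic.
                return null; // tombstone, filtered out downstream
            }
        };
    }
}
```

Adapted to a Kafka Streams ValueMapper, this becomes roughly `stream.mapValues(v -> SafeStep.guarded(this::parse).apply(v)).filter((k, v) -> v != null)`.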
It looks like there is an open feature request for Kafka Streams to add a new error handler interface, ProcessingExceptionHandler, that would work like DeserializationExceptionHandler and would satisfy my use case: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing
Until a new processing exception handler is added to Kafka Streams and available, what is the recommended method for handling errors in Kafka Streams business logic? I've not found much beyond blogs and other sources saying to use a try/catch block, with no example or further explanation given.
Currently, I'm wrapping my business logic in try/catch blocks in each processing method. If an error is caught in a mapValues, filter, etc. step, I set a hasProcessingError field on my record object. At the end of processing, the .to() call uses a TopicNameExtractor instance that checks hasProcessingError: if true, the record is sent to my "dead letter queue" topic; otherwise, the record continues on its way to downstream applications and data storage.

Is there a better approach? For example, should I be setting a header or other metadata and returning a Message or Record object? Is there another pattern I can follow?
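For reference, the flag-plus-extractor pattern described above can be sketched like this (EnrichedOrder, its fields, and the topic names are made up for illustration):

```java
// Record wrapper carrying the error flag set by the try/catch blocks
// (illustrative; class and field names are not from an actual project).
final class EnrichedOrder {
    final String payload;
    final boolean hasProcessingError;

    EnrichedOrder(String payload, boolean hasProcessingError) {
        this.payload = payload;
        this.hasProcessingError = hasProcessingError;
    }
}

// The routing decision a TopicNameExtractor<K, EnrichedOrder> would make
// in its extract(key, value, recordContext) method.
final class ErrorAwareRouter {
    static final String OUT_TOPIC = "orders-out"; // illustrative
    static final String DLQ_TOPIC = "orders-dlq"; // illustrative

    static String topicFor(EnrichedOrder value) {
        return value.hasProcessingError ? DLQ_TOPIC : OUT_TOPIC;
    }
}
```

Since TopicNameExtractor is a functional interface, this can be wired as `stream.to((key, value, recordContext) -> ErrorAwareRouter.topicFor(value))`.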
If there's a doc or guide out there that discusses this topic, or a code example someone can share, I'd appreciate it. Thanks!