feat : use customizer to configure deadletter config #932

Conversation
Walkthrough

The pull request introduces modifications to the order-service module, focusing on Kafka Streams configuration, test container management, and database testing setup. The changes primarily involve refactoring the Kafka Streams error-handling configuration, transforming the PostgreSQL container management approach, and updating test class annotations. The modifications aim to streamline the configuration process, improve error handling in Kafka Streams, and simplify the integration of test containers across different test classes.
Sequence Diagram

```mermaid
sequenceDiagram
    participant KafkaStreamsConfig
    participant StreamsBuilderFactoryBean
    participant DeadLetterPublishingRecoverer
    KafkaStreamsConfig->>StreamsBuilderFactoryBean: configurer(deadLetterPublishingRecoverer)
    StreamsBuilderFactoryBean-->>KafkaStreamsConfig: retrieve streamsConfiguration
    KafkaStreamsConfig->>KafkaStreamsConfig: Assert configuration not null
    KafkaStreamsConfig->>streamsConfiguration: Set deserialization exception handler
    KafkaStreamsConfig->>streamsConfiguration: Set deserialization recoverer
```
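Read together with the review comments below, the configurer described by this diagram presumably looks something like the following sketch (the method and bean names are assumptions; only the handler and recoverer wiring comes from the diagram and the review):

```java
// Inside a @Configuration class
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.util.Assert;

@Bean
StreamsBuilderFactoryBeanConfigurer configurer(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return factoryBean -> {
        Properties streamsConfiguration = factoryBean.getStreamsConfiguration();
        Assert.notNull(streamsConfiguration, "streamsConfiguration must not be null");
        // Route records that fail deserialization to the recoverer instead of crashing the stream
        streamsConfiguration.put(
                StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                RecoveringDeserializationExceptionHandler.class);
        streamsConfiguration.put(
                RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
                deadLetterPublishingRecoverer);
    };
}
```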
Qodana Community for JVM

14 new problems were found.

☁️ View the detailed Qodana report. Contact the Qodana team at qodana-support@jetbrains.com.
Actionable comments posted: 0
🧹 Nitpick comments (1)
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (1)

16-16: Consider using typed maps or custom config objects.

Using `Properties` is perfectly valid for Kafka Streams configurations. However, typed facilities like `Map<String, Object>` or a strongly typed configuration class can offer better type safety and readability.
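For illustration, a typed-map variant of the streams configuration might look like this (a sketch only; the application id and bootstrap servers are placeholders, not values from this PR):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
    // Map<String, Object> gives compile-time typing that raw Properties lacks
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-service");     // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    return new KafkaStreamsConfiguration(props);
}
```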
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)

- order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (3 hunks)
- order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java (1 hunk)
- order-service/src/test/java/com/example/orderservice/common/AbstractIntegrationTest.java (1 hunk)
- order-service/src/test/java/com/example/orderservice/common/PostGreSQLContainer.java (1 hunk)
- order-service/src/test/java/com/example/orderservice/repositories/OrderRepositoryTest.java (1 hunk)
🔇 Additional comments (10)
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (3)

42-42: Import of Assert looks good.

The `Assert` utility is a clean way to validate essential prerequisites for Kafka Streams properties.
57-58: Method signature update is a neat approach.

The new signature injecting `DeadLetterPublishingRecoverer` is a good way to keep the dead-letter logic explicit. Ensure your beans load in the correct order so that `deadLetterPublishingRecoverer` is never null.
63-70: Validate the dead-letter recoverer usage at runtime.

You are correctly registering the `RecoveringDeserializationExceptionHandler` and specifying the `KSTREAM_DESERIALIZATION_RECOVERER`. Verify that downstream components are properly set up to receive and handle these dead-letter messages.
order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java (1)

18-18: Explicitly including containers is clear and maintainable.

By adding `PostGreSQLContainer.class` to `.with()`, you make the test environment setup more transparent and consistent.
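For context, the `SpringApplication.from(...).with(...)` pattern this refers to (Spring Boot 3.1+) looks roughly like the sketch below; the main-class name follows the file path above, but the exact body is an assumption:

```java
import org.springframework.boot.SpringApplication;

public class TestOrderServiceApplication {

    public static void main(String[] args) {
        SpringApplication.from(OrderServiceApplication::main)
                .with(PostGreSQLContainer.class) // explicitly register the container configuration
                .run(args);
    }
}
```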
order-service/src/test/java/com/example/orderservice/common/PostGreSQLContainer.java (3)

9-11: New imports aligned with Testcontainers and Spring TestConfiguration.

Adding `@TestConfiguration` and `@Bean` is a straightforward way to integrate containers into the Spring testing context.
15-16: Switching from interface to class for container config.

Annotating this class with `@TestConfiguration` is an appropriate choice to manage the lifecycle of your PostgreSQL test container.
19-23: Container bean definition verified.

Defining a `PostgreSQLContainer<?>` bean with `.withReuse(true)` is convenient for repeated tests, but ensure test cleanup if leftover data might bleed into subsequent test runs.
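A minimal sketch of such a container bean, assuming Spring Boot's `@ServiceConnection` support and a placeholder image tag (neither is confirmed by the review excerpt):

```java
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.containers.PostgreSQLContainer;

@TestConfiguration(proxyBeanMethods = false)
public class PostGreSQLContainer {

    @Bean
    @ServiceConnection // wires datasource properties from the container automatically
    PostgreSQLContainer<?> postgreSQLContainer() {
        // withReuse(true) keeps the container alive between test runs
        return new PostgreSQLContainer<>("postgres:16-alpine").withReuse(true);
    }
}
```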
order-service/src/test/java/com/example/orderservice/common/AbstractIntegrationTest.java (1)

26-26: Consolidating container configuration in @SpringBootTest.

Including `PostGreSQLContainer.class` directly in the `classes` attribute makes container setup consistent for integration tests.
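In other words, the integration-test base class presumably declares something like this (a sketch; the other entry in `classes` is an assumption):

```java
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {OrderServiceApplication.class, PostGreSQLContainer.class})
abstract class AbstractIntegrationTest {
    // shared fixtures for integration tests live here
}
```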
order-service/src/test/java/com/example/orderservice/repositories/OrderRepositoryTest.java (2)

21-21: Import directive for @Import usage.

The import is aligned with the shift away from `@ImportTestcontainers`.

29-29: Replacing @ImportTestcontainers with @Import.

This change matches the new pattern of including the `PostGreSQLContainer` as a Spring bean for streamlined container management.
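For illustration, the annotation swap on the repository test might look like this (a sketch; `@DataJpaTest` and the test-database setting are assumptions about the surrounding test slice):

```java
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.context.annotation.Import;

@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@Import(PostGreSQLContainer.class) // replaces @ImportTestcontainers(PostGreSQLContainer.class)
class OrderRepositoryTest {
    // repository tests...
}
```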
(Inline comment thread on the `deadLetterPublishingRecoverer` bean in KafkaStreamsConfig.java:)

```java
@Bean
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
```
@coderabbitai, please help me write an integration test to showcase `DeadLetterPublishingRecoverer`.
@rajadilipkolli Here's a sample integration test that demonstrates how the `DeadLetterPublishingRecoverer` reroutes invalid messages to a dead-letter topic. This example uses Spring's embedded Kafka broker for local testing. Be sure to adjust package names, topics, and imports as needed in your project:
```java
package com.example.orderservice.integration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(
        partitions = 1,
        topics = {"payment_orders_topic", "stock_orders_topic", "orders_topic", "recovererDLQ"})
class DeadLetterPublishingRecovererIT {

    private static Consumer<Long, String> dlqConsumer;

    @Autowired private KafkaTemplate<Long, Object> kafkaTemplate;

    @BeforeAll
    static void setup(@Autowired EmbeddedKafkaBroker broker) {
        dlqConsumer = buildTestConsumer(broker);
        dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
    }

    @AfterAll
    static void teardown() {
        dlqConsumer.close();
    }

    @Test
    void givenInvalidMessage_whenConsumed_thenShouldBeRoutedToDLQ() {
        // Produce an invalid JSON or an incompatible payload to trigger a deserialization error
        kafkaTemplate.send("payment_orders_topic", 1L, "invalid-payload");
        kafkaTemplate.flush();

        // Wait for the message to be routed to the DLQ
        await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
            ConsumerRecords<Long, String> records = dlqConsumer.poll(Duration.ofMillis(500));
            assertThat(records.count())
                    .as("Invalid message should be routed to recovererDLQ")
                    .isGreaterThan(0);
        });
    }

    private static Consumer<Long, String> buildTestConsumer(EmbeddedKafkaBroker broker) {
        var props = new java.util.HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "deadletter-test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Long, String> cf =
                new DefaultKafkaConsumerFactory<>(
                        props, new LongDeserializer(), new StringDeserializer());
        return cf.createConsumer();
    }
}
```
Key points in the test:

- Embedded Kafka: The `@EmbeddedKafka` annotation spins up a local Kafka instance.
- Producing an invalid payload: Sending a payload that cannot be deserialized as a valid `OrderDto` is enough to trigger the `RecoveringDeserializationExceptionHandler`.
- DLQ consumer: A separate consumer polls the `recovererDLQ` topic to assert that the invalid message has been successfully rerouted there by the `DeadLetterPublishingRecoverer`.

This approach should help confirm that your `DeadLetterPublishingRecoverer` setup is working as expected.
@coderabbitai, I received the error below, but execution never enters the `deadLetterPublishingRecoverer`:

```
23:12:53.397 [awaitility-thread] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-deadletter-test-group-1, groupId=deadletter-test-group] Error while fetching metadata with correlation id 2 : {recovererDLQ=UNKNOWN_TOPIC_OR_PARTITION}
23:12:53.397 [awaitility-thread] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-deadletter-test-group-1, groupId=deadletter-test-group] Cluster ID: 4L6g3nShT-eMCtK--X86sw
23:12:53.398 [awaitility-thread] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-deadletter-test-group-1, groupId=deadletter-test-group] Discovered group coordinator localhost:32810 (id: 2147483646 rack: null)
23:12:53.398 [order-service-stream-client-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [order-service-stream-client] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=payment-orders, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Can't serialize data [invalid-payload] for topic [order-service-KSTREAM-JOINTHIS-0000000004-store-changelog]
```
@rajadilipkolli, there are a couple of items that likely explain why your message never makes it to the `deadLetterPublishingRecoverer`:

1. Serialization vs. deserialization error: The trace shows `Can't serialize data [invalid-payload] for topic [order-service-KSTREAM-JOINTHIS-0000000004-store-changelog]`. The `RecoveringDeserializationExceptionHandler` only addresses deserialization exceptions (when Kafka attempts to read/parse incoming messages). Here you have a serialization error (when Kafka Streams tries to write to an internal store topic), so the `RecoveringDeserializationExceptionHandler` is never invoked.

2. Topic existence: The warning `{recovererDLQ=UNKNOWN_TOPIC_OR_PARTITION}` indicates that the `recovererDLQ` topic may not exist. Make sure to create it before running, or enable topic auto-creation in your test environment. Otherwise, writes to that topic will fail.
Possible next steps:

- Handle serialization errors: To handle this scenario (producing messages that cannot be serialized), you need a production exception handler. Kafka Streams offers options such as the `DefaultProductionExceptionHandler`, or custom logic that can be configured via `StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG`.
- Create the `recovererDLQ` topic: Ensure the `recovererDLQ` topic actually exists or that your Kafka cluster is configured to allow automatic creation. Embedded Kafka for tests might require explicit topic creation.
Adjusting your code or test setup to address the above points should help you route serialization-related issues to an appropriate handler and ensure that the DLQ topic receives errored messages.
@coderabbitai, can you please help me write code that handles the possible next steps above?
@rajadilipkolli Here's an example demonstrating how you can address serialization errors using a production exception handler and ensure that the `recovererDLQ` topic exists for your test. Feel free to adapt the package names, imports, topic names, and class names to your project structure.
```java
package com.example.orderservice.config.kafka;

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.util.Assert;

// Note: @EmbeddedKafka is a test annotation and belongs on the test class, e.g.
// @EmbeddedKafka(partitions = 1,
//         topics = {"payment_orders_topic", "stock_orders_topic", "orders_topic", "recovererDLQ"})
// so that the topics you need (including recovererDLQ) are auto-created before the test runs.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfigWithProductionError {

    private static final Logger log =
            LoggerFactory.getLogger(KafkaStreamsConfigWithProductionError.class);

    /**
     * Custom ProductionExceptionHandler that logs errors and then continues or fails
     * based on your preference. The default implementation could also be used.
     */
    public static class MyProductionExceptionHandler extends DefaultProductionExceptionHandler {

        private static final Logger logger =
                LoggerFactory.getLogger(MyProductionExceptionHandler.class);

        @Override
        public ProductionExceptionHandlerResponse handle(
                ProducerRecord<byte[], byte[]> record, Exception exception) {
            // Log the serialization/production error
            logger.error(
                    "Serialization or production error: topic={}, key={}, value={}",
                    record.topic(), record.key(), record.value(), exception);
            // Decide how to proceed:
            // * CONTINUE: skip the bad record and keep the Streams application running
            // * FAIL: shut down the Streams application
            // In a real scenario, you might produce to a separate DLQ here as well
            return ProductionExceptionHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(Map<String, ?> configs) {
            super.configure(configs);
        }
    }

    /**
     * Configure the StreamsBuilderFactoryBeanConfigurer to:
     * 1) use RecoveringDeserializationExceptionHandler for deserialization issues, and
     * 2) use our custom ProductionExceptionHandler for serialization issues.
     */
    @Bean
    StreamsBuilderFactoryBeanConfigurer customStreamsConfigurer() {
        return factoryBean -> {
            var streamsConfig = factoryBean.getStreamsConfiguration();
            Assert.notNull(streamsConfig, "streamsConfiguration must not be null");
            // Deserialization issues
            streamsConfig.put(
                    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    RecoveringDeserializationExceptionHandler.class);
            // Assign the custom ProductionExceptionHandler for serialization (production) errors
            streamsConfig.put(
                    StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    MyProductionExceptionHandler.class.getName());
        };
    }
}
```
Explanation:

1. Custom ProductionExceptionHandler: `MyProductionExceptionHandler` extends `DefaultProductionExceptionHandler`. Override the `handle` method to log or reroute messages as needed. You also decide whether to continue or fail the Streams application when serialization fails.

2. Wiring in the Streams configuration:
   - Add `RecoveringDeserializationExceptionHandler` to handle deserialization exceptions (incoming data).
   - Add `MyProductionExceptionHandler` (or `DefaultProductionExceptionHandler`) to handle serialization exceptions (outgoing data).

3. Topic creation for tests: The `@EmbeddedKafka(topics = {"recovererDLQ", ...})` annotation on the test class ensures that `recovererDLQ` and the other topics are automatically created before the test runs. This avoids the `UNKNOWN_TOPIC_OR_PARTITION` error for the DLQ.
With this setup:

- Deserialization errors will trigger `RecoveringDeserializationExceptionHandler`, sending bad messages to `recovererDLQ`, assuming the `KSTREAM_DESERIALIZATION_RECOVERER` property is set to your `DeadLetterPublishingRecoverer` (see the snippet below).
- Serialization (production) errors in Kafka Streams will now be caught by `MyProductionExceptionHandler`, allowing you to log or reroute them as desired.
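To complete that assumption, the recoverer instance still has to be registered inside the same configurer, roughly like this (a sketch; `deadLetterPublishingRecoverer` would need to be injected into `customStreamsConfigurer` as a parameter):

```java
// Tell the RecoveringDeserializationExceptionHandler which recoverer publishes bad records
streamsConfig.put(
        RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
        deadLetterPublishingRecoverer);
```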
Actionable comments posted: 0
🧹 Nitpick comments (1)
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java (1)

9-9: Static import wildcard caution.

Using `import static com.example.orderservice.utils.AppConstants.*` can import more constants than needed. While this is convenient, it can clutter the namespace. Consider importing only what you actually use, or maintain clarity with direct references to `AppConstants`.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java (2 hunks)
- order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (3 hunks)
- order-service/src/main/java/com/example/orderservice/utils/AppConstants.java (1 hunk)
🔇 Additional comments (9)
order-service/src/main/java/com/example/orderservice/utils/AppConstants.java (1)

17-17: Looks good for dedicated DLQ usage.

Adding `RECOVER_DLQ_TOPIC` is straightforward and aligns well with your naming convention for constants. Ensure that any references to "DLQ" across the codebase uniformly use this constant to avoid spelling inconsistencies.
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java (2)

28-32: Improved log statement including the new DLQ topic.

Including `RECOVER_DLQ_TOPIC` in the log statement helps clarify all topics being created. Be sure to adjust any relevant logging/troubleshooting documentation to reflect the additional DLQ topic.

38-39: Essential addition of the DLQ topic creation.

Creating the `RECOVER_DLQ_TOPIC` alongside the other topics ensures the DLQ is ready for error-recovery scenarios. This step significantly reduces the chance of unknown-topic errors at runtime. A sketch of what this might look like follows.
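A sketch of that topic creation, assuming Spring Kafka's `TopicBuilder` is used; the partition and replica counts are placeholders, not values from this PR:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

@Bean
NewTopic recovererDlqTopic() {
    // KafkaAdmin picks up NewTopic beans and creates the topic at startup
    return TopicBuilder.name(RECOVER_DLQ_TOPIC).partitions(1).replicas(1).build();
}
```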
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (6)

11-11: Explicit import for the DLQ topic constant.

This import clarifies which constants are used in this file. Using the constant reference for the DLQ topic name fosters consistency.

17-17: Properties import now used for runtime stream configuration.

Great choice for leveraging `Properties` directly, allowing for more flexibility if advanced Kafka Streams parameters need dynamic configuration.

43-43: Assert usage enhances code safety.

Adding `Assert` helps catch misconfigurations early by ensuring `streamsConfiguration` is non-null. This defensive programming practice is beneficial for reliability.

58-59: Encapsulating the streams configuration logic.

By providing a `StreamsBuilderFactoryBeanConfigurer`, you're centralizing your Kafka Streams properties and transition logic. This method is well structured and follows Spring's recommended pattern for configuring bean properties.

64-71: Robust error handling with RecoveringDeserializationExceptionHandler.

These lines integrate the `RecoveringDeserializationExceptionHandler` and wire up a `DeadLetterPublishingRecoverer`. This approach systematically handles deserialization errors by forwarding faulty messages to the configured DLQ. Confirm that any special-casing for serialization errors is also handled in your production exception handler if needed.

80-80: Custom partition assignment for the DLQ.

Using `-1` as the partition index delegates partition selection to Kafka, which is fine for most scenarios. Just ensure you don't need specific partition-distribution logic for the DLQ. Otherwise, everything looks good.
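For reference, a recoverer whose destination resolver returns `-1` so Kafka chooses the partition might look like this (a sketch; the template's generic types are assumptions):

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

@Bean
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<byte[], byte[]> bytesTemplate) {
    return new DeadLetterPublishingRecoverer(
            bytesTemplate,
            // -1 delegates partition selection to the Kafka producer
            (consumerRecord, exception) -> new TopicPartition(RECOVER_DLQ_TOPIC, -1));
}
```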