
Conversation

rajadilipkolli
Owner

No description provided.

rajadilipkolli self-assigned this Dec 30, 2024
Contributor

coderabbitai bot commented Dec 30, 2024

Walkthrough

The pull request introduces modifications to the order-service module, focusing on Kafka Streams configuration, test container management, and database testing setup. The changes primarily involve refactoring the Kafka Streams error handling configuration, transforming the PostgreSQL container management approach, and updating test class annotations. The modifications aim to streamline the configuration process, improve error handling in Kafka Streams, and simplify the integration of test containers across different test classes.

Changes

| File | Change Summary |
|------|----------------|
| order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java | Removed defaultKafkaStreamsConfig method; updated configurer method to accept DeadLetterPublishingRecoverer; enhanced error-handling configuration |
| order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java | Removed @ImportTestcontainers annotation; added PostGreSQLContainer.class directly to the SpringApplication.from() setup |
| order-service/src/test/java/com/example/orderservice/common/AbstractIntegrationTest.java | Removed @ImportTestcontainers annotation; updated @SpringBootTest to include PostGreSQLContainer.class |
| order-service/src/test/java/com/example/orderservice/common/PostGreSQLContainer.java | Converted from interface to class; added @TestConfiguration annotation; introduced @Bean method for container creation |
| order-service/src/test/java/com/example/orderservice/repositories/OrderRepositoryTest.java | Replaced @ImportTestcontainers with @Import; removed @AutoConfigureTestDatabase annotation |
| order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java | Added new topic RECOVER_DLQ_TOPIC; simplified import statements for constants; updated logging statement for the new topic |
| order-service/src/main/java/com/example/orderservice/utils/AppConstants.java | Added constant RECOVER_DLQ_TOPIC with value "recovererDLQ" |

Sequence Diagram

```mermaid
sequenceDiagram
    participant KafkaStreamsConfig
    participant StreamsBuilderFactoryBean
    participant DeadLetterPublishingRecoverer

    KafkaStreamsConfig->>StreamsBuilderFactoryBean: configurer(deadLetterPublishingRecoverer)
    StreamsBuilderFactoryBean-->>KafkaStreamsConfig: retrieve streamsConfiguration
    KafkaStreamsConfig->>KafkaStreamsConfig: Assert configuration not null
    KafkaStreamsConfig->>streamsConfiguration: Set deserialization exception handler
    KafkaStreamsConfig->>streamsConfiguration: Set deserialization recoverer
```
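
In code, the flow above presumably reduces to a configurer along these lines — a hedged sketch reconstructed from this walkthrough and the review comments below (the handler and property names match those comments; the class name and exact method body are illustrative assumptions, not the repository source):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.util.Assert;

@Configuration
class KafkaStreamsConfigSketch { // hypothetical name, for illustration only

    @Bean
    StreamsBuilderFactoryBeanConfigurer configurer(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return factoryBean -> {
            // Retrieve the streams configuration and fail fast if it is absent
            Properties streamsConfiguration = factoryBean.getStreamsConfiguration();
            Assert.notNull(streamsConfiguration, "streamsConfiguration must not be null");

            // Route deserialization failures through the recovering handler...
            streamsConfiguration.put(
                    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    RecoveringDeserializationExceptionHandler.class);

            // ...and point that handler at the recoverer that publishes bad records to the DLQ
            streamsConfiguration.put(
                    RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
                    deadLetterPublishingRecoverer);
        };
    }
}
```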

Poem

🐰 Hop, hop, config's got a new beat,
Kafka Streams now dance more neat!
Containers spring to life with glee,
Test magic flows so easily
A rabbit's code, both swift and bright! 🚀


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.


github-actions bot commented Dec 30, 2024

Qodana Community for JVM

14 new problems were found

| Inspection name | Severity | Problems |
|-----------------|----------|----------|
| @NotNull/@Nullable problems | 🔶 Warning | 6 |
| Unchecked warning | 🔶 Warning | 2 |
| Method can be extracted | ◽️ Notice | 4 |
| Non-distinguishable logging calls | ◽️ Notice | 2 |

☁️ View the detailed Qodana report

Contact Qodana team

Contact us at qodana-support@jetbrains.com

Contributor

coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (1)

16-16: Consider using typed maps or custom config objects.
Using Properties is perfectly valid for Kafka Streams configuration. However, a Map<String, Object> or a strongly typed configuration class can offer better type safety and readability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a695350 and 6bd482c.

📒 Files selected for processing (5)
  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (3 hunks)
  • order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java (1 hunks)
  • order-service/src/test/java/com/example/orderservice/common/AbstractIntegrationTest.java (1 hunks)
  • order-service/src/test/java/com/example/orderservice/common/PostGreSQLContainer.java (1 hunks)
  • order-service/src/test/java/com/example/orderservice/repositories/OrderRepositoryTest.java (1 hunks)
🔇 Additional comments (10)
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (3)

42-42: Import of Assert looks good.
The Assert utility is a clean way to validate essential prerequisites for Kafka Streams properties.


57-58: Method signature update is a neat approach.
The new signature injecting DeadLetterPublishingRecoverer is a good way to keep the dead-letter logic explicit. Ensure your beans load in the correct order so that deadLetterPublishingRecoverer is never null.


63-70: Validate the dead-letter recoverer usage at runtime.
You are correctly registering the RecoveringDeserializationExceptionHandler and specifying the KSTREAM_DESERIALIZATION_RECOVERER. Verify that downstream components are properly set up to receive and handle these dead-letter messages.

order-service/src/test/java/com/example/orderservice/TestOrderServiceApplication.java (1)

18-18: Explicitly including containers is clear and maintainable.
By adding PostGreSQLContainer.class to .with(), you make the test environment setup more transparent and consistent.

order-service/src/test/java/com/example/orderservice/common/PostGreSQLContainer.java (3)

9-11: New imports aligned with Testcontainers and Spring TestConfiguration.
Adding @TestConfiguration and @Bean is a straightforward way to integrate containers into the Spring testing context.


15-16: Switching from interface to class for container config.
Annotating this class with @TestConfiguration is an appropriate choice to manage the lifecycle of your PostgreSQL test container.


19-23: Container bean definition verified.
Defining a PostgreSQLContainer<?> bean with .withReuse(true) is convenient for repeated tests, but ensure proper test cleanup so leftover data does not bleed into subsequent runs.
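
Based on the change summary, the converted class plausibly looks like the sketch below. The @ServiceConnection annotation and the postgres:17-alpine image tag are assumptions for illustration; the repository may pin a different image or wire the datasource another way:

```java
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.testcontainers.containers.PostgreSQLContainer;

@TestConfiguration(proxyBeanMethods = false)
public class PostGreSQLContainer {

    @Bean
    @ServiceConnection // assumption: lets Spring Boot derive the datasource properties from the container
    PostgreSQLContainer<?> postgreSQLContainer() {
        // withReuse(true) keeps the container alive across runs, as noted above
        return new PostgreSQLContainer<>("postgres:17-alpine").withReuse(true);
    }
}
```

A test then imports this configuration via the classes attribute of @SpringBootTest or via .with(PostGreSQLContainer.class), matching the other comments in this review.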

order-service/src/test/java/com/example/orderservice/common/AbstractIntegrationTest.java (1)

26-26: Consolidating container configuration in @SpringBootTest.
Including PostGreSQLContainer.class directly in the classes attribute makes container setup consistent for integration tests.

order-service/src/test/java/com/example/orderservice/repositories/OrderRepositoryTest.java (2)

21-21: Import directive for @Import usage.
The import is aligned with the shift away from @ImportTestcontainers.


29-29: Replacing @ImportTestcontainers with @Import.
This change matches the new pattern of including the PostGreSQLContainer as a Spring bean for streamlined container management.

```java
}

@Bean
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
```
Owner Author


@coderabbitai, please help me write an integration test to showcase deadLetterPublishingRecoverer

Contributor


@rajadilipkolli Here's a sample integration test that demonstrates how the DeadLetterPublishingRecoverer reroutes invalid messages to a dead-letter topic. This example uses Spring’s embedded Kafka broker for local testing. Be sure to adjust package names, topics, and imports as needed in your project:

```java
package com.example.orderservice.integration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(
    partitions = 1,
    topics = {
        "payment_orders_topic",
        "stock_orders_topic",
        "orders_topic",
        "recovererDLQ"
    }
)
class DeadLetterPublishingRecovererIT {

    private static Consumer<Long, String> dlqConsumer;

    @Autowired
    private KafkaTemplate<Long, Object> kafkaTemplate;

    @BeforeAll
    static void setup(@Autowired EmbeddedKafkaBroker broker) {
        dlqConsumer = buildTestConsumer(broker);
        dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
    }

    @AfterAll
    static void teardown() {
        dlqConsumer.close();
    }

    @Test
    void givenInvalidMessage_whenConsumed_thenShouldBeRoutedToDLQ() {
        // Produce an invalid payload to trigger a deserialization error in the consumer
        kafkaTemplate.send("payment_orders_topic", 1L, "invalid-payload");
        kafkaTemplate.flush();

        // Wait for the DeadLetterPublishingRecoverer to route the message to the DLQ
        await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
            ConsumerRecords<Long, String> records = dlqConsumer.poll(Duration.ofMillis(500));
            assertThat(records.count())
                .as("Invalid message should be routed to recovererDLQ")
                .isGreaterThan(0);
        });
    }

    private static Consumer<Long, String> buildTestConsumer(EmbeddedKafkaBroker broker) {
        var props = new java.util.HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "deadletter-test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Long, String> cf =
            new DefaultKafkaConsumerFactory<>(
                props,
                new LongDeserializer(),
                new StringDeserializer());
        return cf.createConsumer();
    }
}
```

Key points in the test:

  1. Embedded Kafka: The @EmbeddedKafka annotation spins up a local Kafka instance.
  2. Producing Invalid Payload: Sending a payload that cannot be deserialized as a valid OrderDto is enough to trigger the RecoveringDeserializationExceptionHandler.
  3. DLQ Consumer: A separate consumer polls the "recovererDLQ" topic to assert that the invalid message has been successfully rerouted there by the DeadLetterPublishingRecoverer.

This approach should help confirm that your DeadLetterPublishingRecoverer setup is working as expected.

Owner Author


@coderabbitai, I received the error below, but the flow never enters the deadLetterPublishingRecoverer:

```
23:12:53.397 [awaitility-thread] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-deadletter-test-group-1, groupId=deadletter-test-group] Error while fetching metadata with correlation id 2 : {recovererDLQ=UNKNOWN_TOPIC_OR_PARTITION}
23:12:53.397 [awaitility-thread] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-deadletter-test-group-1, groupId=deadletter-test-group] Cluster ID: 4L6g3nShT-eMCtK--X86sw
23:12:53.398 [awaitility-thread] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-deadletter-test-group-1, groupId=deadletter-test-group] Discovered group coordinator localhost:32810 (id: 2147483646 rack: null)
23:12:53.398 [order-service-stream-client-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [order-service-stream-client] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=payment-orders, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Can't serialize data [invalid-payload] for topic [order-service-KSTREAM-JOINTHIS-0000000004-store-changelog]
```

Contributor


@rajadilipkolli, there are a couple of items that likely explain why your message never makes it to the deadLetterPublishingRecoverer:

  1. Serialization vs. Deserialization Error
    The trace shows Can't serialize data [invalid-payload] for topic [order-service-KSTREAM-JOINTHIS-0000000004-store-changelog]. The RecoveringDeserializationExceptionHandler only addresses deserialization exceptions (when Kafka attempts to read/parse incoming messages). Here you have a serialization error (when Kafka Streams tries to write to an internal store topic). Thus, it never invokes RecoveringDeserializationExceptionHandler.

  2. Topic Existence
    The warning {recovererDLQ=UNKNOWN_TOPIC_OR_PARTITION} indicates that the recovererDLQ topic may not exist. Make sure to create it before running or enable topic auto-creation in your test environment. Otherwise, writes to that topic will fail.

Possible Next Steps

  • Handle Serialization Errors: To handle this scenario (producing messages that cannot be serialized), you need a production exception handler. Kafka Streams offers options such as the DefaultProductionExceptionHandler, or custom logic configured via StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG.
  • Create recovererDLQ Topic: Ensure the recovererDLQ topic actually exists or that your Kafka cluster is configured to allow automatic creation. Embedded Kafka for tests might require explicit topic creation.

Adjusting your code or test setup to address the above points should help you route serialization-related issues to an appropriate handler and ensure that the DLQ topic receives errored messages.
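
For the second point, here is a minimal sketch of pre-creating the DLQ topic on the embedded broker, assuming the test can autowire an EmbeddedKafkaBroker (the helper class and method names are illustrative, not project code):

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.test.EmbeddedKafkaBroker;

class DlqTopicSetup { // hypothetical helper

    static void ensureDlqTopic(EmbeddedKafkaBroker broker) {
        // Creates the topic on the embedded broker so the recoverer's writes cannot fail
        // with UNKNOWN_TOPIC_OR_PARTITION; listing "recovererDLQ" in the
        // @EmbeddedKafka(topics = ...) attribute achieves the same thing declaratively.
        broker.addTopics(new NewTopic("recovererDLQ", 1, (short) 1));
    }
}
```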

Owner Author


@coderabbitai, can you please help me write code that handles the possible next steps above?

Contributor


@rajadilipkolli Here’s an example demonstrating how you can address serialization errors using a production exception handler and ensuring that the recovererDLQ topic exists for your test. Feel free to adapt the package names, imports, topic names, and class names to your project structure.

```java
package com.example.orderservice.config.kafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.util.Assert;

// Topic auto-creation for tests belongs on the test class, e.g.
// @EmbeddedKafka(partitions = 1,
//     topics = {"payment_orders_topic", "stock_orders_topic", "orders_topic", "recovererDLQ"}),
// not on a production @Configuration class.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfigWithProductionError {

    /**
     * Custom ProductionExceptionHandler that logs errors and then continues or fails
     * based on your preference. The default implementation could also be used as-is.
     * Note: depending on your Kafka Streams version, serialization failures during writes
     * may be routed to ProductionExceptionHandler#handleSerializationException rather
     * than handle(); check the version you are on.
     */
    public static class MyProductionExceptionHandler extends DefaultProductionExceptionHandler {
        private static final Logger logger = LoggerFactory.getLogger(MyProductionExceptionHandler.class);

        @Override
        public ProductionExceptionHandlerResponse handle(
                ProducerRecord<byte[], byte[]> record, Exception exception) {
            // Log the serialization/production error
            logger.error("Serialization or production error: topic={}, key={}, value={}",
                    record.topic(), record.key(), record.value(), exception);

            // Decide how to proceed:
            // * CONTINUE: skip the bad record and keep the Streams application running
            // * FAIL: shut down the Streams application
            // In a real scenario, you might produce to a separate DLQ here as well
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
    }

    /**
     * Configure the StreamsBuilderFactoryBeanConfigurer to:
     * 1) Use RecoveringDeserializationExceptionHandler for deserialization issues.
     * 2) Use our custom ProductionExceptionHandler for serialization issues.
     */
    @Bean
    StreamsBuilderFactoryBeanConfigurer customStreamsConfigurer() {
        return factoryBean -> {
            var streamsConfig = factoryBean.getStreamsConfiguration();
            Assert.notNull(streamsConfig, "streamsConfiguration must not be null");

            // Deserialization issues
            streamsConfig.put(
                    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    RecoveringDeserializationExceptionHandler.class);

            // Assign the custom ProductionExceptionHandler for serialization (production) errors
            streamsConfig.put(
                    StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    MyProductionExceptionHandler.class.getName());
        };
    }
}
```

Explanation

  1. Custom ProductionExceptionHandler
    A MyProductionExceptionHandler class extends DefaultProductionExceptionHandler. Override the handle method to log or reroute messages as needed. You also decide whether to continue or fail the Streams application if serialization fails.

  2. Wiring in Streams Configuration

    • Add RecoveringDeserializationExceptionHandler to handle deserialization exceptions (incoming data).
    • Add MyProductionExceptionHandler (or DefaultProductionExceptionHandler) to handle serialization exceptions (outgoing data).
  3. Topic Creation for Tests
    The @EmbeddedKafka(topics = {"recovererDLQ", ...}) annotation on your test class ensures that recovererDLQ and the other topics are created automatically before the test runs. This avoids the UNKNOWN_TOPIC_OR_PARTITION error for the DLQ.

With this setup:

  • Deserialization errors will trigger RecoveringDeserializationExceptionHandler, sending bad messages to "recovererDLQ", assuming the KSTREAM_DESERIALIZATION_RECOVERER property is set to your DeadLetterPublishingRecoverer.
  • Serialization (production) errors in Kafka Streams will now be caught by MyProductionExceptionHandler, allowing you to log or reroute them as desired.

Contributor

coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java (1)

9-9: Static import wildcard caution.

Using import static com.example.orderservice.utils.AppConstants.* can import more constants than needed. While convenient, it can clutter the namespace. Consider importing only the constants you actually use, or keep explicit references such as AppConstants.RECOVER_DLQ_TOPIC for clarity.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6bd482c and a42adfe.

📒 Files selected for processing (3)
  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java (2 hunks)
  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (3 hunks)
  • order-service/src/main/java/com/example/orderservice/utils/AppConstants.java (1 hunks)
🔇 Additional comments (9)
order-service/src/main/java/com/example/orderservice/utils/AppConstants.java (1)

17-17: Looks good for dedicated DLQ usage.

Adding RECOVER_DLQ_TOPIC is straightforward and aligns well with your naming convention for constants. Ensure that any references to "DLQ" across the codebase uniformly use this constant to avoid spelling inconsistencies.

order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java (2)

28-32: Improved log statement including the new DLQ topic.

Including RECOVER_DLQ_TOPIC in the log statement helps clarify all topics being created. Be sure to adjust any relevant logging/troubleshooting documentation to reflect the additional DLQ topic.


38-39: Essential addition of the DLQ topic creation.

Creating the RECOVER_DLQ_TOPIC alongside the other topics ensures the DLQ is ready for error recovery scenarios. This step significantly reduces the chance of unknown-topic errors at runtime.
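
The KafkaConfig change presumably registers the topic as a NewTopic bean along these lines — a sketch in which the class name, partition count, and replica count are assumptions; only the constant comes from the change summary:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

import static com.example.orderservice.utils.AppConstants.RECOVER_DLQ_TOPIC;

@Configuration
class KafkaTopicSketch { // hypothetical name

    // A NewTopic bean lets KafkaAdmin create the topic at startup, which is what
    // prevents unknown-topic errors when the recoverer first publishes to the DLQ.
    @Bean
    NewTopic recovererDlqTopic() {
        return TopicBuilder.name(RECOVER_DLQ_TOPIC).partitions(1).replicas(1).build();
    }
}
```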

order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (6)

11-11: Explicit import for the DLQ topic constant.

This import clarifies which constants are used in this file. Using the constant reference for the DLQ topic name fosters consistency.


17-17: Properties import now used for runtime stream configuration.

Great choice for leveraging Properties directly, allowing for more flexibility if advanced Kafka Streams parameters need dynamic configuration.


43-43: Assert usage enhances code safety.

Adding Assert helps catch misconfigurations early by ensuring streamsConfiguration is non-null. This defensive programming practice is beneficial for reliability.


58-59: Encapsulating the streams configuration logic.

By providing a StreamsBuilderFactoryBeanConfigurer, you're centralizing your Kafka Streams configuration logic. This method is well-structured and follows Spring's recommended pattern for configuring bean properties.


64-71: Robust error handling with RecoveringDeserializationExceptionHandler.

These lines integrate the RecoveringDeserializationExceptionHandler and wire up a DeadLetterPublishingRecoverer. This approach systematically handles deserialization errors by forwarding faulty messages to the configured DLQ. Confirm that any special-casing for serialization errors is also handled in your production exception handler if needed.


80-80: Custom partition assignment for the DLQ.

Using -1 as the partition index delegates partition selection to Kafka, which is fine for most scenarios. Just ensure you don't need specific partition-distribution logic for the DLQ. Otherwise, everything looks good.
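
For reference, the recoverer wiring this comment describes would look roughly like the sketch below, assuming a byte[]-based KafkaTemplate is available (the template type, class name, and bean name are assumptions): the resolver simply targets RECOVER_DLQ_TOPIC with partition -1 so Kafka chooses the partition.

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

import static com.example.orderservice.utils.AppConstants.RECOVER_DLQ_TOPIC;

@Configuration
class RecovererSketch { // hypothetical name

    @Bean
    DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<byte[], byte[]> bytesTemplate) {
        // Partition -1 tells the recoverer to let Kafka choose the partition,
        // which is the behaviour discussed in this comment.
        return new DeadLetterPublishingRecoverer(
                bytesTemplate,
                (record, exception) -> new TopicPartition(RECOVER_DLQ_TOPIC, -1));
    }
}
```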

rajadilipkolli merged commit e23eef8 into main Dec 31, 2024
10 checks passed
rajadilipkolli deleted the order-polish branch December 31, 2024 05:14