
Conversation

rajadilipkolli
Owner

No description provided.

Contributor

coderabbitai bot commented Apr 19, 2025

Walkthrough

Clarified KafkaConfig JavaDoc and RECOVER_DLQ_TOPIC partition/replica counts, changed logger to static in KafkaStreamsConfig, moved test property for disabling Spring Cloud Config from annotation to test properties, and added an integration test validating Kafka Streams DLQ behavior.
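The logger change is the standard per-class static pattern; a minimal dependency-free sketch of the idea (JUL from the JDK is used here purely for illustration — the service itself presumably uses SLF4J):

```java
import java.util.logging.Logger;

public class StaticLoggerSketch {
    // One static final logger per class, as in the KafkaStreamsConfig change,
    // instead of a logger field created per instance.
    private static final Logger log = Logger.getLogger(StaticLoggerSketch.class.getName());

    public static void main(String[] args) {
        // The LogManager caches loggers by name, so every lookup for this
        // class resolves to the same shared instance.
        System.out.println(log == Logger.getLogger(StaticLoggerSketch.class.getName()));
    }
}
```

A static logger avoids one object per config-class instance and matches the "one logger per class" convention most linters expect.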

Changes

Cohort / File(s) Change Summary
Kafka config
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
Added JavaDoc noting the class only creates topics (producer config in application.yml); explicitly set RECOVER_DLQ_TOPIC to 1 partition and 1 replica.
Kafka streams config
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
Made logger a static final field initialized with KafkaStreamsConfig.class instead of an instance field.
Test base / properties
order-service/src/test/java/com/example/orderservice/common/AbstractIntegrationTest.java, order-service/src/test/resources/application-test.properties
Removed spring.cloud.config.enabled=false from the SpringBootTest annotation and added spring.cloud.config.enabled=false to application-test.properties (comment clarifies cloud config/discovery are disabled for tests).
Kafka Streams integration test
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java
New integration test that sends malformed/invalid messages to payment-orders and asserts messages are routed to the recovererDLQ topic.
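The RECOVER_DLQ_TOPIC change pins the topic to one partition and one replica. A toy model of that spec (the real KafkaConfig presumably registers a Spring Kafka NewTopic bean; the record and method names below are illustrative only):

```java
public class TopicSketch {
    // Illustrative stand-in for a topic definition; the actual KafkaConfig
    // presumably declares a Spring Kafka NewTopic bean with these values.
    record TopicSpec(String name, int partitions, short replicas) {}

    static TopicSpec recovererDlqTopic() {
        // 1 partition / 1 replica: deliberately small for a demo service's
        // DLQ, and a single partition gives a total order over failed records.
        return new TopicSpec("recovererDLQ", 1, (short) 1);
    }

    public static void main(String[] args) {
        TopicSpec t = recovererDlqTopic();
        System.out.println(t.name() + " partitions=" + t.partitions() + " replicas=" + t.replicas());
    }
}
```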

Sequence Diagram(s)

sequenceDiagram
    participant Test as KafkaStreamsConfigIntTest
    participant Broker as Kafka Broker
    participant DLQ as recovererDLQ

    Note over Test: Setup DLQ consumer (earliest, auto commit)
    Test->>Broker: Produce malformed JSON to payment-orders
    Test->>Broker: Produce invalid OrderDto to payment-orders
    Broker-->>DLQ: Route failed/deserialization/error records to recovererDLQ
    loop poll up to 60s
      Test->>DLQ: Poll for DLQ messages
      DLQ-->>Test: Return records (if any)
    end
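The poll loop in the diagram boils down to "poll until records appear or the deadline passes". A self-contained sketch of that shape (the real test uses Awaitility against a Kafka consumer; here the record source is stubbed and timings are shortened):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.IntSupplier;

public class PollUntilSketch {
    // Keep polling until the source reports records or the deadline passes.
    // The real test polls Kafka every 1s for up to 60s via Awaitility.
    static int pollUntilFound(IntSupplier pollCount, Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            int count = pollCount.getAsInt();
            if (count > 0) {
                return count; // records arrived in the DLQ
            }
            try {
                Thread.sleep(10); // poll-interval stand-in
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0; // timed out without seeing any records
    }

    public static void main(String[] args) {
        // Stubbed source that only yields records on the third poll.
        int[] calls = {0};
        int found = pollUntilFound(() -> ++calls[0] >= 3 ? 2 : 0, Duration.ofSeconds(1));
        System.out.println("found=" + found);
    }
}
```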

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes


Poem

In the warren where loggers stand tall,
Topics are set and tests heed the call.
Malformed messages tumble, then queue—
DLQ catches what broke, tried, and flew.
Rabbit hops, stamps a paw: "All clear, review!" 🐇✨


📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between daff8f7 and 727d8bd.

⛔ Files ignored due to path filters (1)
  • order-service/src/main/resources/application.yml is excluded by !**/*.yml
📒 Files selected for processing (1)
  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Codacy Static Code Analysis
  • GitHub Check: Order Service with jdk 21



github-actions bot commented Apr 19, 2025

Qodana Community for JVM

15 new problems were found

Inspection name Severity Problems
@NotNull/@Nullable problems 🔶 Warning 7
Unchecked warning 🔶 Warning 2
Method can be extracted ◽️ Notice 4
Non-distinguishable logging calls ◽️ Notice 2

☁️ View the detailed Qodana report

Contact Qodana team

Contact us at qodana-support@jetbrains.com

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (3)

35-41: Consider using a non-static consumer for better test isolation.

While this works for a single test method, a static consumer might cause issues if more test methods are added later. Consider creating the consumer in a @BeforeEach method and closing it in an @AfterEach method.
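That suggestion amounts to tying the consumer's lifetime to each test method. A Kafka-free sketch of the lifecycle (the consumer is stubbed; in the real test class the two methods would carry the JUnit annotations noted in the comments):

```java
public class PerTestConsumerSketch {
    // Stub standing in for the Kafka Consumer; it only records whether
    // close() was called.
    static class StubConsumer implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    StubConsumer dlqConsumer;

    // In the real test class this would be annotated @BeforeEach.
    void beforeEach() {
        dlqConsumer = new StubConsumer(); // fresh consumer per test method
    }

    // In the real test class this would be annotated @AfterEach.
    void afterEach() {
        if (dlqConsumer != null) {
            dlqConsumer.close();
            dlqConsumer = null;
        }
    }

    public static void main(String[] args) {
        PerTestConsumerSketch test = new PerTestConsumerSketch();
        test.beforeEach();
        StubConsumer c = test.dlqConsumer;
        test.afterEach();
        System.out.println("closed=" + c.closed);
    }
}
```

Each test then gets an isolated consumer that is guaranteed to be closed, regardless of how many test methods are added.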


90-110: Replace System.out.println with proper logging.

Using System.out.println for debugging output is not ideal for production code. Consider using a logger instead.

- System.out.println("Found DLQ record: " + record.value()));
+ logger.debug("Found DLQ record: {}", record.value()));
- System.out.println("No DLQ records found in this poll attempt");
+ logger.debug("No DLQ records found in this poll attempt");

You'll need to add a logger to the class:

private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsConfigIntTest.class);

With appropriate imports:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

91-110: Consider verifying the content of DLQ messages.

The test only asserts that at least one message was routed to the DLQ but doesn't verify which one or why. Consider enhancing the test to check the error details in the DLQ message headers or payload to ensure proper error handling for each specific error case.
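One way to phrase such a content check is a predicate over the DLQ payloads. A minimal, Kafka-free sketch (the substrings matched come from this test's two inputs; inspecting the exception headers the recoverer attaches to each record would be a stronger check):

```java
import java.util.List;

public class DlqPayloadCheck {
    // Does any DLQ payload carry evidence of the deserialization failure?
    static boolean mentionsFailure(List<String> payloads) {
        return payloads.stream()
                .anyMatch(p -> p.contains("deserialization") || p.contains("INVALID_STATUS"));
    }

    public static void main(String[] args) {
        // Payloads resembling the two poison messages sent by the test.
        List<String> dlq = List.of(
                "{\"orderId\": \"THIS_SHOULD_BE_A_NUMBER\", \"badField\": true}",
                "...INVALID_STATUS_THAT_IS_VERY_LONG...");
        System.out.println(mentionsFailure(dlq));
    }
}
```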

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 09a89c4 and 938a0fb.

📒 Files selected for processing (1)
  • order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Codacy Static Code Analysis
  • GitHub Check: qodana
  • GitHub Check: Order Service with jdk 21
🔇 Additional comments (4)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (4)

1-5: License header format looks good.

The standard MIT license header is properly formatted.


30-34: Well-designed test class with appropriate Kafka templates.

The test class extends the base integration test class and correctly autowires two different KafkaTemplate instances - one for OrderDto objects and another for String messages, which will be useful for testing different error scenarios.


43-62: Well-configured Kafka consumer.

The consumer is properly configured with appropriate settings for the test scenario. The use of KafkaConnectionDetails for obtaining bootstrap servers is a good practice.


64-89: Good test setup with multiple error scenarios.

The test correctly sets up two different error cases: an invalid JSON message and a malformed OrderDto object. This provides good coverage of potential failures in the Kafka Streams processing pipeline.

Comment on lines 30 to 112
class KafkaStreamsConfigIntTest extends AbstractIntegrationTest {

    @Autowired private KafkaTemplate<Long, OrderDto> kafkaTemplate;
    @Autowired private KafkaTemplate<String, String> stringKafkaTemplate;

    private static Consumer<Long, String> dlqConsumer;

    @BeforeAll
    static void setup(@Autowired KafkaConnectionDetails connectionDetails) {
        dlqConsumer = buildTestConsumer(connectionDetails);
        dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
    }

    private static Consumer<Long, String> buildTestConsumer(
            KafkaConnectionDetails connectionDetails) {
        var props = new HashMap<String, Object>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                connectionDetails.getStreamsBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "deadletter-test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);

        ConsumerFactory<Long, String> cf =
                new DefaultKafkaConsumerFactory<>(
                        props, new LongDeserializer(), new StringDeserializer());
        return cf.createConsumer("deadletter-test-consumer");
    }

    @Test
    void deadLetterPublishingRecoverer() throws Exception {
        // Clear any existing messages in the DLQ
        dlqConsumer.poll(Duration.ofMillis(100));

        // Method 1: Send a completely invalid JSON to the payment-orders topic
        // This will definitely cause a deserialization error in the streams processing
        String invalidJson = "{\"orderId\": \"THIS_SHOULD_BE_A_NUMBER\", \"badField\": true}";
        stringKafkaTemplate.send("payment-orders", invalidJson);

        // Method 2: Also try with a malformed OrderDto object
        OrderDto orderDto = new OrderDto();
        orderDto.setOrderId(-1L);
        orderDto.setCustomerId(-1L);
        orderDto.setSource("source");
        // Set status to a very long string to cause potential issues
        orderDto.setStatus(
                "INVALID_STATUS_THAT_IS_VERY_LONG_AND_SHOULD_CAUSE_PROBLEMS_WITH_DESERIALIZATION");
        orderDto.setItems(null);

        kafkaTemplate.send("payment-orders", 1L, orderDto);

        // Make sure both messages are sent
        kafkaTemplate.flush();
        stringKafkaTemplate.flush();

        // Wait longer for the message to be routed to the DLQ with better polling
        await().pollInterval(Duration.ofSeconds(1))
                .atMost(Duration.ofSeconds(60)) // Increase timeout to 60 seconds
                .untilAsserted(
                        () -> {
                            ConsumerRecords<Long, String> records =
                                    dlqConsumer.poll(Duration.ofSeconds(5));
                            // Print out all received records for debugging
                            if (records.count() > 0) {
                                records.forEach(
                                        record ->
                                                System.out.println(
                                                        "Found DLQ record: " + record.value()));
                            } else {
                                System.out.println("No DLQ records found in this poll attempt");
                            }

                            assertThat(records.count())
                                    .as("Invalid message should be routed to recovererDLQ")
                                    .isGreaterThan(0);
                        });
    }
}
Contributor


🛠️ Refactor suggestion

Add resource cleanup for the Kafka consumer.

The Kafka consumer is never closed, which could lead to resource leaks. Add an @AfterAll method to close the consumer:

@AfterAll
static void cleanup() {
    if (dlqConsumer != null) {
        dlqConsumer.close();
    }
}

Don't forget to add the import:

import org.junit.jupiter.api.AfterAll;

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1)

35-41: ⚠️ Potential issue

Add resource cleanup for the Kafka consumer.

The Kafka consumer is never closed, which could lead to resource leaks. Add an @AfterAll method to close the consumer.

import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

// ...

@BeforeAll
static void setup(@Autowired KafkaConnectionDetails connectionDetails) {
    dlqConsumer = buildTestConsumer(connectionDetails);
    dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
}

+@AfterAll
+static void cleanup() {
+    if (dlqConsumer != null) {
+        dlqConsumer.close();
+    }
+}
🧹 Nitpick comments (3)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (3)

67-68: Verify DLQ is empty after initial poll.

The current code polls the DLQ to clear existing messages but doesn't verify that it's actually empty before continuing. Consider adding an assertion to confirm the DLQ is empty before sending test messages.

// Clear any existing messages in the DLQ
-dlqConsumer.poll(Duration.ofMillis(100));
+ConsumerRecords<Long, String> initialRecords = dlqConsumer.poll(Duration.ofMillis(100));
+// Optional: Log the count of cleared messages
+if (initialRecords.count() > 0) {
+    System.out.println("Cleared " + initialRecords.count() + " existing messages from DLQ");
+}

98-105: Replace System.out.println with proper logging.

Using System.out.println for debugging is not recommended in test code. Consider using a proper logger instead.

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

class KafkaStreamsConfigIntTest extends AbstractIntegrationTest {

+    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfigIntTest.class);
    @Autowired private KafkaTemplate<Long, OrderDto> kafkaTemplate;
    
    // ...
    
    // In the test method:
    if (records.count() > 0) {
        records.forEach(
                record ->
-                       System.out.println(
-                               "Found DLQ record: " + record.value()));
+                       log.debug("Found DLQ record: {}", record.value()));
    } else {
-       System.out.println("No DLQ records found in this poll attempt");
+       log.debug("No DLQ records found in this poll attempt");
    }

94-110: Enhance test assertions to verify message content.

The test only verifies that messages reached the DLQ but doesn't validate their content. Consider enhancing the assertions to verify the error details in the DLQ messages.

await().pollInterval(Duration.ofSeconds(1))
        .atMost(Duration.ofSeconds(60))
        .untilAsserted(
                () -> {
                    ConsumerRecords<Long, String> records =
                            dlqConsumer.poll(Duration.ofSeconds(5));
                    // Print out all received records for debugging
                    if (records.count() > 0) {
                        records.forEach(
                                record ->
                                        System.out.println(
                                                "Found DLQ record: " + record.value()));
                    } else {
                        System.out.println("No DLQ records found in this poll attempt");
                    }

                    assertThat(records.count())
                            .as("Invalid message should be routed to recovererDLQ")
                            .isGreaterThan(0);
+                   
+                   // Verify at least one record contains information about the deserialization error
+                   assertThat(records).anyMatch(record -> 
+                           record.value().contains("deserialization") || 
+                           record.value().contains("INVALID_STATUS"));
                });
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between 3e90977 and daff8f7.

📒 Files selected for processing (1)
  • order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Codacy Static Code Analysis
  • GitHub Check: Order Service with jdk 21


gitguardian bot commented Jun 8, 2025

⚠️ GitGuardian has uncovered 2 secrets following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secrets in your pull request
GitGuardian id GitGuardian status Secret Commit Filename
17422989 Triggered Generic High Entropy Secret 52ff61e retail-store-webapp/src/test/resources/docker/realm-config/retailstore-realm.json View secret
8725269 Triggered Generic Password 52ff61e retail-store-webapp/src/test/java/com/example/retailstore/webapp/web/controller/RegistrationControllerTest.java View secret
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secrets safely. Learn here the best practices.
  3. Revoke and rotate these secrets.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

