feat: use spring boot auto configuration #1026
base: main
Conversation
Walkthrough
Clarified the KafkaConfig JavaDoc and the RECOVER_DLQ_TOPIC partition/replica counts, changed the logger to static in KafkaStreamsConfig, moved the test property for disabling Spring Cloud Config from the annotation to test properties, and added an integration test validating Kafka Streams DLQ behavior.
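The logger change mentioned above is the standard static-field idiom; a minimal sketch of that idiom (field name assumed, not copied from the PR's actual diff):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaStreamsConfig {
    // Before (hypothetical): an instance field, one logger object per instance
    // private final Logger log = LoggerFactory.getLogger(getClass());

    // After: a static field, shared across instances and usable without one
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsConfig.class);
}
```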
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Test as KafkaStreamsConfigIntTest
    participant Broker as Kafka Broker
    participant DLQ as recovererDLQ
    Note over Test: Setup DLQ consumer (earliest, auto commit)
    Test->>Broker: Produce malformed JSON to payment-orders
    Test->>Broker: Produce invalid OrderDto to payment-orders
    Broker-->>DLQ: Route failed/deserialization/error records to recovererDLQ
    loop poll up to 60s
        Test->>DLQ: Poll for DLQ messages
        DLQ-->>Test: Return records (if any)
    end
```
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~20 minutes
📜 Recent review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration: You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
Qodana Community for JVM
15 new problems were found
☁️ View the detailed Qodana report
Contact the Qodana team at qodana-support@jetbrains.com
Actionable comments posted: 1
🧹 Nitpick comments (3)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (3)
35-41: Consider using a non-static consumer for better test isolation.
While this works for a single test method, a static consumer might cause issues if more test methods are added later. Consider creating the consumer in a @BeforeEach method and closing it in an @AfterEach method, as sketched below.
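A minimal sketch of that per-method lifecycle, reusing the test's existing buildTestConsumer helper and the recovererDLQ topic (the field-injected KafkaConnectionDetails is an assumption, not code from the PR):

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;

class KafkaStreamsConfigIntTest extends AbstractIntegrationTest {

    @Autowired private KafkaConnectionDetails connectionDetails;

    // Instance field instead of static: each test method gets a fresh consumer
    private Consumer<Long, String> dlqConsumer;

    @BeforeEach
    void setUpConsumer() {
        dlqConsumer = buildTestConsumer(connectionDetails);
        dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
    }

    @AfterEach
    void closeConsumer() {
        // Per-test close also addresses the resource-leak concern raised below
        if (dlqConsumer != null) {
            dlqConsumer.close();
        }
    }
}
```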
90-110: Replace System.out.println with proper logging.
Using System.out.println for debugging output is not ideal, even in test code. Consider using a logger instead.

```diff
-System.out.println("Found DLQ record: " + record.value()));
+logger.debug("Found DLQ record: {}", record.value()));

-System.out.println("No DLQ records found in this poll attempt");
+logger.debug("No DLQ records found in this poll attempt");
```

You'll need to add a logger to the class:

```java
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsConfigIntTest.class);
```

With appropriate imports:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
```
91-110: Consider verifying the content of DLQ messages.
The test only asserts that at least one message was routed to the DLQ but doesn't verify which one or why. Consider enhancing the test to check the error details in the DLQ message headers or payload to ensure proper error handling for each specific error case.
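For example, if the records reach recovererDLQ via Spring Kafka's DeadLetterPublishingRecoverer (an assumption; that recoverer stamps kafka_dlt-* headers on each record it publishes), the failure cause could be asserted roughly like this inside the untilAsserted block:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.support.KafkaHeaders;

// Inside the untilAsserted lambda, after polling `records`:
records.forEach(record -> {
    // DeadLetterPublishingRecoverer records the original exception message in a header
    Header cause = record.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
    assertThat(cause).as("DLQ record should carry its failure cause").isNotNull();
    assertThat(new String(cause.value(), StandardCharsets.UTF_8))
            .containsIgnoringCase("deserializ"); // matches the invalid-JSON case
});
```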
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: Codacy Static Code Analysis
- GitHub Check: qodana
- GitHub Check: Order Service with jdk 21
🔇 Additional comments (4)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (4)
1-5: License header format looks good.
The standard MIT license header is properly formatted.

30-34: Well-designed test class with appropriate Kafka templates.
The test class extends the base integration test class and correctly autowires two different KafkaTemplate instances - one for OrderDto objects and another for String messages, which will be useful for testing different error scenarios.

43-62: Well-configured Kafka consumer.
The consumer is properly configured with appropriate settings for the test scenario. The use of KafkaConnectionDetails for obtaining bootstrap servers is a good practice.

64-89: Good test setup with multiple error scenarios.
The test correctly sets up two different error cases: an invalid JSON message and a malformed OrderDto object. This provides good coverage of potential failures in the Kafka Streams processing pipeline.
```java
class KafkaStreamsConfigIntTest extends AbstractIntegrationTest {

    @Autowired private KafkaTemplate<Long, OrderDto> kafkaTemplate;
    @Autowired private KafkaTemplate<String, String> stringKafkaTemplate;

    private static Consumer<Long, String> dlqConsumer;

    @BeforeAll
    static void setup(@Autowired KafkaConnectionDetails connectionDetails) {
        dlqConsumer = buildTestConsumer(connectionDetails);
        dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
    }

    private static Consumer<Long, String> buildTestConsumer(
            KafkaConnectionDetails connectionDetails) {
        var props = new HashMap<String, Object>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                connectionDetails.getStreamsBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "deadletter-test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);

        ConsumerFactory<Long, String> cf =
                new DefaultKafkaConsumerFactory<>(
                        props, new LongDeserializer(), new StringDeserializer());
        return cf.createConsumer("deadletter-test-consumer");
    }

    @Test
    void deadLetterPublishingRecoverer() throws Exception {
        // Clear any existing messages in the DLQ
        dlqConsumer.poll(Duration.ofMillis(100));

        // Method 1: Send a completely invalid JSON to the payment-orders topic
        // This will definitely cause a deserialization error in the streams processing
        String invalidJson = "{\"orderId\": \"THIS_SHOULD_BE_A_NUMBER\", \"badField\": true}";
        stringKafkaTemplate.send("payment-orders", invalidJson);

        // Method 2: Also try with a malformed OrderDto object
        OrderDto orderDto = new OrderDto();
        orderDto.setOrderId(-1L);
        orderDto.setCustomerId(-1L);
        orderDto.setSource("source");
        // Set status to a very long string to cause potential issues
        orderDto.setStatus(
                "INVALID_STATUS_THAT_IS_VERY_LONG_AND_SHOULD_CAUSE_PROBLEMS_WITH_DESERIALIZATION");
        orderDto.setItems(null);

        kafkaTemplate.send("payment-orders", 1L, orderDto);

        // Make sure both messages are sent
        kafkaTemplate.flush();
        stringKafkaTemplate.flush();

        // Wait longer for the message to be routed to the DLQ with better polling
        await().pollInterval(Duration.ofSeconds(1))
                .atMost(Duration.ofSeconds(60)) // Increase timeout to 60 seconds
                .untilAsserted(
                        () -> {
                            ConsumerRecords<Long, String> records =
                                    dlqConsumer.poll(Duration.ofSeconds(5));
                            // Print out all received records for debugging
                            if (records.count() > 0) {
                                records.forEach(
                                        record ->
                                                System.out.println(
                                                        "Found DLQ record: " + record.value()));
                            } else {
                                System.out.println("No DLQ records found in this poll attempt");
                            }

                            assertThat(records.count())
                                    .as("Invalid message should be routed to recovererDLQ")
                                    .isGreaterThan(0);
                        });
    }
}
```
🛠️ Refactor suggestion
Add resource cleanup for the Kafka consumer.
The Kafka consumer is never closed, which could lead to resource leaks. Add an @AfterAll method to close the consumer:

```java
@AfterAll
static void cleanup() {
    if (dlqConsumer != null) {
        dlqConsumer.close();
    }
}
```

Don't forget to add the import:

```java
import org.junit.jupiter.api.AfterAll;
```
Actionable comments posted: 0
♻️ Duplicate comments (1)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1)
35-41: ⚠️ Potential issue: Add resource cleanup for the Kafka consumer.
The Kafka consumer is never closed, which could lead to resource leaks. Add an @AfterAll method to close the consumer.

```diff
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
 // ...

 @BeforeAll
 static void setup(@Autowired KafkaConnectionDetails connectionDetails) {
     dlqConsumer = buildTestConsumer(connectionDetails);
     dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
 }
+
+@AfterAll
+static void cleanup() {
+    if (dlqConsumer != null) {
+        dlqConsumer.close();
+    }
+}
```
🧹 Nitpick comments (3)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (3)
67-68: Verify DLQ is empty after initial poll.
The current code polls the DLQ to clear existing messages but doesn't verify that it's actually empty before continuing. Consider adding an assertion to confirm the DLQ is empty before sending test messages.

```diff
 // Clear any existing messages in the DLQ
-dlqConsumer.poll(Duration.ofMillis(100));
+ConsumerRecords<Long, String> initialRecords = dlqConsumer.poll(Duration.ofMillis(100));
+// Optional: Log the count of cleared messages
+if (initialRecords.count() > 0) {
+    System.out.println("Cleared " + initialRecords.count() + " existing messages from DLQ");
+}
```
98-105: Replace System.out.println with proper logging.
Using System.out.println for debugging is not recommended in test code. Consider using a proper logger instead.

```diff
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 class KafkaStreamsConfigIntTest extends AbstractIntegrationTest {

+    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfigIntTest.class);

     @Autowired private KafkaTemplate<Long, OrderDto> kafkaTemplate;
     // ...

     // In the test method:
     if (records.count() > 0) {
         records.forEach(
                 record ->
-                        System.out.println(
-                                "Found DLQ record: " + record.value()));
+                        log.debug("Found DLQ record: {}", record.value()));
     } else {
-        System.out.println("No DLQ records found in this poll attempt");
+        log.debug("No DLQ records found in this poll attempt");
     }
```
94-110: Enhance test assertions to verify message content.
The test only verifies that messages reached the DLQ but doesn't validate their content. Consider enhancing the assertions to verify the error details in the DLQ messages.

```diff
 await().pollInterval(Duration.ofSeconds(1))
         .atMost(Duration.ofSeconds(60))
         .untilAsserted(
                 () -> {
                     ConsumerRecords<Long, String> records =
                             dlqConsumer.poll(Duration.ofSeconds(5));
                     // Print out all received records for debugging
                     if (records.count() > 0) {
                         records.forEach(
                                 record ->
                                         System.out.println(
                                                 "Found DLQ record: " + record.value()));
                     } else {
                         System.out.println("No DLQ records found in this poll attempt");
                     }

                     assertThat(records.count())
                             .as("Invalid message should be routed to recovererDLQ")
                             .isGreaterThan(0);
+
+                    // Verify at least one record reflects the deserialization failure
+                    assertThat(records).anyMatch(record ->
+                            record.value().contains("deserialization") ||
+                            record.value().contains("INVALID_STATUS"));
                 });
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (1)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: Codacy Static Code Analysis
- GitHub Check: Order Service with jdk 21
| GitGuardian id | GitGuardian status | Secret | Commit | Filename | |
|---|---|---|---|---|---|
| 17422989 | Triggered | Generic High Entropy Secret | 52ff61e | retail-store-webapp/src/test/resources/docker/realm-config/retailstore-realm.json | View secret |
| 8725269 | Triggered | Generic Password | 52ff61e | retail-store-webapp/src/test/java/com/example/retailstore/webapp/web/controller/RegistrationControllerTest.java | View secret |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secrets safely. Learn the best practices here.
- Revoke and rotate these secrets.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future, consider:
- following these best practices for managing and storing secrets, including API keys and other credentials
- installing secret detection on pre-commit to catch secrets before they leave your machine and ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
No description provided.