
Conversation

@rajadilipkolli
Owner

No description provided.

@coderabbitai
Contributor

coderabbitai bot commented Apr 19, 2025

Walkthrough

This pull request updates Kafka configuration and adds Kafka Streams dead-letter queue (DLQ) testing. Changes include explicit partition/replica specification for a DLQ topic in KafkaConfig, logger refactoring in KafkaStreamsConfig, test configuration updates to disable cloud services, and a new integration test verifying error message handling through the DLQ.
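For orientation, explicit sizing of an auto-created topic in Spring Kafka is normally expressed as a NewTopic bean. The sketch below is a hedged illustration of the KafkaConfig change, assuming the constant RECOVER_DLQ_TOPIC resolves to "recovererDLQ" (the topic name used by the new test); it is not the repository's actual code.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
class DlqTopicConfigSketch {

    // Assumed value; the summary only names the constant RECOVER_DLQ_TOPIC
    static final String RECOVER_DLQ_TOPIC = "recovererDLQ";

    @Bean
    NewTopic recovererDlqTopic() {
        // Explicit sizing instead of broker defaults: 1 partition, 1 replica
        return TopicBuilder.name(RECOVER_DLQ_TOPIC).partitions(1).replicas(1).build();
    }
}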

Changes

Cohort / File(s) — Summary

Kafka Configuration
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java, order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
Added JavaDoc clarifying that KafkaConfig focuses on topic creation; explicitly specified 1 partition and 1 replica for RECOVER_DLQ_TOPIC. Refactored the logger field in KafkaStreamsConfig from instance-level to static final initialization.

Test Configuration
order-service/src/test/resources/application-test.properties
Added the property spring.cloud.config.enabled=false alongside the existing cloud-discovery disable setting, with an explanatory comment (see the sketch after this table).

Integration Test
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java
New integration test class verifying DLQ functionality by sending invalid messages to the payment-orders topic and asserting that they appear on the recovererDLQ topic within 60 seconds.
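A hedged sketch of the resulting test properties; the discovery key name is an assumption inferred from the summary's "existing cloud discovery disable setting", not a verbatim quote of the file:

# application-test.properties
# Keep integration tests self-contained: do not reach out to external cloud services
spring.cloud.config.enabled=false
spring.cloud.discovery.enabled=false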

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

  • Verify KafkaTemplate bean configuration for Long key and OrderDto value types (see the sketch after this list)
  • Confirm static Kafka consumer initialization and proper cleanup between test runs
  • Validate polling logic handles the 60-second timeout and 1-second interval correctly
  • Check that test data (malformed JSON and invalid OrderDto with negative IDs/oversized status) correctly triggers deserialization failures
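For the first checkpoint above, a KafkaTemplate<Long, OrderDto> is typically wired as below. This is a sketch under stated assumptions: the bootstrap address, the JsonSerializer value format, the bean name, and the OrderDto package are illustrative, not the service's verified configuration.

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

// assumes OrderDto is importable from the order-service model package (hypothetical)
@Configuration
class OrderProducerConfigSketch {

    @Bean
    KafkaTemplate<Long, OrderDto> orderKafkaTemplate() {
        // Long keys and JSON-serialized OrderDto values, matching the test's template type
        Map<String, Object> props =
                Map.<String, Object>of(
                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092", // assumed
                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class,
                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}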

Poem

🐰 Hops through Kafka's winding streams,
Dead letters now in DLQ dreams,
With explicit partitions set just right,
Error recovery shines so bright!
Tests confirm the flow so true,
Invalid messages know what to do! 🎯

Pre-merge checks

❌ Failed checks (2 warnings, 1 inconclusive)
  • Title check — ⚠️ Warning. The title 'feat : use spring boot auto configuration' is vague and does not match the actual changes, which involve Kafka configuration adjustments, logger changes, and new integration tests. Resolution: revise the title to reflect the main changes, e.g. 'refactor: improve Kafka configuration and add DLQ integration tests'.
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
  • Description check — ❓ Inconclusive. No pull request description was provided, making it impossible to assess relevance to the changeset. Resolution: add a detailed description explaining the purpose and rationale of the Kafka configuration changes, logger refactoring, and new integration tests.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 727d8bd and 170a8ef.

📒 Files selected for processing (3)
  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java (2 hunks)
  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (1 hunks)
  • order-service/src/test/resources/application-test.properties (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaConfig.java
  • order-service/src/test/resources/application-test.properties
🧰 Additional context used
📓 Path-based instructions (2)
**/*.java

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

**/*.java: Use imports rather than fully qualified class names in code
Run spotless:apply after any formatting rule changes to maintain consistent code style

Files:

  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
**/src/main/java/**/*.java

📄 CodeRabbit inference engine (.github/copilot-instructions.md)

Search for @KafkaListener, @KafkaTemplate, and topic constants in service source code to verify event contracts and Kafka message producers/consumers

Files:

  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
🧠 Learnings (1)
📚 Learning: 2025-12-01T03:54:08.848Z
Learnt from: CR
Repo: rajadilipkolli/spring-boot-microservices-series-v2 PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-12-01T03:54:08.848Z
Learning: Applies to **/src/main/java/**/*.java : Search for `KafkaListener`, `KafkaTemplate`, and `topic` constants in service source code to verify event contracts and Kafka message producers/consumers

Applied to files:

  • order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (1)
order-service/src/main/java/com/example/orderservice/config/kafka/KafkaStreamsConfig.java (1)

52-52: Logger refactor to static final is appropriate

Using a static final logger bound to KafkaStreamsConfig.class is idiomatic and avoids per-instance logger creation; the change is clean and non-breaking.
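For reference, the endorsed pattern looks like this (a sketch; the field name is assumed, not taken from the file):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaStreamsConfig {
    // Created once at class-initialization time instead of once per instance
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);
}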



@github-actions

github-actions bot commented Apr 19, 2025

Qodana Community for JVM

15 new problems were found

Inspection name — Severity — Problems
  • @NotNull/@Nullable problems — 🔶 Warning — 7
  • Unchecked warning — 🔶 Warning — 2
  • Method can be extracted — ◽️ Notice — 4
  • Non-distinguishable logging calls — ◽️ Notice — 2

☁️ View the detailed Qodana report

Contact Qodana team

Contact us at qodana-support@jetbrains.com

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (3)

35-41: Consider using a non-static consumer for better test isolation.

While this works for a single test method, a static consumer might cause issues if more test methods are added later. Consider creating the consumer in a @BeforeEach method and closing it in an @AfterEach method.
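A sketch of that per-test lifecycle, reusing the class's existing buildTestConsumer helper; Spring's JUnit extension resolves the @Autowired parameter in lifecycle methods:

private Consumer<Long, String> dlqConsumer;

@BeforeEach
void openConsumer(@Autowired KafkaConnectionDetails connectionDetails) {
    // Fresh consumer per test, subscribed to the same DLQ topic
    dlqConsumer = buildTestConsumer(connectionDetails);
    dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
}

@AfterEach
void closeConsumer() {
    // Always release the consumer, even when the test fails
    if (dlqConsumer != null) {
        dlqConsumer.close(Duration.ofSeconds(5));
    }
}

Required imports: org.junit.jupiter.api.BeforeEach, org.junit.jupiter.api.AfterEach (Duration and Collections are already used by the class).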


90-110: Replace System.out.println with proper logging.

Using System.out.println for debugging output is not ideal for production code. Consider using a logger instead.

- System.out.println("Found DLQ record: " + record.value()));
+ logger.debug("Found DLQ record: {}", record.value()));
- System.out.println("No DLQ records found in this poll attempt");
+ logger.debug("No DLQ records found in this poll attempt");

You'll need to add a logger to the class:

private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsConfigIntTest.class);

With appropriate imports:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

91-110: Consider verifying the content of DLQ messages.

The test only asserts that at least one message was routed to the DLQ but doesn't verify which one or why. Consider enhancing the test to check the error details in the DLQ message headers or payload to ensure proper error handling for each specific error case.
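One way to do that, assuming spring-kafka's DeadLetterPublishingRecoverer defaults, which stamp kafka_dlt-* headers (for example kafka_dlt-exception-message) onto published DLQ records — a sketch, not verified project behavior:

// Inside untilAsserted, after polling `records`
for (ConsumerRecord<Long, String> dlqRecord : records) {
    Header cause = dlqRecord.headers().lastHeader("kafka_dlt-exception-message");
    assertThat(cause).as("DLQ record should carry its failure cause").isNotNull();
    assertThat(new String(cause.value(), StandardCharsets.UTF_8))
            .containsIgnoringCase("deserialization");
}

With imports org.apache.kafka.common.header.Header and java.nio.charset.StandardCharsets.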

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 09a89c4 and 938a0fb.

📒 Files selected for processing (1)
  • order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Codacy Static Code Analysis
  • GitHub Check: qodana
  • GitHub Check: Order Service with jdk 21
🔇 Additional comments (4)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (4)

1-5: License header format looks good.

The standard MIT license header is properly formatted.


30-34: Well-designed test class with appropriate Kafka templates.

The test class extends the base integration test class and correctly autowires two different KafkaTemplate instances - one for OrderDto objects and another for String messages, which will be useful for testing different error scenarios.


43-62: Well-configured Kafka consumer.

The consumer is properly configured with appropriate settings for the test scenario. The use of KafkaConnectionDetails for obtaining bootstrap servers is a good practice.


64-89: Good test setup with multiple error scenarios.

The test correctly sets up two different error cases: an invalid JSON message and a malformed OrderDto object. This provides good coverage of potential failures in the Kafka Streams processing pipeline.

Comment on lines 30 to 112
class KafkaStreamsConfigIntTest extends AbstractIntegrationTest {

    @Autowired private KafkaTemplate<Long, OrderDto> kafkaTemplate;
    @Autowired private KafkaTemplate<String, String> stringKafkaTemplate;

    private static Consumer<Long, String> dlqConsumer;

    @BeforeAll
    static void setup(@Autowired KafkaConnectionDetails connectionDetails) {
        dlqConsumer = buildTestConsumer(connectionDetails);
        dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
    }

    private static Consumer<Long, String> buildTestConsumer(
            KafkaConnectionDetails connectionDetails) {
        var props = new HashMap<String, Object>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                connectionDetails.getStreamsBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "deadletter-test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);

        ConsumerFactory<Long, String> cf =
                new DefaultKafkaConsumerFactory<>(
                        props, new LongDeserializer(), new StringDeserializer());
        return cf.createConsumer("deadletter-test-consumer");
    }

    @Test
    void deadLetterPublishingRecoverer() throws Exception {
        // Clear any existing messages in the DLQ
        dlqConsumer.poll(Duration.ofMillis(100));

        // Method 1: Send a completely invalid JSON to the payment-orders topic
        // This will definitely cause a deserialization error in the streams processing
        String invalidJson = "{\"orderId\": \"THIS_SHOULD_BE_A_NUMBER\", \"badField\": true}";
        stringKafkaTemplate.send("payment-orders", invalidJson);

        // Method 2: Also try with a malformed OrderDto object
        OrderDto orderDto = new OrderDto();
        orderDto.setOrderId(-1L);
        orderDto.setCustomerId(-1L);
        orderDto.setSource("source");
        // Set status to a very long string to cause potential issues
        orderDto.setStatus(
                "INVALID_STATUS_THAT_IS_VERY_LONG_AND_SHOULD_CAUSE_PROBLEMS_WITH_DESERIALIZATION");
        orderDto.setItems(null);

        kafkaTemplate.send("payment-orders", 1L, orderDto);

        // Make sure both messages are sent
        kafkaTemplate.flush();
        stringKafkaTemplate.flush();

        // Wait longer for the message to be routed to the DLQ with better polling
        await().pollInterval(Duration.ofSeconds(1))
                .atMost(Duration.ofSeconds(60)) // Increase timeout to 60 seconds
                .untilAsserted(
                        () -> {
                            ConsumerRecords<Long, String> records =
                                    dlqConsumer.poll(Duration.ofSeconds(5));
                            // Print out all received records for debugging
                            if (records.count() > 0) {
                                records.forEach(
                                        record ->
                                                System.out.println(
                                                        "Found DLQ record: " + record.value()));
                            } else {
                                System.out.println("No DLQ records found in this poll attempt");
                            }

                            assertThat(records.count())
                                    .as("Invalid message should be routed to recovererDLQ")
                                    .isGreaterThan(0);
                        });
    }
}

🛠️ Refactor suggestion

Add resource cleanup for the Kafka consumer.

The Kafka consumer is never closed, which could lead to resource leaks. Add an @AfterAll method to close the consumer:

@AfterAll
static void cleanup() {
    if (dlqConsumer != null) {
        dlqConsumer.close();
    }
}

Don't forget to add the import:

import org.junit.jupiter.api.AfterAll;

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1)

35-41: ⚠️ Potential issue

Add resource cleanup for the Kafka consumer.

The Kafka consumer is never closed, which could lead to resource leaks. Add an @AfterAll method to close the consumer.

import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

// ...

@BeforeAll
static void setup(@Autowired KafkaConnectionDetails connectionDetails) {
    dlqConsumer = buildTestConsumer(connectionDetails);
    dlqConsumer.subscribe(Collections.singletonList("recovererDLQ"));
}

+@AfterAll
+static void cleanup() {
+    if (dlqConsumer != null) {
+        dlqConsumer.close();
+    }
+}
🧹 Nitpick comments (3)
order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (3)

67-68: Verify DLQ is empty after initial poll.

The current code polls the DLQ to clear existing messages but doesn't verify that it's actually empty before continuing. Consider adding an assertion to confirm the DLQ is empty before sending test messages.

// Clear any existing messages in the DLQ
-dlqConsumer.poll(Duration.ofMillis(100));
+ConsumerRecords<Long, String> initialRecords = dlqConsumer.poll(Duration.ofMillis(100));
+// Optional: Log the count of cleared messages
+if (initialRecords.count() > 0) {
+    System.out.println("Cleared " + initialRecords.count() + " existing messages from DLQ");
+}

98-105: Replace System.out.println with proper logging.

Using System.out.println for debugging is not recommended in test code. Consider using a proper logger instead.

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

class KafkaStreamsConfigIntTest extends AbstractIntegrationTest {

+    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfigIntTest.class);
    @Autowired private KafkaTemplate<Long, OrderDto> kafkaTemplate;
    
    // ...
    
    // In the test method:
    if (records.count() > 0) {
        records.forEach(
                record ->
-                       System.out.println(
-                               "Found DLQ record: " + record.value()));
+                       log.debug("Found DLQ record: {}", record.value()));
    } else {
-       System.out.println("No DLQ records found in this poll attempt");
+       log.debug("No DLQ records found in this poll attempt");
    }

94-110: Enhance test assertions to verify message content.

The test only verifies that messages reached the DLQ but doesn't validate their content. Consider enhancing the assertions to verify the error details in the DLQ messages.

await().pollInterval(Duration.ofSeconds(1))
        .atMost(Duration.ofSeconds(60))
        .untilAsserted(
                () -> {
                    ConsumerRecords<Long, String> records =
                            dlqConsumer.poll(Duration.ofSeconds(5));
                    // Print out all received records for debugging
                    if (records.count() > 0) {
                        records.forEach(
                                record ->
                                        System.out.println(
                                                "Found DLQ record: " + record.value()));
                    } else {
                        System.out.println("No DLQ records found in this poll attempt");
                    }

                    assertThat(records.count())
                            .as("Invalid message should be routed to recovererDLQ")
                            .isGreaterThan(0);
+                   
+                   // Verify at least one record contains information about the deserialization error
+                   assertThat(records).anyMatch(record -> 
+                           record.value().contains("deserialization") || 
+                           record.value().contains("INVALID_STATUS"));
                });
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between 3e90977 and daff8f7.

📒 Files selected for processing (1)
  • order-service/src/test/java/com/example/orderservice/config/kafka/KafkaStreamsConfigIntTest.java (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Codacy Static Code Analysis
  • GitHub Check: Order Service with jdk 21

@gitguardian

gitguardian bot commented Jun 8, 2025

⚠️ GitGuardian has uncovered 2 secrets following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secrets in your pull request
GitGuardian id — Status — Secret — Commit — Filename
  • 17422989 — Triggered — Generic High Entropy Secret — commit 52ff61e — retail-store-webapp/src/test/resources/docker/realm-config/retailstore-realm.json
  • 8725269 — Triggered — Generic Password — commit 52ff61e — retail-store-webapp/src/test/java/com/example/retailstore/webapp/web/controller/RegistrationControllerTest.java
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secrets safely, following secret-management best practices.
  3. Revoke and rotate these secrets.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.



🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
