From 3452d003a249ea45d69ee95bb4849b01e9dc2c42 Mon Sep 17 00:00:00 2001 From: ChangYu Huang Date: Tue, 28 Oct 2025 19:26:36 -0400 Subject: [PATCH 1/3] Catch race condition --- .../kafka/clients/producer/internals/SenderTest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index cd984ac2a343e..b12022e4b1394 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -3588,6 +3588,17 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader } } + @Test + public void testBatchesSentAfterForceClose() throws InterruptedException { + FutureRecordMetadata future = appendToAccumulator(tp0, 0L, "key", "value"); + sender.forceClose(); + assertTrue(sender.inFlightBatches(tp0).isEmpty()); + sender.runOnce(); + assertTrue(sender.inFlightBatches(tp0).isEmpty()); + sender.run(); + TestUtils.assertFutureThrows(KafkaException.class, future); + } + private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception { Future future = appendToAccumulator(tp0, 0L, "key", "value"); From ee77f8abf8e2121899383ff7f84554805adb8473 Mon Sep 17 00:00:00 2001 From: ChangYu Huang Date: Wed, 29 Oct 2025 18:21:04 -0400 Subject: [PATCH 2/3] Accept different ways to force close in test --- .../clients/producer/internals/SenderTest.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index b12022e4b1394..234ddb32018e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -141,6 +141,8 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockingDetails; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -3590,11 +3592,22 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader @Test public void testBatchesSentAfterForceClose() throws InterruptedException { + MockClient spy = spy(client); + Metrics m = new Metrics(); + SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); + sender = new Sender(logContext, spy, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, + 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null); + FutureRecordMetadata future = appendToAccumulator(tp0, 0L, "key", "value"); sender.forceClose(); assertTrue(sender.inFlightBatches(tp0).isEmpty()); + reset(spy); sender.runOnce(); - assertTrue(sender.inFlightBatches(tp0).isEmpty()); + // Either the request is not sent, or the send request is not processed by poll + long sendInvocations = mockingDetails(spy).getInvocations().stream().filter(invocation -> invocation.getMethod().getName().equals("send")).count(); + long pollInvocations = mockingDetails(spy).getInvocations().stream().filter(invocation -> invocation.getMethod().getName().equals("poll")).count(); + assertTrue(sendInvocations == 0 || pollInvocations == 0, "Message cannot be sent or processed after Sender is forced close"); + sender.run(); TestUtils.assertFutureThrows(KafkaException.class, future); } From 1c70834d0e368a55e15f856bc15d8da06f8c1458 Mon Sep 17 00:00:00 2001 From: ChangYu Huang Date: Wed, 29 Oct 2025 19:00:11 -0400 Subject: [PATCH 3/3] Fix --- .../org/apache/kafka/clients/producer/internals/Sender.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 64e8646d6f153..dfc7052f22e64 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -337,7 +337,8 @@ void runOnce() { long currentTimeMs = time.milliseconds(); long pollTimeout = sendProducerData(currentTimeMs); - client.poll(pollTimeout, currentTimeMs); + if (!forceClose) + client.poll(pollTimeout, currentTimeMs); } // We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first