diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 64e8646d6f153..dfc7052f22e64 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -337,7 +337,8 @@ void runOnce() { long currentTimeMs = time.milliseconds(); long pollTimeout = sendProducerData(currentTimeMs); - client.poll(pollTimeout, currentTimeMs); + if (!forceClose) + client.poll(pollTimeout, currentTimeMs); } // We handle {@code TransactionalIdAuthorizationException} and {@code ClusterAuthorizationException} by first diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index cd984ac2a343e..234ddb32018e5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -141,6 +141,8 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockingDetails; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -3588,6 +3590,28 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader } } + @Test + public void testBatchesSentAfterForceClose() throws InterruptedException { + MockClient spy = spy(client); + Metrics m = new Metrics(); + SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); + sender = new Sender(logContext, spy, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, + 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null); + + FutureRecordMetadata future = appendToAccumulator(tp0, 0L, "key", "value"); + sender.forceClose(); + assertTrue(sender.inFlightBatches(tp0).isEmpty()); + reset(spy); + sender.runOnce(); + // Either the request is not sent, or the send request is not processed by poll + long sendInvocations = mockingDetails(spy).getInvocations().stream().filter(invocation -> invocation.getMethod().getName().equals("send")).count(); + long pollInvocations = mockingDetails(spy).getInvocations().stream().filter(invocation -> invocation.getMethod().getName().equals("poll")).count(); + assertTrue(sendInvocations == 0 || pollInvocations == 0, "Message cannot be sent or processed after Sender is forced close"); + + sender.run(); + TestUtils.assertFutureThrows(KafkaException.class, future); + } + private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception { Future future = appendToAccumulator(tp0, 0L, "key", "value");