From 93c55de884673ec0a4995413f3529acb578411df Mon Sep 17 00:00:00 2001 From: Kirk True Date: Wed, 29 Oct 2025 17:07:32 -0700 Subject: [PATCH 1/8] KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close() WIP --- .../SaslPlainPlaintextConsumerTest.java | 36 +++++++++++++++++ .../internals/ConsumerNetworkThread.java | 40 +++++++++++++++++-- .../events/ApplicationEventHandler.java | 8 ++++ 3 files changed, 81 insertions(+), 3 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java index dcd9d3a27b847..3b91e06089c3c 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.ClientsTestUtils; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfigProperty; @@ -46,6 +47,7 @@ import static org.apache.kafka.common.test.JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertThrows; @ClusterTestDefaults( types = {Type.KRAFT}, @@ -153,4 +155,38 @@ public void testAsyncConsumeCoordinatorFailover() throws InterruptedException { ); testCoordinatorFailover(cluster, config); } + + @ClusterTest( + brokerSecurityProtocol = 
SecurityProtocol.SASL_PLAINTEXT + ) + public void testClassicConsumeInvalidJaas() { + assertThrows(KafkaException.class, () -> + testSimpleConsumption( + cluster, + Map.of( + SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name, + SASL_MECHANISM, MECHANISMS, + SASL_JAAS_CONFIG, "org.example.InvalidLoginModule required ;", + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + ) + ) + ); + } + + @ClusterTest( + brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT + ) + public void testAsyncConsumeInvalidJaas() { + assertThrows(KafkaException.class, () -> + testSimpleConsumption( + cluster, + Map.of( + SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name, + SASL_MECHANISM, MECHANISMS, + SASL_JAAS_CONFIG, "org.example.InvalidLoginModule required ;", + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + ) + ) + ); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index d2d178a88c38b..4ded24206be00 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.utils.KafkaThread; @@ -41,6 +42,8 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; @@ -68,6 +71,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private RequestManagers requestManagers; private volatile boolean running; private final IdempotentCloser closer = new IdempotentCloser(); + private final CountDownLatch initializationLatch = new CountDownLatch(1); + private final AtomicReference initializationError = new AtomicReference<>(); private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS); private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS; private long lastPollTimeMs = 0L; @@ -92,13 +97,42 @@ public ConsumerNetworkThread(LogContext logContext, this.asyncConsumerMetrics = asyncConsumerMetrics; } + public void awaitInitialization() { + try { + // It's a bit of a code smell to have an unbounded wait + initializationLatch.await(); + } catch (InterruptedException e) { + initializationError.compareAndSet( + null, + new KafkaException("An interruption occurred during consumer network thread initialization", e) + ); + log.error("Interrupted while waiting to initialize resources for consumer network thread", e); + } + + KafkaException e = initializationError.get(); + + if (e != null) + throw e; + } + @Override public void run() { try { log.debug("Consumer network thread started"); // Wait until we're securely in the background network thread to initialize these objects... 
- initializeResources(); + try { + initializeResources(); + } catch (Throwable t) { + initializationError.compareAndSet( + null, + new KafkaException("An error occurred during consumer network thread initialization", t) + ); + log.error("Failed to initialize resources for consumer network thread", t); + return; + } finally { + initializationLatch.countDown(); + } while (running) { try { @@ -108,8 +142,8 @@ public void run() { log.error("Unexpected error caught in consumer network thread", e); } } - } catch (final Throwable e) { - log.error("Failed to initialize resources for consumer network thread", e); + } catch (Throwable t) { + log.error("Unexpected failure in consumer network thread", t); } finally { cleanup(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index 6ab827b617c19..f5b56e96bd4db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -69,7 +69,15 @@ public ApplicationEventHandler(final LogContext logContext, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics); + + // Start the network thread and let it complete its initialization before proceeding. + // + // Certain error cases (e.g. an invalid javax.security.auth.spi.LoginModule in sasl.jaas.config) will + // cause errors during ConsumerNetworkThread.initializeResources(), causing the ConsumerNetworkThread.run() + // method to exit. At that point, any enqueued events are never processed, which means that the consumer + // effectively hangs. 
this.networkThread.start(); + this.networkThread.awaitInitialization(); } /** From 8bb60c03ee05ec952b38be4e45822c10dc7bc9ad Mon Sep 17 00:00:00 2001 From: Kirk True Date: Wed, 29 Oct 2025 17:18:55 -0700 Subject: [PATCH 2/8] Revised comments --- .../internals/events/ApplicationEventHandler.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index f5b56e96bd4db..e8496809f8a15 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -70,11 +70,12 @@ public ApplicationEventHandler(final LogContext logContext, requestManagersSupplier, asyncConsumerMetrics); - // Start the network thread and let it complete its initialization before proceeding. + // Start the network thread and let it complete its initialization before proceeding. The ClassicKafkaConsumer + // constructor blocks during creation of its NetworkClient, thus providing a precedent for waiting here. // - // Certain error cases (e.g. an invalid javax.security.auth.spi.LoginModule in sasl.jaas.config) will - // cause errors during ConsumerNetworkThread.initializeResources(), causing the ConsumerNetworkThread.run() - // method to exit. At that point, any enqueued events are never processed, which means that the consumer + // In certain cases (e.g. an invalid javax.security.auth.spi.LoginModule in sasl.jaas.config), an error + // could be thrown during ConsumerNetworkThread.initializeResources(). This would result in the + // ConsumerNetworkThread.run() method exiting, no longer able to process events, which means that the consumer // effectively hangs. 
this.networkThread.start(); this.networkThread.awaitInitialization(); From 23cd9a3d27b355ab4b1801afb19769bb50b75d0f Mon Sep 17 00:00:00 2001 From: Kirk True Date: Wed, 29 Oct 2025 18:03:36 -0700 Subject: [PATCH 3/8] Added timeout and more tests --- .../internals/AsyncKafkaConsumer.java | 3 + .../internals/ConsumerNetworkThread.java | 31 ++++++---- .../consumer/internals/ShareConsumerImpl.java | 3 + .../events/ApplicationEventHandler.java | 3 +- .../ApplicationEventHandlerTest.java | 57 +++++++++++++++++++ .../internals/AsyncKafkaConsumerTest.java | 4 +- .../internals/ShareConsumerImplTest.java | 2 +- 7 files changed, 88 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 3915ff7c8df88..5bc47f02b911b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -468,6 +468,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, time, + config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG), applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, @@ -660,6 +661,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config, ); this.applicationEventHandler = new ApplicationEventHandler(logContext, time, + config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG), applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, @@ -677,6 +679,7 @@ interface ApplicationEventHandlerFactory { ApplicationEventHandler build( final LogContext logContext, final Time time, + final int initializationTimeoutMs, final BlockingQueue applicationEventQueue, final CompletableEventReaper 
applicationEventReaper, final Supplier applicationEventProcessorSupplier, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 4ded24206be00..3b6c5153e9c8b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.utils.KafkaThread; @@ -43,6 +45,7 @@ import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -97,16 +100,17 @@ public ConsumerNetworkThread(LogContext logContext, this.asyncConsumerMetrics = asyncConsumerMetrics; } - public void awaitInitialization() { + public void awaitInitialization(int timeoutMs) { try { - // It's a bit of a code smell to have an unbounded wait - initializationLatch.await(); + if (!initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) { + maybeSetInitializationError( + new TimeoutException("Consumer network thread resource initialization timed out after " + timeoutMs + " ms") + ); + } } catch (InterruptedException e) { - initializationError.compareAndSet( - null, - new KafkaException("An interruption occurred during consumer network thread initialization", e) + 
maybeSetInitializationError( + new InterruptException("Consumer network thread resource initialization was interrupted", e) ); - log.error("Interrupted while waiting to initialize resources for consumer network thread", e); } KafkaException e = initializationError.get(); @@ -124,11 +128,9 @@ public void run() { try { initializeResources(); } catch (Throwable t) { - initializationError.compareAndSet( - null, - new KafkaException("An error occurred during consumer network thread initialization", t) + maybeSetInitializationError( + new KafkaException("Consumer network thread resource initialization failed", t) ); - log.error("Failed to initialize resources for consumer network thread", t); return; } finally { initializationLatch.countDown(); @@ -149,6 +151,13 @@ public void run() { } } + private void maybeSetInitializationError(KafkaException error) { + if (initializationError.compareAndSet(null, error)) + return; + + log.error("Consumer network thread resource initialization error ({}) will be suppressed as an error was already set", error.getMessage(), error); + } + void initializeResources() { applicationEventProcessor = applicationEventProcessorSupplier.get(); networkClientDelegate = networkClientDelegateSupplier.get(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 4a7e19a6e5694..91f06fcb47c38 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -307,6 +307,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, time, + config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG), applicationEventQueue, new CompletableEventReaper(logContext), 
applicationEventProcessorSupplier, @@ -413,6 +414,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) { this.applicationEventHandler = new ApplicationEventHandler( logContext, time, + config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG), applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, @@ -478,6 +480,7 @@ interface ApplicationEventHandlerFactory { ApplicationEventHandler build( final LogContext logContext, final Time time, + final int initializationTimeoutMs, final BlockingQueue applicationEventQueue, final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index e8496809f8a15..606b57c73ed3c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -51,6 +51,7 @@ public class ApplicationEventHandler implements Closeable { public ApplicationEventHandler(final LogContext logContext, final Time time, + final int initializationTimeoutMs, final BlockingQueue applicationEventQueue, final CompletableEventReaper applicationEventReaper, final Supplier applicationEventProcessorSupplier, @@ -78,7 +79,7 @@ public ApplicationEventHandler(final LogContext logContext, // ConsumerNetworkThread.run() method exiting, no longer able to process events, which means that the consumer // effectively hangs. 
this.networkThread.start(); - this.networkThread.awaitInitialization(); + this.networkThread.awaitInitialization(initializationTimeoutMs); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java index 402697227ee80..d506d706a1ac9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java @@ -22,23 +22,31 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Supplier; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; public class ApplicationEventHandlerTest { private final Time time = new MockTime(); + private final int initializationTimeoutMs = 50; private final BlockingQueue applicationEventsQueue = new LinkedBlockingQueue<>(); private final ApplicationEventProcessor 
applicationEventProcessor = mock(ApplicationEventProcessor.class); private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); @@ -53,6 +61,7 @@ public void testRecordApplicationEventQueueSize(String groupName) { ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler( new LogContext(), time, + 10000, applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, @@ -65,4 +74,52 @@ public void testRecordApplicationEventQueueSize(String groupName) { verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1); } } + + @Test + public void testFailOnInitializeResources() { + RuntimeException rootFailure = new RuntimeException("root failure"); + KafkaException error = assertInitializeResourcesError( + KafkaException.class, + () -> { + throw rootFailure; + } + ); + assertEquals(rootFailure, error.getCause()); + } + + @Test + public void testDelayInInitializeResources() { + assertInitializeResourcesError( + TimeoutException.class, + () -> { + long delayMs = initializationTimeoutMs * 2; + org.apache.kafka.common.utils.Utils.sleep(delayMs); + return networkClientDelegate; + } + ); + } + + @Test + public void testInterruptInInitializeResources() { + Thread.currentThread().interrupt(); + assertInitializeResourcesError(InterruptException.class, () -> networkClientDelegate); + } + + private T assertInitializeResourcesError(Class exceptionClass, + Supplier networkClientDelegateSupplier) { + try (Metrics metrics = new Metrics(); + AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics, "test-group"))) { + return assertThrows(exceptionClass, () -> new ApplicationEventHandler( + new LogContext(), + time, + initializationTimeoutMs, + applicationEventsQueue, + applicationEventReaper, + () -> applicationEventProcessor, + networkClientDelegateSupplier, + () -> requestManagers, + asyncConsumerMetrics + )); + } + } } diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index bccd4ebeb8149..21747e7afedb2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -223,7 +223,7 @@ private AsyncKafkaConsumer newConsumerWithStreamRebalanceData( new StringDeserializer(), new StringDeserializer(), time, - (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, + (logContext, time, initializationTimeoutMs, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, logContext -> backgroundEventReaper, (logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector, (consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata, @@ -238,7 +238,7 @@ private AsyncKafkaConsumer newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, + (logContext, time, initializationTimeoutMs, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler, logContext -> backgroundEventReaper, (logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, 
fetchMetricsManager, time) -> fetchCollector, (consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index edb244b06aa26..8eae3cd0dd84a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -141,7 +141,7 @@ private ShareConsumerImpl newConsumer(ConsumerConfig config) { new StringDeserializer(), new StringDeserializer(), time, - (a, b, c, d, e, f, g, h) -> applicationEventHandler, + (a, b, c, d, e, f, g, h, i) -> applicationEventHandler, a -> backgroundEventReaper, (a, b, c, d, e) -> fetchCollector, backgroundEventQueue From 44ba7c801b0fcac4f9b5ab3e0000d8b853c85735 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Wed, 29 Oct 2025 18:04:00 -0700 Subject: [PATCH 4/8] Update ApplicationEventHandlerTest.java --- .../clients/consumer/internals/ApplicationEventHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java index d506d706a1ac9..534ba2e3bc139 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java @@ -61,7 +61,7 @@ public void testRecordApplicationEventQueueSize(String groupName) { ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler( new LogContext(), time, - 10000, + initializationTimeoutMs, applicationEventsQueue, applicationEventReaper, () -> applicationEventProcessor, From 
8276a250da62649cb65e59566b707bd589aee371 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Wed, 29 Oct 2025 18:06:18 -0700 Subject: [PATCH 5/8] Update ConsumerNetworkThread.java --- .../kafka/clients/consumer/internals/ConsumerNetworkThread.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 3b6c5153e9c8b..08f0745d7d035 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -131,6 +131,7 @@ public void run() { maybeSetInitializationError( new KafkaException("Consumer network thread resource initialization failed", t) ); + cleanup(); return; } finally { initializationLatch.countDown(); From d2b01f976e6a88d7d736975bd59d382302bd5696 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 31 Oct 2025 14:47:51 -0700 Subject: [PATCH 6/8] Moved test case to unit test --- .../SaslPlainPlaintextConsumerTest.java | 36 ------------ .../clients/consumer/KafkaConsumerTest.java | 56 +++++++++++++++++++ 2 files changed, 56 insertions(+), 36 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java index 3b91e06089c3c..dcd9d3a27b847 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/SaslPlainPlaintextConsumerTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.ClientsTestUtils; -import org.apache.kafka.common.KafkaException; 
import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfigProperty; @@ -47,7 +46,6 @@ import static org.apache.kafka.common.test.JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; -import static org.junit.jupiter.api.Assertions.assertThrows; @ClusterTestDefaults( types = {Type.KRAFT}, @@ -155,38 +153,4 @@ public void testAsyncConsumeCoordinatorFailover() throws InterruptedException { ); testCoordinatorFailover(cluster, config); } - - @ClusterTest( - brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT - ) - public void testClassicConsumeInvalidJaas() { - assertThrows(KafkaException.class, () -> - testSimpleConsumption( - cluster, - Map.of( - SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name, - SASL_MECHANISM, MECHANISMS, - SASL_JAAS_CONFIG, "org.example.InvalidLoginModule required ;", - GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) - ) - ) - ); - } - - @ClusterTest( - brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT - ) - public void testAsyncConsumeInvalidJaas() { - assertThrows(KafkaException.class, () -> - testSimpleConsumption( - cluster, - Map.of( - SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name, - SASL_MECHANISM, MECHANISMS, - SASL_JAAS_CONFIG, "org.example.InvalidLoginModule required ;", - GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) - ) - ) - ); - } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 78ff15cee5f8e..9e8e0b88a017c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java 
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InterruptException; @@ -93,6 +94,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -131,6 +133,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; @@ -152,6 +155,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; +import javax.security.auth.login.LoginException; import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON; @@ -3796,6 +3800,58 @@ void testMonitorablePlugins(GroupProtocol groupProtocol) { } } + /** + * This test ensures that both {@link Consumer} implementations fail on creation when the underlying + * {@link NetworkClient} fails creation. + * + * The logic to check for this case is admittedly a bit awkward because the constructor can fail for all + * manner of reasons. So a failure case is created by specifying an invalid + * {@link javax.security.auth.spi.LoginModule} class name, which in turn causes the {@link NetworkClient} + * to fail. 
+ * + * This test was created to validate the change for KAFKA-19394 for the {@link AsyncKafkaConsumer}. The fix + * should ensure that a failure during initialization of resources (in this test, the underlying + * {@link NetworkClient}) does not cause the creation of the {@link AsyncKafkaConsumer} to hang. + */ + @ParameterizedTest + @EnumSource(value = GroupProtocol.class) + public void testConstructorFailOnNetworkClientConstructorFailure(GroupProtocol groupProtocol) { + Map configs = Map.of( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999", + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name, + SaslConfigs.SASL_MECHANISM, "PLAIN", + SaslConfigs.SASL_JAAS_CONFIG, "org.example.InvalidLoginModule required ;", + ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT) + ); + + KafkaException e = assertThrows(KafkaException.class, () -> { + try (KafkaConsumer ignored = new KafkaConsumer<>(configs)) { + fail("Should not be able to create the consumer"); + } + }); + + assertEquals("Failed to construct kafka consumer", e.getMessage()); + + // The root cause is multiple exceptions deep. This code is more concise and should hopefully be trivial + // to update should the underlying implementation change. 
+ Throwable cause = e.getCause(); + assertNotNull(cause); + assertInstanceOf(KafkaException.class, cause); + assertEquals("Failed to create new NetworkClient", cause.getMessage()); + + cause = cause.getCause(); + assertNotNull(cause); + assertInstanceOf(KafkaException.class, cause); + assertEquals(LoginException.class.getName() + ": No LoginModule found for org.example.InvalidLoginModule", cause.getMessage()); + + cause = cause.getCause(); + assertNotNull(cause); + assertInstanceOf(LoginException.class, cause); + assertEquals("No LoginModule found for org.example.InvalidLoginModule", cause.getMessage()); + } + private MetricName expectedMetricName(String clientId, String config, Class clazz) { Map expectedTags = new LinkedHashMap<>(); expectedTags.put("client-id", clientId); From f8bbc79f7bb24a71485c8f4fd07035e8cf31a9db Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 31 Oct 2025 14:48:41 -0700 Subject: [PATCH 7/8] Refactor exception creation when initializing resources Use ConsumerUtils.maybeWrapAsKafkaException to possibly reduce an extra layer of exception handling. 
--- .../clients/consumer/internals/ConsumerNetworkThread.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 08f0745d7d035..a703d795b19cd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -128,9 +128,8 @@ public void run() { try { initializeResources(); } catch (Throwable t) { - maybeSetInitializationError( - new KafkaException("Consumer network thread resource initialization failed", t) - ); + KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); + maybeSetInitializationError(e); cleanup(); return; } finally { From c4d21731516c5b8b6f293a9dbaacf7ad5c521c76 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 31 Oct 2025 15:05:06 -0700 Subject: [PATCH 8/8] Workaround for SpotBugs complaint --- .../internals/ConsumerNetworkThread.java | 24 +++++++++++++++++-- .../events/ApplicationEventHandler.java | 10 +------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index a703d795b19cd..4851878af929e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; @@ -25,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.IdempotentCloser; @@ -49,6 +51,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import javax.security.auth.spi.LoginModule; + import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -100,7 +104,22 @@ public ConsumerNetworkThread(LogContext logContext, this.asyncConsumerMetrics = asyncConsumerMetrics; } - public void awaitInitialization(int timeoutMs) { + /** + * Start the network thread and let it complete its initialization before proceeding. The + * {@link ClassicKafkaConsumer} constructor blocks during creation of its {@link NetworkClient}, providing + * precedent for waiting here. + * + * In certain cases (e.g. an invalid {@link LoginModule} in {@link SaslConfigs#SASL_JAAS_CONFIG}), an error + * could be thrown during {@link #initializeResources()}. This would result in the {@link #run()} method + * exiting, no longer able to process events, which means that the consumer effectively hangs. + * + * @param timeoutMs Length of time, in milliseconds, to wait for the thread to start and complete initialization + */ + public void start(int timeoutMs) { + // start() is invoked internally instead of by the caller to avoid SpotBugs errors about starting a thread + // in a constructor. 
+ start(); + try { if (!initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) { maybeSetInitializationError( @@ -130,7 +149,8 @@ public void run() { } catch (Throwable t) { KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); maybeSetInitializationError(e); - cleanup(); + + // This will still call cleanup() via the `finally` section below. return; } finally { initializationLatch.countDown(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index 606b57c73ed3c..645b121483f88 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -71,15 +71,7 @@ public ApplicationEventHandler(final LogContext logContext, requestManagersSupplier, asyncConsumerMetrics); - // Start the network thread and let it complete its initialization before proceeding. The ClassicKafkaConsumer - // constructor blocks during creation of its NetworkClient, thus providing a precedent for waiting here. - // - // In certain cases (e.g. an invalid javax.security.auth.spi.LoginModule in sasl.jaas.config), an error - // could be thrown during ConsumerNetworkThread.initializeResources(). This would result in the - // ConsumerNetworkThread.run() method exiting, no longer able to process events, which means that the consumer - // effectively hangs. - this.networkThread.start(); - this.networkThread.awaitInitialization(initializationTimeoutMs); + this.networkThread.start(initializationTimeoutMs); } /**