Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG),
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
Expand Down Expand Up @@ -660,6 +661,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
);
this.applicationEventHandler = new ApplicationEventHandler(logContext,
time,
config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG),
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
Expand All @@ -677,6 +679,7 @@ interface ApplicationEventHandlerFactory {
ApplicationEventHandler build(
final LogContext logContext,
final Time time,
final int initializationTimeoutMs,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.KafkaThread;
Expand All @@ -41,8 +46,13 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import javax.security.auth.spi.LoginModule;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
import static org.apache.kafka.common.utils.Utils.closeQuietly;

Expand All @@ -68,6 +78,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
private RequestManagers requestManagers;
private volatile boolean running;
private final IdempotentCloser closer = new IdempotentCloser();
private final CountDownLatch initializationLatch = new CountDownLatch(1);
private final AtomicReference<KafkaException> initializationError = new AtomicReference<>();
private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
private long lastPollTimeMs = 0L;
Expand All @@ -92,13 +104,57 @@ public ConsumerNetworkThread(LogContext logContext,
this.asyncConsumerMetrics = asyncConsumerMetrics;
}

/**
* Start the network thread and let it complete its initialization before proceeding. The
* {@link ClassicKafkaConsumer} constructor blocks during creation of its {@link NetworkClient}, providing
* precedent for waiting here.
*
* In certain cases (e.g. an invalid {@link LoginModule} in {@link SaslConfigs#SASL_JAAS_CONFIG}), an error
* could be thrown during {@link #initializeResources()}. This would result in the {@link #run()} method
* exiting, no longer able to process events, which means that the consumer effectively hangs.
*
* @param timeoutMs Length of time, in milliseconds, to wait for the thread to start and complete initialization
*/
public void start(int timeoutMs) {
// start() is invoked internally instead of by the caller to avoid SpotBugs errors about starting a thread
// in a constructor.
start();

try {
if (!initializationLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
maybeSetInitializationError(
new TimeoutException("Consumer network thread resource initialization timed out after " + timeoutMs + " ms")
);
}
} catch (InterruptedException e) {
maybeSetInitializationError(
new InterruptException("Consumer network thread resource initialization was interrupted", e)
);
}

KafkaException e = initializationError.get();

if (e != null)
throw e;
}

@Override
public void run() {
try {
log.debug("Consumer network thread started");

// Wait until we're securely in the background network thread to initialize these objects...
initializeResources();
try {
initializeResources();
} catch (Throwable t) {
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
maybeSetInitializationError(e);

// This will still call cleanup() via the `finally` section below.
return;
} finally {
initializationLatch.countDown();
}

while (running) {
try {
Expand All @@ -108,13 +164,20 @@ public void run() {
log.error("Unexpected error caught in consumer network thread", e);
}
}
} catch (final Throwable e) {
log.error("Failed to initialize resources for consumer network thread", e);
} catch (Throwable t) {
log.error("Unexpected failure in consumer network thread", t);
} finally {
cleanup();
}
}

private void maybeSetInitializationError(KafkaException error) {
if (initializationError.compareAndSet(null, error))
return;

log.error("Consumer network thread resource initialization error ({}) will be suppressed as an error was already set", error.getMessage(), error);
}

void initializeResources() {
applicationEventProcessor = applicationEventProcessorSupplier.get();
networkClientDelegate = networkClientDelegateSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG),
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
Expand Down Expand Up @@ -413,6 +414,7 @@ private void process(final ShareAcknowledgementCommitCallbackEvent event) {
this.applicationEventHandler = new ApplicationEventHandler(
logContext,
time,
config.getInt(CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG),
applicationEventQueue,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
Expand Down Expand Up @@ -478,6 +480,7 @@ interface ApplicationEventHandlerFactory {
ApplicationEventHandler build(
final LogContext logContext,
final Time time,
final int initializationTimeoutMs,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ApplicationEventHandler implements Closeable {

public ApplicationEventHandler(final LogContext logContext,
final Time time,
final int initializationTimeoutMs,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Expand All @@ -69,7 +70,8 @@ public ApplicationEventHandler(final LogContext logContext,
networkClientDelegateSupplier,
requestManagersSupplier,
asyncConsumerMetrics);
this.networkThread.start();

this.networkThread.start(initializationTimeoutMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
Expand Down Expand Up @@ -93,6 +94,7 @@
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -131,6 +133,7 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
Expand All @@ -152,6 +155,7 @@

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.security.auth.login.LoginException;

import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.DEFAULT_REASON;
Expand Down Expand Up @@ -3796,6 +3800,58 @@ void testMonitorablePlugins(GroupProtocol groupProtocol) {
}
}

/**
* This test ensures that both {@link Consumer} implementations fail on creation when the underlying
* {@link NetworkClient} fails creation.
*
* The logic to check for this case is admittedly a bit awkward because the constructor can fail for all
* manner of reasons. So a failure case is created by specifying an invalid
* {@link javax.security.auth.spi.LoginModule} class name, which in turn causes the {@link NetworkClient}
* to fail.
*
* This test was created to validate the change for KAFKA-19394 for the {@link AsyncKafkaConsumer}. The fix
* should handle the case where failure during initialization of resources (in this test, the underlying
* {@link NetworkClient}) will not cause the creation of the {@link AsyncKafkaConsumer} to hang.
*/
@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testConstructorFailOnNetworkClientConstructorFailure(GroupProtocol groupProtocol) {
Map<String, Object> configs = Map.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name,
SaslConfigs.SASL_MECHANISM, "PLAIN",
SaslConfigs.SASL_JAAS_CONFIG, "org.example.InvalidLoginModule required ;",
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)
);

KafkaException e = assertThrows(KafkaException.class, () -> {
try (KafkaConsumer<String, String> ignored = new KafkaConsumer<>(configs)) {
fail("Should not be able to create the consumer");
}
});

assertEquals("Failed to construct kafka consumer", e.getMessage());

// The root cause is multiple exceptions deep. This code is more concise and should hopefully be trivial
// to update should the underlying implementation change.
Throwable cause = e.getCause();
assertNotNull(cause);
assertInstanceOf(KafkaException.class, cause);
assertEquals("Failed to create new NetworkClient", cause.getMessage());

cause = cause.getCause();
assertNotNull(cause);
assertInstanceOf(KafkaException.class, cause);
assertEquals(LoginException.class.getName() + ": No LoginModule found for org.example.InvalidLoginModule", cause.getMessage());

cause = cause.getCause();
assertNotNull(cause);
assertInstanceOf(LoginException.class, cause);
assertEquals("No LoginModule found for org.example.InvalidLoginModule", cause.getMessage());
}

private MetricName expectedMetricName(String clientId, String config, Class<?> clazz) {
Map<String, String> expectedTags = new LinkedHashMap<>();
expectedTags.put("client-id", clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,31 @@
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

public class ApplicationEventHandlerTest {
private final Time time = new MockTime();
private final int initializationTimeoutMs = 50;
private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>();
private final ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class);
private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class);
Expand All @@ -53,6 +61,7 @@ public void testRecordApplicationEventQueueSize(String groupName) {
ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
new LogContext(),
time,
initializationTimeoutMs,
applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
Expand All @@ -65,4 +74,52 @@ public void testRecordApplicationEventQueueSize(String groupName) {
verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
}
}

@Test
public void testFailOnInitializeResources() {
RuntimeException rootFailure = new RuntimeException("root failure");
KafkaException error = assertInitializeResourcesError(
KafkaException.class,
() -> {
throw rootFailure;
}
);
assertEquals(rootFailure, error.getCause());
}

@Test
public void testDelayInInitializeResources() {
assertInitializeResourcesError(
TimeoutException.class,
() -> {
long delayMs = initializationTimeoutMs * 2;
org.apache.kafka.common.utils.Utils.sleep(delayMs);
return networkClientDelegate;
}
);
}

@Test
public void testInterruptInInitializeResources() {
Thread.currentThread().interrupt();
assertInitializeResourcesError(InterruptException.class, () -> networkClientDelegate);
}

private <T extends Throwable> T assertInitializeResourcesError(Class<T> exceptionClass,
Supplier<NetworkClientDelegate> networkClientDelegateSupplier) {
try (Metrics metrics = new Metrics();
AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics, "test-group"))) {
return assertThrows(exceptionClass, () -> new ApplicationEventHandler(
new LogContext(),
time,
initializationTimeoutMs,
applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
networkClientDelegateSupplier,
() -> requestManagers,
asyncConsumerMetrics
));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private AsyncKafkaConsumer<String, String> newConsumerWithStreamRebalanceData(
new StringDeserializer(),
new StringDeserializer(),
time,
(logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler,
(logContext, time, initializationTimeoutMs, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler,
logContext -> backgroundEventReaper,
(logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector,
(consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata,
Expand All @@ -238,7 +238,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig config) {
new StringDeserializer(),
new StringDeserializer(),
time,
(logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler,
(logContext, time, initializationTimeoutMs, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> applicationEventHandler,
logContext -> backgroundEventReaper,
(logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> fetchCollector,
(consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> metadata,
Expand Down
Loading
Loading