Skip to content

KAFKA-10840: Expose authentication failures in KafkaConsumer.poll() #20212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
Expand All @@ -45,8 +46,12 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.CertificateExpiredAuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.PersistentAuthenticationException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
Expand Down Expand Up @@ -627,6 +632,7 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {

/**
* @throws KafkaException if the rebalance callback throws exception
* @throws AuthenticationException if authentication fails
*/
private ConsumerRecords<K, V> poll(final Timer timer) {
acquireAndEnsureOpen();
Expand All @@ -640,6 +646,9 @@ private ConsumerRecords<K, V> poll(final Timer timer) {
do {
client.maybeTriggerWakeup();

// Check for authentication failures that should be surfaced to the application
checkForAuthenticationFailures();

// try to update assignment metadata BUT do not need to block on the timer for join group
updateAssignmentMetadataIfNeeded(timer, false);

Expand Down Expand Up @@ -1283,6 +1292,48 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}

/**
* Check for authentication failures that should be surfaced to the application.
* This method checks for authentication exceptions on connected nodes and throws
* appropriate exceptions for permanent failures like certificate expiration.
*
* @throws AuthenticationException if authentication has failed
*/
private void checkForAuthenticationFailures() {
// Check authentication failures for all known nodes
for (Node node : metadata.fetch().nodes()) {
AuthenticationException authException = client.authenticationException(node);
if (authException != null) {
log.error("Authentication failed for node {}: {}", node, authException.getMessage());

// Check for specific SSL certificate-related errors
if (authException instanceof SslAuthenticationException) {
SslAuthenticationException sslException = (SslAuthenticationException) authException;
String message = sslException.getMessage();

// Check for certificate expiration patterns in the error message
if (message != null && (
message.contains("certificate expired") ||
message.contains("Certificate expired") ||
message.contains("CERTIFICATE_EXPIRED") ||
message.contains("certificate has expired") ||
message.contains("expired certificate"))) {
throw new CertificateExpiredAuthenticationException(
"SSL certificate has expired for node " + node + ": " + message, sslException);
}

// For other SSL authentication failures, throw PersistentAuthenticationException
throw new PersistentAuthenticationException(
"SSL authentication failed for node " + node + ": " + message, sslException);
}

// For non-SSL authentication failures, also treat as persistent
throw new PersistentAuthenticationException(
"Authentication failed for node " + node + ": " + authException.getMessage(), authException);
}
}
}

// Functions below are for testing only
@Override
public String clientId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,15 @@ public void tryConnect(Node node) {
}
}

/**
* Get the authentication exception for a given node, if any.
* @param node the node to check
* @return an AuthenticationException iff authentication has failed, null otherwise
*/
public AuthenticationException authenticationException(Node node) {
return client.authenticationException(node);
}

private class RequestFutureCompletionHandler implements RequestCompletionHandler {
private final RequestFuture<ClientResponse> future;
private ClientResponse response;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* This exception indicates that SSL certificate has expired during authentication.
* This is a non-retriable error that requires certificate renewal or configuration update.
* <p>
* Certificate expiration is a permanent authentication failure that clients should
* handle gracefully by failing fast and providing clear error messaging to allow
* operators to take corrective action.
* </p>
*/
public class CertificateExpiredAuthenticationException extends SslAuthenticationException {

private static final long serialVersionUID = 1L;

public CertificateExpiredAuthenticationException(String message) {
super(message);
}

public CertificateExpiredAuthenticationException(String message, Throwable cause) {
super(message, cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* This exception indicates a persistent authentication failure that is unlikely to succeed on retry.
* This includes issues like expired certificates, invalid credentials, or configuration problems
* that require manual intervention.
* <p>
* Unlike transient authentication failures that may succeed on retry, persistent failures
* indicate configuration or credential issues that need to be resolved before the client
* can successfully authenticate.
* </p>
*/
public class PersistentAuthenticationException extends AuthenticationException {

private static final long serialVersionUID = 1L;

public PersistentAuthenticationException(String message) {
super(message);
}

public PersistentAuthenticationException(String message, Throwable cause) {
super(message, cause);
}

}
10 changes: 10 additions & 0 deletions clients/src/test/java/org/apache/kafka/clients/MockClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ public void createPendingAuthenticationError(Node node, long backoffMs) {
pendingAuthenticationErrors.put(node, backoffMs);
}

/**
* Set a specific authentication exception for a node. This is useful for testing
* specific authentication failure scenarios.
*/
public void setNodeAuthenticationFailure(Node node, AuthenticationException exception) {
pendingAuthenticationErrors.remove(node);
authenticationErrors.put(node, exception);
disconnect(node.idString());
}

@Override
public boolean connectionFailed(Node node) {
return connectionState(node.idString()).isBackingOff(time.milliseconds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.SslAuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
Expand Down Expand Up @@ -411,6 +412,24 @@ public boolean ready(Node node, long now) {
assertEquals(2, client.inFlightRequestCount(node.idString()));
}

@Test
public void testAuthenticationExceptionExposure() {
// Test that ConsumerNetworkClient exposes authentication exceptions from underlying client
SslAuthenticationException sslException = new SslAuthenticationException("certificate expired");
client.setNodeAuthenticationFailure(node, sslException);

AuthenticationException result = consumerClient.authenticationException(node);
assertEquals(sslException, result);
assertTrue(result.getMessage().contains("certificate expired"));
}

@Test
public void testAuthenticationExceptionNull() {
// Test that no authentication exception returns null
AuthenticationException result = consumerClient.authenticationException(node);
assertEquals(null, result);
}

private HeartbeatRequest.Builder heartbeat() {
return new HeartbeatRequest.Builder(new HeartbeatRequestData()
.setGroupId("group")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests for the new authentication exception types added for KAFKA-10840.
*/
public class AuthenticationExceptionTypesTest {

@Test
public void testCertificateExpiredAuthenticationException() {
String message = "SSL certificate has expired";
CertificateExpiredAuthenticationException exception =
new CertificateExpiredAuthenticationException(message);

assertEquals(message, exception.getMessage());
assertInstanceOf(SslAuthenticationException.class, exception);
assertInstanceOf(AuthenticationException.class, exception);
}

@Test
public void testCertificateExpiredAuthenticationExceptionWithCause() {
String message = "SSL certificate has expired";
RuntimeException cause = new RuntimeException("Root cause");
CertificateExpiredAuthenticationException exception =
new CertificateExpiredAuthenticationException(message, cause);

assertEquals(message, exception.getMessage());
assertEquals(cause, exception.getCause());
assertInstanceOf(SslAuthenticationException.class, exception);
}

@Test
public void testPersistentAuthenticationException() {
String message = "Authentication failed persistently";
PersistentAuthenticationException exception =
new PersistentAuthenticationException(message);

assertEquals(message, exception.getMessage());
assertInstanceOf(AuthenticationException.class, exception);
}

@Test
public void testPersistentAuthenticationExceptionWithCause() {
String message = "Authentication failed persistently";
RuntimeException cause = new RuntimeException("Root cause");
PersistentAuthenticationException exception =
new PersistentAuthenticationException(message, cause);

assertEquals(message, exception.getMessage());
assertEquals(cause, exception.getCause());
assertInstanceOf(AuthenticationException.class, exception);
}

@Test
public void testExceptionHierarchy() {
CertificateExpiredAuthenticationException certException =
new CertificateExpiredAuthenticationException("cert expired");
PersistentAuthenticationException persistentException =
new PersistentAuthenticationException("auth failed");

// Verify inheritance hierarchy
assertTrue(certException instanceof SslAuthenticationException);
assertTrue(certException instanceof AuthenticationException);
assertTrue(persistentException instanceof AuthenticationException);
}

@Test
public void testExceptionSerialVersionUID() {
// Verify that serialVersionUID is set to maintain compatibility
CertificateExpiredAuthenticationException certException =
new CertificateExpiredAuthenticationException("test");
PersistentAuthenticationException persistentException =
new PersistentAuthenticationException("test");

// These exceptions should be serializable (extends Exception/RuntimeException)
assertInstanceOf(Exception.class, certException);
assertInstanceOf(Exception.class, persistentException);
}
}