diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 0e4119b9e33e9..257a334c2e265 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.common.Node; import org.apache.kafka.clients.consumer.CloseOptions; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; @@ -45,8 +46,12 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.CertificateExpiredAuthenticationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.PersistentAuthenticationException; +import org.apache.kafka.common.errors.SslAuthenticationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; @@ -627,6 +632,7 @@ public ConsumerRecords poll(final Duration timeout) { /** * @throws KafkaException if the rebalance callback throws exception + * @throws AuthenticationException if authentication fails */ private ConsumerRecords poll(final Timer timer) { acquireAndEnsureOpen(); @@ -640,6 +646,9 @@ private ConsumerRecords poll(final Timer timer) { do { client.maybeTriggerWakeup(); + // Check for authentication failures that should be surfaced to the application + checkForAuthenticationFailures(); + // try to update assignment metadata BUT do not need to block on the timer for join group updateAssignmentMetadataIfNeeded(timer, false); @@ -1283,6 +1292,48 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)); } + /** + * Check for authentication failures that should be surfaced to the application. + * This method checks for authentication exceptions on connected nodes and throws + * appropriate exceptions for permanent failures like certificate expiration. + * + * @throws AuthenticationException if authentication has failed + */ + private void checkForAuthenticationFailures() { + // Check authentication failures for all known nodes + for (Node node : metadata.fetch().nodes()) { + AuthenticationException authException = client.authenticationException(node); + if (authException != null) { + log.error("Authentication failed for node {}: {}", node, authException.getMessage()); + + // Check for specific SSL certificate-related errors + if (authException instanceof SslAuthenticationException) { + SslAuthenticationException sslException = (SslAuthenticationException) authException; + String message = sslException.getMessage(); + + // Check for certificate expiration patterns in the error message + if (message != null && ( + message.contains("certificate expired") || + message.contains("Certificate expired") || + message.contains("CERTIFICATE_EXPIRED") || + message.contains("certificate has expired") || + message.contains("expired certificate"))) { + throw new CertificateExpiredAuthenticationException( + "SSL certificate has expired for node " + node + ": " + message, sslException); + } + + // For other SSL authentication failures, throw PersistentAuthenticationException + throw new PersistentAuthenticationException( + "SSL authentication failed for node " + node + ": " + message, sslException); + } + + // For non-SSL authentication failures, also treat as persistent + throw new PersistentAuthenticationException( + "Authentication failed for node " + node + ": " + authException.getMessage(), authException); + } + } + } + // Functions below are for testing only @Override public String clientId() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 1f07bbcaa2f9a..bb0db93c92db1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -593,6 +593,15 @@ public void tryConnect(Node node) { } } + /** + * Get the authentication exception for a given node, if any. + * @param node the node to check + * @return an AuthenticationException iff authentication has failed, null otherwise + */ + public AuthenticationException authenticationException(Node node) { + return client.authenticationException(node); + } + private class RequestFutureCompletionHandler implements RequestCompletionHandler { private final RequestFuture future; private ClientResponse response; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CertificateExpiredAuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/CertificateExpiredAuthenticationException.java new file mode 100644 index 0000000000000..657ad28e8f1d6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/CertificateExpiredAuthenticationException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This exception indicates that SSL certificate has expired during authentication. + * This is a non-retriable error that requires certificate renewal or configuration update. + *

+ * Certificate expiration is a permanent authentication failure that clients should + * handle gracefully by failing fast and providing clear error messaging to allow + * operators to take corrective action. + *

+ */ +public class CertificateExpiredAuthenticationException extends SslAuthenticationException { + + private static final long serialVersionUID = 1L; + + public CertificateExpiredAuthenticationException(String message) { + super(message); + } + + public CertificateExpiredAuthenticationException(String message, Throwable cause) { + super(message, cause); + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/errors/PersistentAuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/PersistentAuthenticationException.java new file mode 100644 index 0000000000000..e0f6ab0b0b385 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/PersistentAuthenticationException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This exception indicates a persistent authentication failure that is unlikely to succeed on retry. + * This includes issues like expired certificates, invalid credentials, or configuration problems + * that require manual intervention. + *

+ * Unlike transient authentication failures that may succeed on retry, persistent failures + * indicate configuration or credential issues that need to be resolved before the client + * can successfully authenticate. + *

+ */ +public class PersistentAuthenticationException extends AuthenticationException { + + private static final long serialVersionUID = 1L; + + public PersistentAuthenticationException(String message) { + super(message); + } + + public PersistentAuthenticationException(String message, Throwable cause) { + super(message, cause); + } + +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index fca0a9ca2121b..ef94a54df680a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -174,6 +174,16 @@ public void createPendingAuthenticationError(Node node, long backoffMs) { pendingAuthenticationErrors.put(node, backoffMs); } + /** + * Set a specific authentication exception for a node. This is useful for testing + * specific authentication failure scenarios. + */ + public void setNodeAuthenticationFailure(Node node, AuthenticationException exception) { + pendingAuthenticationErrors.remove(node); + authenticationErrors.put(node, exception); + disconnect(node.idString()); + } + @Override public boolean connectionFailed(Node node) { return connectionState(node.idString()).isBackingOff(time.milliseconds()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index b5ab39e62c720..8b261441343a7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.SslAuthenticationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; @@ -411,6 +412,24 @@ public boolean ready(Node node, long now) { assertEquals(2, client.inFlightRequestCount(node.idString())); } + @Test + public void testAuthenticationExceptionExposure() { + // Test that ConsumerNetworkClient exposes authentication exceptions from underlying client + SslAuthenticationException sslException = new SslAuthenticationException("certificate expired"); + client.setNodeAuthenticationFailure(node, sslException); + + AuthenticationException result = consumerClient.authenticationException(node); + assertEquals(sslException, result); + assertTrue(result.getMessage().contains("certificate expired")); + } + + @Test + public void testAuthenticationExceptionNull() { + // Test that no authentication exception returns null + AuthenticationException result = consumerClient.authenticationException(node); + assertEquals(null, result); + } + private HeartbeatRequest.Builder heartbeat() { return new HeartbeatRequest.Builder(new HeartbeatRequestData() .setGroupId("group") diff --git a/clients/src/test/java/org/apache/kafka/common/errors/AuthenticationExceptionTypesTest.java b/clients/src/test/java/org/apache/kafka/common/errors/AuthenticationExceptionTypesTest.java new file mode 100644 index 0000000000000..f4c00d4f9ecb4 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/errors/AuthenticationExceptionTypesTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for the new authentication exception types added for KAFKA-10840. + */ +public class AuthenticationExceptionTypesTest { + + @Test + public void testCertificateExpiredAuthenticationException() { + String message = "SSL certificate has expired"; + CertificateExpiredAuthenticationException exception = + new CertificateExpiredAuthenticationException(message); + + assertEquals(message, exception.getMessage()); + assertInstanceOf(SslAuthenticationException.class, exception); + assertInstanceOf(AuthenticationException.class, exception); + } + + @Test + public void testCertificateExpiredAuthenticationExceptionWithCause() { + String message = "SSL certificate has expired"; + RuntimeException cause = new RuntimeException("Root cause"); + CertificateExpiredAuthenticationException exception = + new CertificateExpiredAuthenticationException(message, cause); + + assertEquals(message, exception.getMessage()); + assertEquals(cause, exception.getCause()); + assertInstanceOf(SslAuthenticationException.class, exception); + } + + @Test + public void testPersistentAuthenticationException() { + String message = "Authentication failed persistently"; + PersistentAuthenticationException exception = + new PersistentAuthenticationException(message); + + assertEquals(message, exception.getMessage()); + assertInstanceOf(AuthenticationException.class, exception); + } + + @Test + public void testPersistentAuthenticationExceptionWithCause() { + String message = "Authentication failed persistently"; + RuntimeException cause = new RuntimeException("Root cause"); + PersistentAuthenticationException exception = + new PersistentAuthenticationException(message, cause); + + assertEquals(message, exception.getMessage()); + assertEquals(cause, exception.getCause()); + assertInstanceOf(AuthenticationException.class, exception); + } + + @Test + public void testExceptionHierarchy() { + CertificateExpiredAuthenticationException certException = + new CertificateExpiredAuthenticationException("cert expired"); + PersistentAuthenticationException persistentException = + new PersistentAuthenticationException("auth failed"); + + // Verify inheritance hierarchy + assertTrue(certException instanceof SslAuthenticationException); + assertTrue(certException instanceof AuthenticationException); + assertTrue(persistentException instanceof AuthenticationException); + } + + @Test + public void testExceptionSerialVersionUID() { + // Verify that serialVersionUID is set to maintain compatibility + CertificateExpiredAuthenticationException certException = + new CertificateExpiredAuthenticationException("test"); + PersistentAuthenticationException persistentException = + new PersistentAuthenticationException("test"); + + // These exceptions should be serializable (extends Exception/RuntimeException) + assertInstanceOf(Exception.class, certException); + assertInstanceOf(Exception.class, persistentException); + } +} \ No newline at end of file