KAFKA-10840: Expose authentication failures in KafkaConsumer.poll() #20212
+270
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
This PR implements KAFKA-10840 to expose authentication failures in the KafkaConsumer.poll() method, allowing applications to catch authentication issues immediately instead of experiencing silent failures.
Problem
Previously, when SSL certificates expired or other authentication issues occurred, the consumer would stop fetching data without clear indication of the underlying problem. This led to "data flow stops without indication" scenarios that were difficult to troubleshoot and handle gracefully.
Solution
New Exception Classes
Core Changes
ClassicKafkaConsumer.poll()
to actively check all known cluster nodes for authentication exceptions before proceeding with fetch operationsauthenticationException(Node)
method toConsumerNetworkClient
to expose authentication state from the underlyingKafkaClient
MockClient
withsetNodeAuthenticationFailure()
method for testing authentication failure scenariosError Detection Logic
The implementation detects certificate expiration by checking for specific patterns in SSL exception messages:
Usage Example
Test Plan
Files Changed
clients/src/main/java/org/apache/kafka/common/errors/CertificateExpiredAuthenticationException.java
(NEW)clients/src/main/java/org/apache/kafka/common/errors/PersistentAuthenticationException.java
(NEW)clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
(MODIFIED)clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
(MODIFIED)clients/src/test/java/org/apache/kafka/clients/MockClient.java
(MODIFIED)This addresses the core issue where "data flow stops without indication" when authentication fails, enabling applications to detect and handle these failures proactively rather than experiencing silent timeouts.