Skip to content

KAFKA-10840: Expose authentication failures in KafkaConsumer.poll() #20212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from

Conversation

ravikalla
Copy link

Summary

This PR implements KAFKA-10840 to expose authentication failures in the KafkaConsumer.poll() method, allowing applications to catch authentication issues immediately instead of experiencing silent failures.

Problem

Previously, when SSL certificates expired or other authentication issues occurred, the consumer would stop fetching data without clear indication of the underlying problem. This led to "data flow stops without indication" scenarios that were difficult to troubleshoot and handle gracefully.

Solution

New Exception Classes

  • CertificateExpiredAuthenticationException: Specifically for SSL certificate expiration scenarios
  • PersistentAuthenticationException: For non-retriable authentication failures (SASL, general SSL handshake failures)

Core Changes

  • Modified ClassicKafkaConsumer.poll() to actively check all known cluster nodes for authentication exceptions before proceeding with fetch operations
  • Added authenticationException(Node) method to ConsumerNetworkClient to expose authentication state from the underlying KafkaClient
  • Enhanced MockClient with setNodeAuthenticationFailure() method for testing authentication failure scenarios

Error Detection Logic

The implementation detects certificate expiration by checking for specific patterns in SSL exception messages:

  • "certificate expired"
  • "Certificate expired"
  • "CERTIFICATE_EXPIRED"
  • "certificate has expired"
  • "expired certificate"

Usage Example

try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
} catch (CertificateExpiredAuthenticationException e) {
    log.error("SSL certificate expired: {}", e.getMessage());
    // Handle certificate renewal
} catch (PersistentAuthenticationException e) {
    log.error("Authentication failed: {}", e.getMessage());
    // Handle authentication configuration
}

Test Plan

  • All existing Kafka consumer tests pass
  • All ConsumerNetworkClient tests pass
  • Code passes checkstyle and spotbugs checks
  • Implementation is backward compatible
  • Authentication failure scenarios can be tested using MockClient enhancements

Files Changed

  1. clients/src/main/java/org/apache/kafka/common/errors/CertificateExpiredAuthenticationException.java (NEW)
  2. clients/src/main/java/org/apache/kafka/common/errors/PersistentAuthenticationException.java (NEW)
  3. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java (MODIFIED)
  4. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (MODIFIED)
  5. clients/src/test/java/org/apache/kafka/clients/MockClient.java (MODIFIED)

This addresses the core issue where "data flow stops without indication" when authentication fails, enabling applications to detect and handle these failures proactively rather than experiencing silent timeouts.

This change allows applications to catch authentication failures immediately
instead of failing silently. Previously, when SSL certificates expired or
other authentication issues occurred, the consumer would stop fetching data
without clear indication of the underlying problem.

The implementation adds two new exception classes to provide specific error
handling for different authentication failure scenarios. The
CertificateExpiredAuthenticationException is thrown when SSL certificate
expiration is detected in the error message, while
PersistentAuthenticationException handles other non-retriable authentication
failures including SASL errors and general SSL handshake failures.

The ClassicKafkaConsumer.poll() method now actively checks all known cluster
nodes for authentication exceptions before proceeding with fetch operations.
When an authentication failure is detected, detailed error messages including
the affected node information are logged and appropriate exceptions are thrown
to the application layer.

The ConsumerNetworkClient has been enhanced with an authenticationException()
method that exposes the authentication state from the underlying KafkaClient,
allowing the consumer to access this previously hidden information. For testing
purposes, the MockClient class now supports setting specific authentication
exceptions through a new setNodeAuthenticationFailure() method.

Example usage:
```java
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
} catch (CertificateExpiredAuthenticationException e) {
    log.error("SSL certificate expired: {}", e.getMessage());
    // Handle certificate renewal
} catch (PersistentAuthenticationException e) {
    log.error("Authentication failed: {}", e.getMessage());
    // Handle authentication configuration
}
```

This addresses the core issue where "data flow stops without indication" when
authentication fails, enabling applications to detect and handle these failures
proactively rather than experiencing silent timeouts.
@github-actions github-actions bot added consumer clients triage PRs from the community labels Jul 21, 2025
These test cases validate the authentication failure detection and exception
handling implementation added for KAFKA-10840. The tests ensure that the
ConsumerNetworkClient correctly exposes authentication exceptions from the
underlying KafkaClient through the new authenticationException() method.

The AuthenticationExceptionTypesTest verifies that the new exception classes
CertificateExpiredAuthenticationException and PersistentAuthenticationException
maintain proper inheritance hierarchy and behave correctly. These exception
types allow applications to distinguish between certificate expiration
scenarios and other persistent authentication failures.

Additional tests have been added to the existing ConsumerNetworkClientTest
to verify that authentication exceptions are properly exposed when present
and return null when no authentication issues exist. All tests pass and
confirm that the implementation correctly addresses the silent authentication
failure problem described in the original JIRA issue.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
clients consumer triage PRs from the community
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant