 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
+import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
+import org.slf4j.event.Level;
 
 import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 import static org.apache.kafka.common.utils.Utils.isBlank;
 import static org.apache.kafka.common.utils.Utils.join;
+import static org.apache.kafka.common.utils.Utils.swallow;
 
 /**
  * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process
@@ -245,7 +252,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
 
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
-    private Optional<ConsumerGroupMetadata> groupMetadata;
+    private Optional<ConsumerGroupMetadata> groupMetadata = Optional.empty();
     private final KafkaConsumerMetrics kafkaConsumerMetrics;
     private Logger log;
     private final String clientId;
@@ -268,6 +275,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
     private final Metrics metrics;
     private final long retryBackoffMs;
     private final int defaultApiTimeoutMs;
+    private final boolean autoCommitEnabled;
     private volatile boolean closed = false;
     private final List<ConsumerPartitionAssignor> assignors;
     private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
@@ -313,6 +321,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
             GroupRebalanceConfig.ProtocolType.CONSUMER
         );
         this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+        this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         LogContext logContext = createLogContext(config, groupRebalanceConfig);
         this.log = logContext.logger(getClass());
 
@@ -434,6 +443,51 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
     }
 
     // Visible for testing
+    AsyncKafkaConsumer(LogContext logContext,
+                       String clientId,
+                       Deserializers<K, V> deserializers,
+                       FetchBuffer fetchBuffer,
+                       FetchCollector<K, V> fetchCollector,
+                       ConsumerInterceptors<K, V> interceptors,
+                       Time time,
+                       ApplicationEventHandler applicationEventHandler,
+                       BlockingQueue<BackgroundEvent> backgroundEventQueue,
+                       ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
+                       Metrics metrics,
+                       SubscriptionState subscriptions,
+                       ConsumerMetadata metadata,
+                       long retryBackoffMs,
+                       int defaultApiTimeoutMs,
+                       List<ConsumerPartitionAssignor> assignors,
+                       String groupId,
+                       boolean autoCommitEnabled) {
+        this.log = logContext.logger(getClass());
+        this.subscriptions = subscriptions;
+        this.clientId = clientId;
+        this.fetchBuffer = fetchBuffer;
+        this.fetchCollector = fetchCollector;
+        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
+        this.interceptors = Objects.requireNonNull(interceptors);
+        this.time = time;
+        this.backgroundEventProcessor = new BackgroundEventProcessor(
+            logContext,
+            backgroundEventQueue,
+            applicationEventHandler,
+            rebalanceListenerInvoker
+        );
+        this.metrics = metrics;
+        this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty());
+        this.metadata = metadata;
+        this.retryBackoffMs = retryBackoffMs;
+        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
+        this.deserializers = deserializers;
+        this.applicationEventHandler = applicationEventHandler;
+        this.assignors = assignors;
+        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
+        this.clientTelemetryReporter = Optional.empty();
+        this.autoCommitEnabled = autoCommitEnabled;
+    }
+
     AsyncKafkaConsumer(LogContext logContext,
                        Time time,
                        ConsumerConfig config,
@@ -446,6 +500,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+        this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         this.fetchBuffer = new FetchBuffer(logContext);
         this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
         this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
@@ -1159,15 +1214,12 @@ private void close(Duration timeout, boolean swallowException) {
         final Timer closeTimer = time.timer(timeout);
         clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
         closeTimer.update();
-
+        // Prepare shutting down the network thread
+        prepareShutdown(closeTimer, firstException);
+        closeTimer.update();
         if (applicationEventHandler != null)
-            closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);
-
-        // Invoke all callbacks after the background thread exists in case if there are unsent async
-        // commits
-        maybeInvokeCommitCallbacks();
-
-        closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
+            closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
+        closeTimer.update();
         closeQuietly(interceptors, "consumer interceptors", firstException);
         closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
         closeQuietly(metrics, "consumer metrics", firstException);
@@ -1185,6 +1237,74 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. auto-commit offsets
+     * 2. revoke all partitions
+     * 3. if partition revocation completes successfully, send the leave-group heartbeat
+     * 4. invoke all async commit callbacks, if there are any
+     */
+    void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
+        if (!groupMetadata.isPresent())
+            return;
+        maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
+        applicationEventHandler.add(new CommitOnCloseApplicationEvent());
+        completeQuietly(
+            () -> {
+                maybeRevokePartitions();
+                applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer);
+            },
+            "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
+        swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", this::maybeInvokeCommitCallbacks,
+            firstException);
+    }
+
+    // Visible for testing
+    void maybeAutoCommitSync(final boolean shouldAutoCommit,
+                             final Timer timer,
+                             final AtomicReference<Throwable> firstException) {
+        if (!shouldAutoCommit)
+            return;
+        Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
+        log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+        try {
+            commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+        } catch (Exception e) {
+            // consistent with async auto-commit failures, we do not propagate the exception
+            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
+        }
+        timer.update();
+    }
+
+    // Visible for testing
+    void maybeRevokePartitions() {
+        if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+            return;
+        try {
+            SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+            droppedPartitions.addAll(subscriptions.assignedPartitions());
+            if (subscriptions.rebalanceListener().isPresent())
+                subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions);
+        } catch (Exception e) {
+            throw new KafkaException(e);
+        } finally {
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+        }
+    }
+
+    // Visible for testing
+    void completeQuietly(final Utils.ThrowingRunnable function,
+                         final String msg,
+                         final AtomicReference<Throwable> firstException) {
+        try {
+            function.run();
+        } catch (TimeoutException e) {
+            log.debug("Timeout expired before the {} operation could complete.", msg);
+        } catch (Exception e) {
+            firstException.compareAndSet(null, e);
+        }
+    }
+
     @Override
     public void wakeup() {
         wakeupTrigger.wakeup();
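
For orientation, a minimal caller-side sketch (not part of this patch) of the close path the change affects: with a group id and enable.auto.commit=true, closing the consumer is expected to run prepareShutdown() first (synchronous auto-commit, onPartitionsRevoked callback, leave-group heartbeat) before the network thread is shut down. The broker address, group id, and topic below are placeholders, and the group.protocol setting is an assumption about how this consumer implementation is selected in this branch.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CloseSequenceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // placeholder group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");            // exercises maybeAutoCommitSync() on close
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put("group.protocol", "consumer");                                // assumption: KIP-848 config selecting the new consumer

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));        // placeholder topic
            consumer.poll(Duration.ofMillis(100));
        } // try-with-resources calls close(), which bounds the shutdown sequence by the default close timeout
    }
}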