diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index ebdc000044052..0c9a561db5711 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -30,6 +30,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata} +import java.util import scala.collection._ import scala.jdk.CollectionConverters._ @@ -39,7 +40,7 @@ import scala.jdk.CollectionConverters._ */ class DelayedFetch( params: FetchParams, - fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], + fetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus], replicaManager: ReplicaManager, quota: ReplicaQuota, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit @@ -66,79 +67,78 @@ class DelayedFetch( */ override def tryComplete(): Boolean = { var accumulatedSize = 0 - fetchPartitionStatus.foreach { - case (topicIdPartition, fetchStatus) => - val fetchOffset = fetchStatus.startOffsetMetadata - val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch - try { - if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) { - val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition) - val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader) - - val endOffset = params.isolation match { - case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset - case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark - case FetchIsolation.TXN_COMMITTED => offsetSnapshot.lastStableOffset - } + fetchPartitionStatus.forEach { (topicIdPartition, fetchStatus) => + val fetchOffset = fetchStatus.startOffsetMetadata + val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch + try { + if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) { + val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition) + val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader) + + val endOffset = params.isolation match { + case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset + case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark + case FetchIsolation.TXN_COMMITTED => offsetSnapshot.lastStableOffset + } - // Go directly to the check for Case G if the message offsets are the same. If the log segment - // has just rolled, then the high watermark offset will remain the same but be on the old segment, - // which would incorrectly be seen as an instance of Case F. - if (fetchOffset.messageOffset > endOffset.messageOffset) { - // Case F, this can happen when the new fetch operation is on a truncated leader - debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") - return forceComplete() - } else if (fetchOffset.messageOffset < endOffset.messageOffset) { - if (fetchOffset.onOlderSegment(endOffset)) { - // Case F, this can happen when the fetch operation is falling behind the current segment - // or the partition has just rolled a new segment - debug(s"Satisfying fetch $this immediately since it is fetching older segments.") - // We will not force complete the fetch request if a replica should be throttled. 
- if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) - return forceComplete() - } else if (fetchOffset.onSameSegment(endOffset)) { - // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) - val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) - if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) - accumulatedSize += bytesAvailable - } + // Go directly to the check for Case G if the message offsets are the same. If the log segment + // has just rolled, then the high watermark offset will remain the same but be on the old segment, + // which would incorrectly be seen as an instance of Case F. + if (fetchOffset.messageOffset > endOffset.messageOffset) { + // Case F, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return forceComplete() + } else if (fetchOffset.messageOffset < endOffset.messageOffset) { + if (fetchOffset.onOlderSegment(endOffset)) { + // Case F, this can happen when the fetch operation is falling behind the current segment + // or the partition has just rolled a new segment + debug(s"Satisfying fetch $this immediately since it is fetching older segments.") + // We will not force complete the fetch request if a replica should be throttled. + if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) + return forceComplete() + } else if (fetchOffset.onSameSegment(endOffset)) { + // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) + val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) + if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) + accumulatedSize += bytesAvailable } + } - // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation - fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch => - val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false) - if (epochEndOffset.errorCode != Errors.NONE.code() - || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET - || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) { - debug(s"Could not obtain last offset for leader epoch for partition $topicIdPartition, epochEndOffset=$epochEndOffset.") - return forceComplete() - } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) { - debug(s"Satisfying fetch $this since it has diverging epoch requiring truncation for partition " + - s"$topicIdPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.") - return forceComplete() - } + // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation + fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch => + val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false) + if (epochEndOffset.errorCode != Errors.NONE.code() + || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET + || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) { + debug(s"Could not obtain last offset for 
leader epoch for partition $topicIdPartition, epochEndOffset=$epochEndOffset.") + return forceComplete() + } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) { + debug(s"Satisfying fetch $this since it has diverging epoch requiring truncation for partition " + + s"$topicIdPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.") + return forceComplete() } } - } catch { - case _: NotLeaderOrFollowerException => // Case A or Case B - debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $this immediately") - return forceComplete() - case _: UnknownTopicOrPartitionException => // Case C - debug(s"Broker no longer knows of partition $topicIdPartition, satisfy $this immediately") - return forceComplete() - case _: KafkaStorageException => // Case D - debug(s"Partition $topicIdPartition is in an offline log directory, satisfy $this immediately") - return forceComplete() - case _: FencedLeaderEpochException => // Case E - debug(s"Broker is the leader of partition $topicIdPartition, but the requested epoch " + - s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $this immediately") - return forceComplete() } + } catch { + case _: NotLeaderOrFollowerException => // Case A or Case B + debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $this immediately") + return forceComplete() + case _: UnknownTopicOrPartitionException => // Case C + debug(s"Broker no longer knows of partition $topicIdPartition, satisfy $this immediately") + return forceComplete() + case _: KafkaStorageException => // Case D + debug(s"Partition $topicIdPartition is in an offline log directory, satisfy $this immediately") + return forceComplete() + case _: FencedLeaderEpochException => // Case E + debug(s"Broker is the leader of partition $topicIdPartition, but the requested epoch " + + s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $this immediately") + return forceComplete() + } } // Case G if (accumulatedSize >= params.minBytes) - forceComplete() + forceComplete() else false } @@ -154,9 +154,9 @@ class DelayedFetch( * Upon completion, read whatever data is available and pass to the complete callback */ override def onComplete(): Unit = { - val fetchInfos = fetchPartitionStatus.map { case (tp, status) => + val fetchInfos = fetchPartitionStatus.asScala.iterator.map { case (tp, status) => tp -> status.fetchInfo - } + }.toBuffer val logReadResults = replicaManager.readFromLog( params, diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a6d0cf26b03a4..1b3566ea3da85 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1628,7 +1628,7 @@ class ReplicaManager(val config: KafkaConfig, params: FetchParams, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, logReadResults: util.LinkedHashMap[TopicIdPartition, LogReadResult], - fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = { + fetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]): Unit = { val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]] val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]] @@ -1643,7 +1643,7 @@ class ReplicaManager(val config: KafkaConfig, remoteFetchResults, 
remoteFetchInfos, remoteFetchMaxWaitMs, - fetchPartitionStatus.toMap.asJava, + fetchPartitionStatus, params, logReadResults, tp => getPartitionOrException(tp), @@ -1710,17 +1710,17 @@ class ReplicaManager(val config: KafkaConfig, responseCallback(fetchPartitionData) } else { // construct the fetch results from the read results - val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)] + val fetchPartitionStatus = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] fetchInfos.foreach { case (topicIdPartition, partitionData) => val logReadResult = logReadResultMap.get(topicIdPartition) if (logReadResult != null) { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata - fetchPartitionStatus += (topicIdPartition -> new FetchPartitionStatus(logOffsetMetadata, partitionData)) + fetchPartitionStatus.put(topicIdPartition, new FetchPartitionStatus(logOffsetMetadata, partitionData)) } } if (!remoteFetchInfos.isEmpty) { - processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus.toSeq) + processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus) } else { // If there is not enough data to respond and there is no remote data, we will let the fetch request // wait for new data. @@ -1733,12 +1733,15 @@ class ReplicaManager(val config: KafkaConfig, ) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList + val delayedFetchKeys = fetchPartitionStatus.keySet() + .stream() + .map(new TopicPartitionOperationKey(_)) + .toList() // try to complete the request immediately, otherwise put it into the purgatory; // this is because while the delayed fetch operation is being created, new requests // may arrive and hence make this operation completable. 
- delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys.asJava) + delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) } } } diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index fa3b8465d651f..5a3e5e7959b9b 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -34,6 +34,8 @@ import org.junit.jupiter.params.provider.ValueSource import org.mockito.ArgumentMatchers.{any, anyInt} import org.mockito.Mockito.{mock, when} +import java.util + class DelayedFetchTest { private val maxBytes = 1024 private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) @@ -59,7 +61,7 @@ class DelayedFetchTest { val delayedFetch = new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, fetchStatus), replicaManager = replicaManager, quota = replicaQuota, responseCallback = callback @@ -105,7 +107,7 @@ class DelayedFetchTest { val delayedFetch = new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, fetchStatus), replicaManager = replicaManager, quota = replicaQuota, responseCallback = callback @@ -145,7 +147,7 @@ class DelayedFetchTest { val delayedFetch = new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, fetchStatus), replicaManager = replicaManager, quota = replicaQuota, responseCallback = callback @@ -196,7 +198,7 @@ class DelayedFetchTest { val delayedFetch = new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, fetchStatus), replicaManager = replicaManager, quota = replicaQuota, responseCallback = callback @@ -267,4 +269,9 @@ class DelayedFetchTest { error) } + private def createFetchPartitionStatusMap(tpId: TopicIdPartition, status: FetchPartitionStatus): util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] = { + val statusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] + statusMap.put(tpId, status) + statusMap + } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 307afad4f5f45..93340bfb3b2ef 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -40,6 +40,7 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong} import org.mockito.Mockito.{mock, when} import org.mockito.{AdditionalMatchers, ArgumentMatchers} +import java.util import scala.jdk.CollectionConverters._ class ReplicaManagerQuotasTest { @@ -186,7 +187,7 @@ class ReplicaManagerQuotasTest { new DelayedFetch( params = fetchParams, - fetchPartitionStatus = Seq(tp -> fetchPartitionStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(tp, fetchPartitionStatus), replicaManager = replicaManager, quota = null, responseCallback = null @@ -237,7 +238,7 @@ class ReplicaManagerQuotasTest { new DelayedFetch( params = fetchParams, - fetchPartitionStatus = 
Seq(tidp -> fetchPartitionStatus), + fetchPartitionStatus = createFetchPartitionStatusMap(tidp, fetchPartitionStatus), replicaManager = replicaManager, quota = null, responseCallback = null @@ -341,4 +342,9 @@ class ReplicaManagerQuotasTest { quota } + private def createFetchPartitionStatusMap(tpId: TopicIdPartition, status: FetchPartitionStatus): util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] = { + val statusMap = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] + statusMap.put(tpId, status) + statusMap + } }
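
For illustration, a minimal self-contained sketch of the java.util.LinkedHashMap pattern this patch adopts for fetchPartitionStatus: insertion order is preserved (matching the order of the Seq of pairs it replaces), the Scala side iterates via asScala as in the patched DelayedFetch.onComplete, and the delayed-operation keys are derived with a Java Stream as in the patched ReplicaManager. All names below (LinkedHashMapFetchStatusSketch, PartitionKey, PartitionStatus) are hypothetical stand-ins for the real TopicIdPartition and FetchPartitionStatus, used only so the snippet compiles without the Kafka classpath.

import java.util
import scala.jdk.CollectionConverters._

object LinkedHashMapFetchStatusSketch {
  // Hypothetical stand-ins for TopicIdPartition and FetchPartitionStatus.
  final case class PartitionKey(topic: String, partition: Int)
  final case class PartitionStatus(fetchOffset: Long)

  def main(args: Array[String]): Unit = {
    // Insertion order is preserved, matching the order of the original Seq of pairs.
    val fetchPartitionStatus = new util.LinkedHashMap[PartitionKey, PartitionStatus]
    fetchPartitionStatus.put(PartitionKey("topicA", 0), PartitionStatus(100L))
    fetchPartitionStatus.put(PartitionKey("topicB", 1), PartitionStatus(200L))

    // Same shape as the patched DelayedFetch.onComplete: wrap the Java map,
    // iterate lazily, and materialize a Scala Buffer of pairs in insertion order.
    val fetchInfos = fetchPartitionStatus.asScala.iterator.map { case (tp, status) =>
      tp -> status.fetchOffset
    }.toBuffer
    println(fetchInfos)

    // Same shape as the patched ReplicaManager key derivation: a Java Stream over
    // the key set, collected into an immutable java.util.List (Stream.toList needs Java 16+).
    val delayedFetchKeys = fetchPartitionStatus.keySet()
      .stream()
      .map(k => s"${k.topic}-${k.partition}")
      .toList()
    println(delayedFetchKeys)
  }
}

The LinkedHashMap keeps per-partition ordering end to end while dropping the toMap.asJava and toSeq conversions the old Seq-based signature required at the purgatory boundary.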