@@ -30,6 +30,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
 import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata}
 
+import java.util
 import scala.collection._
 import scala.jdk.CollectionConverters._
 
@@ -39,7 +40,7 @@ import scala.jdk.CollectionConverters._
  */
 class DelayedFetch(
   params: FetchParams,
-  fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+  fetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus],
   replicaManager: ReplicaManager,
   quota: ReplicaQuota,
   responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
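
For context, here is a minimal sketch (not part of this diff) of how a caller could build the insertion-ordered map the new constructor expects; `toStatusMap` and `partitionStatusSeq` are illustrative names, and the Kafka classes are assumed to be on the classpath:

```scala
import java.util
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.storage.internals.log.FetchPartitionStatus

// Hypothetical helper: copy an ordered Scala collection of (partition, status)
// pairs into the java.util.LinkedHashMap the new constructor takes.
// LinkedHashMap iterates in insertion order, so the per-partition ordering of
// the old Seq-based parameter is preserved.
def toStatusMap(
  partitionStatusSeq: Seq[(TopicIdPartition, FetchPartitionStatus)]
): util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus] = {
  val statusByPartition = new util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]()
  partitionStatusSeq.foreach { case (tp, status) => statusByPartition.put(tp, status) }
  statusByPartition
}
```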
@@ -66,79 +67,78 @@ class DelayedFetch(
    */
   override def tryComplete(): Boolean = {
     var accumulatedSize = 0
-    fetchPartitionStatus.foreach {
-      case (topicIdPartition, fetchStatus) =>
-        val fetchOffset = fetchStatus.startOffsetMetadata
-        val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
-        try {
-          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
-            val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
-            val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)
-
-            val endOffset = params.isolation match {
-              case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
-              case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark
-              case FetchIsolation.TXN_COMMITTED => offsetSnapshot.lastStableOffset
-            }
+    fetchPartitionStatus.forEach { (topicIdPartition, fetchStatus) =>
+      val fetchOffset = fetchStatus.startOffsetMetadata
+      val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
+      try {
+        if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+          val partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
+          val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)
+
+          val endOffset = params.isolation match {
+            case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
+            case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark
+            case FetchIsolation.TXN_COMMITTED => offsetSnapshot.lastStableOffset
+          }
 
-            // Go directly to the check for Case G if the message offsets are the same. If the log segment
-            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
-            // which would incorrectly be seen as an instance of Case F.
-            if (fetchOffset.messageOffset > endOffset.messageOffset) {
-              // Case F, this can happen when the new fetch operation is on a truncated leader
-              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-              return forceComplete()
-            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
-              if (fetchOffset.onOlderSegment(endOffset)) {
-                // Case F, this can happen when the fetch operation is falling behind the current segment
-                // or the partition has just rolled a new segment
-                debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
-                // We will not force complete the fetch request if a replica should be throttled.
-                if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
-                  return forceComplete()
-              } else if (fetchOffset.onSameSegment(endOffset)) {
-                // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
-                val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
-                if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
-                  accumulatedSize += bytesAvailable
-              }
+          // Go directly to the check for Case G if the message offsets are the same. If the log segment
+          // has just rolled, then the high watermark offset will remain the same but be on the old segment,
+          // which would incorrectly be seen as an instance of Case F.
+          if (fetchOffset.messageOffset > endOffset.messageOffset) {
+            // Case F, this can happen when the new fetch operation is on a truncated leader
+            debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+            return forceComplete()
+          } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+            if (fetchOffset.onOlderSegment(endOffset)) {
+              // Case F, this can happen when the fetch operation is falling behind the current segment
+              // or the partition has just rolled a new segment
+              debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
+              // We will not force complete the fetch request if a replica should be throttled.
+              if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
+                return forceComplete()
+            } else if (fetchOffset.onSameSegment(endOffset)) {
+              // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
+              val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
+              if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
+                accumulatedSize += bytesAvailable
             }
+          }
 
-            // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation
-            fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
-              val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
-              if (epochEndOffset.errorCode != Errors.NONE.code()
-                || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
-                || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
-                debug(s"Could not obtain last offset for leader epoch for partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
-                return forceComplete()
-              } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
-                debug(s"Satisfying fetch $this since it has diverging epoch requiring truncation for partition " +
-                  s"$topicIdPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
-                return forceComplete()
-              }
+          // Case H: If truncation has caused diverging epoch while this request was in purgatory, return to trigger truncation
+          fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
+            val epochEndOffset = partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, fetchOnlyFromLeader = false)
+            if (epochEndOffset.errorCode != Errors.NONE.code()
+              || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
+              || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
+              debug(s"Could not obtain last offset for leader epoch for partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
+              return forceComplete()
+            } else if (epochEndOffset.leaderEpoch < fetchEpoch || epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
+              debug(s"Satisfying fetch $this since it has diverging epoch requiring truncation for partition " +
+                s"$topicIdPartition epochEndOffset=$epochEndOffset fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
+              return forceComplete()
             }
           }
-        } catch {
-          case _: NotLeaderOrFollowerException => // Case A or Case B
-            debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $this immediately")
-            return forceComplete()
-          case _: UnknownTopicOrPartitionException => // Case C
-            debug(s"Broker no longer knows of partition $topicIdPartition, satisfy $this immediately")
-            return forceComplete()
-          case _: KafkaStorageException => // Case D
-            debug(s"Partition $topicIdPartition is in an offline log directory, satisfy $this immediately")
-            return forceComplete()
-          case _: FencedLeaderEpochException => // Case E
-            debug(s"Broker is the leader of partition $topicIdPartition, but the requested epoch " +
-              s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $this immediately")
-            return forceComplete()
         }
+      } catch {
+        case _: NotLeaderOrFollowerException => // Case A or Case B
+          debug(s"Broker is no longer the leader or follower of $topicIdPartition, satisfy $this immediately")
+          return forceComplete()
+        case _: UnknownTopicOrPartitionException => // Case C
+          debug(s"Broker no longer knows of partition $topicIdPartition, satisfy $this immediately")
+          return forceComplete()
+        case _: KafkaStorageException => // Case D
+          debug(s"Partition $topicIdPartition is in an offline log directory, satisfy $this immediately")
+          return forceComplete()
+        case _: FencedLeaderEpochException => // Case E
+          debug(s"Broker is the leader of partition $topicIdPartition, but the requested epoch " +
+            s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $this immediately")
+          return forceComplete()
+      }
     }
 
     // Case G
     if (accumulatedSize >= params.minBytes)
-      forceComplete()
+      forceComplete()
     else
       false
   }
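
The rewritten loop above relies on Scala's SAM conversion: a two-argument Scala lambda is adapted to the `java.util.function.BiConsumer` that `java.util.Map#forEach` expects, and a `LinkedHashMap` visits entries in insertion order. A self-contained sketch of that pattern with toy `String`/`Integer` types (illustrative only, not Kafka code):

```scala
import java.util

// Toy stand-in for the partition -> status map, just to show the iteration
// pattern the new tryComplete uses.
val sizes = new util.LinkedHashMap[String, Integer]()
sizes.put("topicA-0", 10)
sizes.put("topicB-1", 20)

var accumulated = 0
// The two-argument lambda is converted to a BiConsumer; entries are visited
// in insertion order: "topicA-0" first, then "topicB-1".
sizes.forEach { (partition, bytes) =>
  accumulated += bytes.intValue
}
println(accumulated) // 30
```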
@@ -154,9 +154,9 @@ class DelayedFetch(
    * Upon completion, read whatever data is available and pass to the complete callback
    */
   override def onComplete(): Unit = {
-    val fetchInfos = fetchPartitionStatus.map { case (tp, status) =>
+    val fetchInfos = fetchPartitionStatus.asScala.iterator.map { case (tp, status) =>
       tp -> status.fetchInfo
-    }
+    }.toBuffer
 
     val logReadResults = replicaManager.readFromLog(
       params,
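
The `onComplete` change swaps the direct `Seq#map` for a `scala.jdk.CollectionConverters` view of the Java map. A small self-contained sketch of the same conversion (generic names, not Kafka's), assuming Scala 2.13:

```scala
import java.util
import scala.jdk.CollectionConverters._

// Toy stand-in for the TopicIdPartition -> FetchPartitionStatus map.
val statuses = new util.LinkedHashMap[String, Long]()
statuses.put("topicA-0", 100L)
statuses.put("topicB-1", 200L)

// asScala wraps the Java map without copying; iterating the wrapper follows the
// LinkedHashMap's insertion order, and toBuffer materialises the pairs once so
// they can be handed on as a Scala collection.
val fetchInfos = statuses.asScala.iterator.map { case (tp, offset) =>
  tp -> offset
}.toBuffer
// fetchInfos: Buffer((topicA-0,100), (topicB-1,200)) -- order preserved
```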