@DL1231 (Contributor) commented Oct 25, 2025

see #19876 (comment)
Initialize fetchPartitionStatus as a Map type to reduce unnecessary
collection conversions.

@github-actions bot added the triage (PRs from the community), core (Kafka Broker), and small (Small PRs) labels Oct 25, 2025

+ val fetchInfos = fetchPartitionStatus.asScala.map { case (tp, status) =>
    tp -> status.fetchInfo
- }
+ }.toSeq
Member

Is it possible to have duplicate TPs in this "fetch" path?

Member

Also, we are doing two conversions here. You can avoid one by using a view or iterator after asScala, and by using toBuffer instead of toSeq (toSeq can create a lazy collection, which can cause problems).

Contributor Author

fetchPartitionStatus is built from the result of fetchData():

public Map<TopicIdPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) {
    final LinkedHashMap<TopicIdPartition, PartitionData> fetchData = new LinkedHashMap<>();
    final short version = version();
    data.topics().forEach(fetchTopic -> {
        String name;
        if (version < 13) {
            name = fetchTopic.topic(); // can't be null
        } else {
            name = topicNames.get(fetchTopic.topicId());
        }
        fetchTopic.partitions().forEach(fetchPartition ->
            // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
            fetchData.put(new TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, fetchPartition.partition())),
                new PartitionData(
                    fetchTopic.topicId(),
                    fetchPartition.fetchOffset(),
                    fetchPartition.logStartOffset(),
                    fetchPartition.partitionMaxBytes(),
                    optionalEpoch(fetchPartition.currentLeaderEpoch()),
                    optionalEpoch(fetchPartition.lastFetchedEpoch())
                )
            )
        );
    });
    return fetchData;
}

It returns a LinkedHashMap<TopicIdPartition, PartitionData>, and map keys are unique, so there will be no duplicate TPs.
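
To illustrate the point about duplicates, here is a minimal sketch of how a LinkedHashMap behaves when the same key is put twice. The record Tp and the method collect are hypothetical stand-ins for TopicIdPartition and the put loop in fetchData(); they are not Kafka code, and the Long values merely stand in for PartitionData.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FetchDataDedupSketch {
    // Hypothetical stand-in for TopicIdPartition; records get value-based equals/hashCode.
    record Tp(String topic, int partition) {}

    // Mimics the put-per-partition loop: a repeated key overwrites the earlier value
    // but keeps the position of its first insertion.
    static Map<Tp, Long> collect(List<Tp> requested, List<Long> offsets) {
        Map<Tp, Long> out = new LinkedHashMap<>();
        for (int i = 0; i < requested.size(); i++) {
            out.put(requested.get(i), offsets.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // "a-0" is requested twice with different offsets.
        Map<Tp, Long> result = collect(
            List.of(new Tp("a", 0), new Tp("b", 0), new Tp("a", 0)),
            List.of(10L, 20L, 30L));
        System.out.println(result.size());
        System.out.println(result);
    }
}
```

With this input the map ends up with two entries: a-0 stays first in iteration order and holds the later offset 30, and b-0 follows with 20. So a repeated partition in a request would be collapsed by the map rather than surfacing as a duplicate downstream.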


  // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
+ val delayedFetchKeys = fetchPartitionStatus.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
Member

Why do we convert to a Scala List if we convert to a Java collection right after? We should probably use Java's Stream here and avoid the Scala collections altogether.

Contributor Author

Thanks for catching this! I've fixed it. PTAL when you get a chance.

@github-actions bot removed the triage (PRs from the community) label Oct 26, 2025
@github-actions bot removed the small (Small PRs) label Oct 31, 2025
@kamalcph (Contributor) left a comment

LGTM

@chia7712 (Member)

@DL1231 please rebase the code to include the latest fixes.

val delayedFetchKeys = fetchPartitionStatus.keySet()
  .stream()
  .map(new TopicPartitionOperationKey(_))
  .collect(Collectors.toList[TopicPartitionOperationKey]())
Member

I think you can use the Stream.toList() directly (no need for collect). This method was added to Stream in Java 16.
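
For comparison, a small Java sketch of the two forms side by side. OpKey is a hypothetical stand-in for TopicPartitionOperationKey, and the String keys stand in for topic partitions. One difference worth keeping in mind: Stream.toList() is documented to return an unmodifiable list, whereas Collectors.toList() makes no guarantee about the mutability of its result.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamToListSketch {
    // Hypothetical stand-in for TopicPartitionOperationKey.
    record OpKey(String topicPartition) {}

    // The form used in the PR: collect with Collectors.toList().
    static List<OpKey> viaCollect(Map<String, Long> status) {
        return status.keySet().stream()
            .map(OpKey::new)
            .collect(Collectors.toList());
    }

    // The suggested form: Stream.toList(), available since Java 16.
    static List<OpKey> viaToList(Map<String, Long> status) {
        return status.keySet().stream()
            .map(OpKey::new)
            .toList();
    }

    // Probes mutability by attempting an add; a successful add changes the list.
    static boolean isUnmodifiable(List<OpKey> keys) {
        try {
            keys.add(new OpKey("probe"));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> status = new LinkedHashMap<>();
        status.put("a-0", 10L);
        status.put("b-0", 20L);

        // Both forms yield the same elements in the same order.
        System.out.println(viaCollect(status).equals(viaToList(status)));
        // The Stream.toList() result rejects mutation.
        System.out.println(isUnmodifiable(viaToList(status)));
    }
}
```

If the list of keys is only read after it is built, the two forms are interchangeable; if a caller mutates the list later, switching to Stream.toList() would throw at that point.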
