MINOR: Initialize fetchPartitionStatus as a Map type to reduce collection conversions #20768
base: trunk
  val fetchInfos = fetchPartitionStatus.asScala.map { case (tp, status) =>
    tp -> status.fetchInfo
- }
+ }.toSeq
Is it possible to have duplicate TPs in this "fetch" path?
Also, we are doing two conversions here. You can avoid one by using a view or iterator after asScala, and by using toBuffer instead of toSeq (toSeq can produce a lazy collection, which can cause problems).
fetchPartitionStatus is built from the result of fetchData():
kafka/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
Lines 395 to 420 in 53e1172
    public Map<TopicIdPartition, PartitionData> fetchData(Map<Uuid, String> topicNames) {
        final LinkedHashMap<TopicIdPartition, PartitionData> fetchData = new LinkedHashMap<>();
        final short version = version();
        data.topics().forEach(fetchTopic -> {
            String name;
            if (version < 13) {
                name = fetchTopic.topic(); // can't be null
            } else {
                name = topicNames.get(fetchTopic.topicId());
            }
            fetchTopic.partitions().forEach(fetchPartition ->
                // Topic name may be null here if the topic name was unable to be resolved using the topicNames map.
                fetchData.put(new TopicIdPartition(fetchTopic.topicId(), new TopicPartition(name, fetchPartition.partition())),
                    new PartitionData(
                        fetchTopic.topicId(),
                        fetchPartition.fetchOffset(),
                        fetchPartition.logStartOffset(),
                        fetchPartition.partitionMaxBytes(),
                        optionalEpoch(fetchPartition.currentLeaderEpoch()),
                        optionalEpoch(fetchPartition.lastFetchedEpoch())
                    )
                )
            );
        });
        return fetchData;
    }
Its type is LinkedHashMap<TopicIdPartition, PartitionData>, so there will be no duplicate TPs.
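To illustrate why a LinkedHashMap rules out duplicate keys, here is a minimal sketch. `PartitionKey` and `collect` are hypothetical stand-ins rather than Kafka classes, and the sketch assumes the key type has value-based equals/hashCode (a record provides that here). A repeated key overwrites the earlier value instead of adding a second entry.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FetchDataKeys {
    // Hypothetical stand-in for TopicIdPartition; a record gets value-based equals/hashCode.
    record PartitionKey(String topic, int partition) {}

    // Mirrors the shape of fetchData(): one put per (topic, partition) entry, in request order.
    static Map<PartitionKey, Long> collect(List<Map.Entry<PartitionKey, Long>> entries) {
        Map<PartitionKey, Long> result = new LinkedHashMap<>();
        for (Map.Entry<PartitionKey, Long> e : entries) {
            // A key that is already present has its value replaced; no second entry is created.
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<PartitionKey, Long> m = collect(List.of(
            Map.entry(new PartitionKey("orders", 0), 10L),
            Map.entry(new PartitionKey("orders", 1), 20L),
            Map.entry(new PartitionKey("orders", 0), 30L)));
        // Two entries remain; ("orders", 0) keeps its original position but holds the last value.
        System.out.println(m);
    }
}
```

Any code that later iterates this map therefore sees each key exactly once, in first-insertion order.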
  // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
+ val delayedFetchKeys = fetchPartitionStatus.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
Why do we convert to a Scala List if we convert to a Java collection right after? We should probably use Java's Stream here and avoid the Scala collections altogether.
Thanks for catching this! I've fixed it. PTAL when you get a chance.
LGTM
@DL1231 please rebase code to include latest fixes
  val delayedFetchKeys = fetchPartitionStatus.keySet()
    .stream()
    .map(new TopicPartitionOperationKey(_))
    .collect(Collectors.toList[TopicPartitionOperationKey]())
I think you can use Stream.toList() directly (no need for collect). This method was added to Stream in Java 16.
see #19876 (comment)
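For comparison, a small Java sketch of the two forms discussed in this thread. `OperationKey` is a hypothetical stand-in for TopicPartitionOperationKey and the String-keyed map is illustrative only; neither is the actual Kafka code. Both forms yield the same elements in the same order. The practical difference is that Stream.toList() returns an unmodifiable list, while Collectors.toList() makes no guarantee about the list's mutability.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class DelayedKeys {
    // Hypothetical stand-in for TopicPartitionOperationKey.
    record OperationKey(String topicPartition) {}

    // Collector-based form, available since Java 8.
    static List<OperationKey> viaCollect(Map<String, Long> status) {
        return status.keySet().stream()
            .map(OperationKey::new)
            .collect(Collectors.toList());
    }

    // Java 16+ form: Stream.toList() returns an unmodifiable List.
    static List<OperationKey> viaToList(Map<String, Long> status) {
        return status.keySet().stream()
            .map(OperationKey::new)
            .toList();
    }

    public static void main(String[] args) {
        Map<String, Long> status = new LinkedHashMap<>();
        status.put("orders-0", 10L);
        status.put("orders-1", 20L);
        // List equality is element-wise, so the two results compare equal.
        System.out.println(viaCollect(status).equals(viaToList(status)));
    }
}
```

If a caller needs to mutate the resulting list, the Stream.toList() form would throw UnsupportedOperationException, so the switch is only safe where the list is treated as read-only.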
Initialize fetchPartitionStatus as a Map type to reduce unnecessary collection conversions.