KAFKA-9965/KAFKA-13303: RoundRobinPartitioner broken by KIP-480 #20170

Open
wants to merge 19 commits into
base: trunk
from

jim0987795064
Contributor

@jim0987795064 jim0987795064 commented Jul 15, 2025

RoundRobinPartitioner behaviour was broken by sticky partitioning
(KIP-480).

This patch addresses the behavioural issue caused by the second call to
partition() after onNewBatch(), in a predictable and thread-safe
manner.

Unit tests simulate multiple threads producing to two topics under race
conditions.

Changes:

  • Store the last used partition per batch in a thread-safe queue to
    ensure thread safety across multiple producer threads.
  • Add unit tests that simulate multiple threads producing to two
    topics.

Reasons:

  • The original round-robin algorithm relies on a shared counter, which
    can result in multiple threads assigning the same partition for new
    batches. This breaks the intended even distribution and causes partition
    skew.
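
To illustrate the idea described above, here is a minimal sketch, not the code in this PR. It assumes the producer calls partition(), then onNewBatch() with the partition it just got, then partition() again, and that the second call should return the same partition instead of advancing the counter. The class QueueBackedRoundRobin and its simplified method signatures are hypothetical and do not use Kafka's Cluster type.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a round-robin partitioner (assumption, not the PR's code).
class QueueBackedRoundRobin {
    private final int numPartitions;
    // Per-topic round-robin counter.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    // Per-topic queue of partitions handed back through onNewBatch().
    private final ConcurrentMap<String, Queue<Integer>> pending = new ConcurrentHashMap<>();

    QueueBackedRoundRobin(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int partition(String topic) {
        // If a partition was enqueued by onNewBatch(), reuse it so the
        // second partition() call does not skip ahead in the rotation.
        Queue<Integer> queue = pending.get(topic);
        if (queue != null) {
            Integer reused = queue.poll();
            if (reused != null) {
                return reused;
            }
        }
        // Empty queue: fall back to the plain round-robin counter.
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
        return Math.floorMod(next, numPartitions);
    }

    void onNewBatch(String topic, int prevPartition) {
        // Remember the partition chosen just before the new batch was requested.
        pending.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(prevPartition);
    }
}
```

With three partitions, a call sequence of partition("a"), onNewBatch("a", 0), partition("a"), partition("a") would yield 0, then 0 again from the queue, then 1 in this sketch, so no partition is skipped.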

@github-actions bot added the producer, clients, small (Small PRs), and triage (PRs from the community) labels Jul 15, 2025
partitioner.onNewBatch(topicB, testCluster, 1);
assertEquals(7, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(8, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(1, partitioner.partition(topicB, null, null, null, null, testCluster));

It's great that this test verifies that the next partition selected matches the enqueued value for each topic. Consider adding test cases for edge cases, such as an empty queue and error handling.

Contributor Author

Hello @ash-at-github,
Thanks for this suggestion. I've added a test case for the empty queue scenario.
Let me know if you have more questions.

@github-actions bot removed the triage (PRs from the community) label Jul 19, 2025
@jim0987795064 changed the title from "KAFKA-9965/KAFKA-13303: RoundRobinPartitioner broken by KIP-480" to "(WIP)KAFKA-9965/KAFKA-13303: RoundRobinPartitioner broken by KIP-480" Jul 20, 2025
@@ -45,4 +45,7 @@ public interface Partitioner extends Configurable, Closeable {
* This is called when partitioner is closed.
*/
void close();

default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
Member

Please consider using a cast to invoke RoundRobinPartitioner#onNewBatch if needed.
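
One way to read this suggestion, as a rough sketch only: keep onNewBatch off the shared interface and have the caller check the concrete type before casting. The types below (SketchPartitioner, SketchRoundRobinPartitioner, SketchCaller) are hypothetical stand-ins with simplified signatures, not Kafka's real classes.

```java
// Hypothetical, minimal stand-ins; these are NOT Kafka's real types.
interface SketchPartitioner {
    int partition(String topic);
}

class SketchRoundRobinPartitioner implements SketchPartitioner {
    private int next = 0;
    private int lastNotified = -1;

    @Override
    public int partition(String topic) {
        // Plain rotation over three partitions, enough for the illustration.
        return next++ % 3;
    }

    // Declared only on the concrete class, not on the interface.
    void onNewBatch(String topic, int prevPartition) {
        lastNotified = prevPartition;
    }

    int lastNotified() {
        return lastNotified;
    }
}

class SketchCaller {
    // Returns true if the partitioner was notified about the new batch.
    static boolean notifyNewBatch(SketchPartitioner p, String topic, int prevPartition) {
        if (p instanceof SketchRoundRobinPartitioner) {
            ((SketchRoundRobinPartitioner) p).onNewBatch(topic, prevPartition);
            return true;
        }
        return false;
    }
}
```

In this sketch, other partitioner implementations are left untouched, and the interface does not gain a new default method.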

@github-actions bot removed the small (Small PRs) label Aug 3, 2025
@jim0987795064 changed the title from "(WIP)KAFKA-9965/KAFKA-13303: RoundRobinPartitioner broken by KIP-480" to "KAFKA-9965/KAFKA-13303: RoundRobinPartitioner broken by KIP-480" Aug 3, 2025
3 participants