KAFKA-9965/KAFKA-13303: RoundRobinPartitioner broken by KIP-480 #20170
RoundRobinPartitionerTest.java

@@ -43,9 +43,9 @@ public void testRoundRobinWithUnavailablePartitions() {
         // Intentionally make the partition list not in partition order to test the edge
         // cases.
         List<PartitionInfo> partitions = asList(
-                new PartitionInfo("test", 1, null, NODES, NODES),
-                new PartitionInfo("test", 2, NODES[1], NODES, NODES),
-                new PartitionInfo("test", 0, NODES[0], NODES, NODES));
+            new PartitionInfo("test", 1, null, NODES, NODES),
+            new PartitionInfo("test", 2, NODES[1], NODES, NODES),
+            new PartitionInfo("test", 0, NODES[0], NODES, NODES));
         // When there are some unavailable partitions, we want to make sure that (1) we
         // always pick an available partition,
         // and (2) the available partitions are selected in a round robin way.
@@ -70,11 +70,13 @@ public void testRoundRobinWithKeyBytes() {
         final String topicA = "topicA";
         final String topicB = "topicB";

-        List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
-                new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
-                new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
+        List<PartitionInfo> allPartitions = asList(
+            new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
+            new PartitionInfo(topicA, 1, NODES[1], NODES, NODES),
+            new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
+            new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
         Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
-                Collections.emptySet(), Collections.emptySet());
+            Collections.emptySet(), Collections.emptySet());

         final Map<Integer, Integer> partitionCount = new HashMap<>();
@@ -96,4 +98,45 @@ public void testRoundRobinWithKeyBytes() {
         assertEquals(10, partitionCount.get(1).intValue());
         assertEquals(10, partitionCount.get(2).intValue());
     }
+
+    @Test
+    public void testRoundRobinWithAbortOnNewBatch() throws Exception {
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+
+        List<PartitionInfo> allPartitions = asList(
+            new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
+            new PartitionInfo(topicA, 1, NODES[0], NODES, NODES),
+            new PartitionInfo(topicA, 2, NODES[0], NODES, NODES),
+            new PartitionInfo(topicA, 3, NODES[0], NODES, NODES),
+            new PartitionInfo(topicA, 4, NODES[0], NODES, NODES),
+            new PartitionInfo(topicB, 0, NODES[1], NODES, NODES),
+            new PartitionInfo(topicB, 1, NODES[1], NODES, NODES));
+
+        Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1]), allPartitions,
+            Collections.<String>emptySet(), Collections.<String>emptySet());
+
+        RoundRobinPartitioner partitioner = new RoundRobinPartitioner();
+
+        // Verify that partition selection still works correctly when queue is empty.
+        assertEquals(0, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(1, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster));
+
+        // Abort for new batch - previous partition should be returned on subsequent call.
+        // Simulate three threads producing to two topics, with race condition in producer.
+        partitioner.onNewBatch(topicA, testCluster, 0);
+        partitioner.onNewBatch(topicA, testCluster, 1);
+        partitioner.onNewBatch(topicB, testCluster, 0);
+        assertEquals(0, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(1, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster));
+
+        // Verify that partition selection still works correctly after call to onNewBatch.
+        assertEquals(2, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(3, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(4, partitioner.partition(topicA, null, null, null, null, testCluster));
+        assertEquals(1, partitioner.partition(topicB, null, null, null, null, testCluster));
+
+        assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster));
+    }
 }

It's great that this test verifies that the next partition selected matches the enqueued value for each topic. Consider adding test cases for some edge cases, like an empty queue and error handling.

Hello @ash-at-github,
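As background for the discussion below: the new test pins down the behaviour that, once onNewBatch() has aborted a batch, the next partition() call for that topic returns the aborted batch's partition instead of advancing the round-robin counter. A minimal sketch of that idea, assuming a per-topic queue of aborted partitions (the class name, field names, and method bodies here are illustrative guesses, not the actual patch):

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

// Illustrative stand-in for what the patched RoundRobinPartitioner is expected to do.
public class QueueingRoundRobinPartitioner implements Partitioner {

    private final Map<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    // Partitions of aborted batches, handed back on the next partition() call for the topic.
    private final Map<String, Queue<Integer>> topicAbortedPartitions = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // If a batch for this topic was just aborted, reuse its partition instead of
        // advancing the round-robin counter (this is what the new test asserts).
        Queue<Integer> aborted = topicAbortedPartitions.get(topic);
        if (aborted != null) {
            Integer previous = aborted.poll();
            if (previous != null) {
                return previous;
            }
        }
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int next = topicCounterMap.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
        List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);
        if (!available.isEmpty()) {
            return available.get(Utils.toPositive(next) % available.size()).partition();
        }
        return Utils.toPositive(next) % partitions.size();
    }

    // Mirrors the (deprecated, 3.x-only) Partitioner#onNewBatch hook: remember the partition
    // of the batch that was aborted so the retried partition() call lands on the same partition.
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        topicAbortedPartitions.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(prevPartition);
    }

    @Override
    public void close() { }
}
```

Run against the testCluster defined in the new test, this sketch reproduces the exact assertion sequence in testRoundRobinWithAbortOnNewBatch.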
Hmm, onNewBatch() is already removed in trunk and Partitioner.partition() should be called only once for each record in the producer. So, the current RoundRobinPartitioner should work in trunk, right?
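To make the 3.x failure mode concrete: because the KIP-480 batching path let the producer call partition() a second time whenever a new batch had to be created, the stock RoundRobinPartitioner advanced its per-topic counter twice for those records and skewed the distribution. A rough standalone sketch that simulates the extra call by hand against the unpatched partitioner (the class name is made up, and the real producer path is not involved):

```java
import static java.util.Arrays.asList;

import java.util.Collections;

import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class RoundRobinSkipDemo {
    public static void main(String[] args) {
        Node node = new Node(0, "localhost", 9092);
        Node[] nodes = {node};
        Cluster cluster = new Cluster("demo", asList(node), asList(
                new PartitionInfo("t", 0, node, nodes, nodes),
                new PartitionInfo("t", 1, node, nodes, nodes),
                new PartitionInfo("t", 2, node, nodes, nodes),
                new PartitionInfo("t", 3, node, nodes, nodes)),
            Collections.emptySet(), Collections.emptySet());

        RoundRobinPartitioner partitioner = new RoundRobinPartitioner();
        for (int record = 0; record < 4; record++) {
            // First pick; in 3.x this batch could then be aborted via onNewBatch() ...
            partitioner.partition("t", null, null, null, null, cluster);
            // ... and the producer asked again, so this second answer is the one actually used.
            int used = partitioner.partition("t", null, null, null, null, cluster);
            System.out.println("record " + record + " -> partition " + used);
        }
        partitioner.close();
        // Only every other partition is ever used here (e.g. 1 and 3), so with an even
        // partition count half of the partitions never receive data.
    }
}
```

That uneven spread is the symptom reported in KAFKA-9965 and KAFKA-13303; in trunk the producer no longer makes the second call, so the stock partitioner behaves correctly there.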
That makes sense. We should fix it in the 3.9 branch only
For 3.9, should we just cherry pick #17620?
That is an interesting story.
#17620 was not backported to 3.9, and the fix in 4.0 was removed along with the deprecated method. In other words, the fix never made it into any release ...
I will backport #17620 to 3.9.
4.0 shouldn't have this issue since https://github.com/apache/kafka/pull/18282/files removed the extra partition() call, right?
Yes, the bug no longer exists in 4.x.
I just find it interesting that the final fix in 4.0 was simply to remove the extra partition() call.