
KAFKA-9965/KAFKA-13303: RoundRobinPartitioner broken by KIP-480 #20170


Closed
Changes from all commits (19 commits)
bde18b8
test
jim0987795064 May 2, 2025
f042910
Merge branch 'apache:trunk' into trunk
jim0987795064 May 5, 2025
ab1968b
Merge branch 'apache:trunk' into trunk
jim0987795064 May 20, 2025
bc93709
Merge branch 'apache:trunk' into trunk
jim0987795064 May 20, 2025
5863a74
Merge branch 'apache:trunk' into trunk
jim0987795064 May 22, 2025
6507cd0
Merge branch 'apache:trunk' into trunk
jim0987795064 May 27, 2025
42cab51
Sync KafkaAdminClient.java from upstream/trunk
jim0987795064 May 27, 2025
4e94751
Merge branch 'trunk' of https://github.com/jim0987795064/kafka into t…
jim0987795064 May 27, 2025
d5ad01b
Merge branch 'trunk' of https://github.com/apache/kafka into trunk
jim0987795064 May 28, 2025
805d812
Merge branch 'trunk' of https://github.com/apache/kafka into trunk
jim0987795064 May 29, 2025
60d3c2f
Merge branch 'trunk' of https://github.com/apache/kafka into trunk
jim0987795064 Jun 7, 2025
3bd11a0
Merge branch 'trunk' of https://github.com/apache/kafka into trunk
jim0987795064 Jun 21, 2025
4679e2b
Merge branch 'trunk' of https://github.com/apache/kafka into trunk
jim0987795064 Jul 5, 2025
316d9a0
Merge remote-tracking branch 'upstream/trunk' into trunk
jim0987795064 Jul 11, 2025
c73a3c9
Merge branch 'trunk' of https://github.com/apache/kafka into trunk
jim0987795064 Jul 15, 2025
9d577ef
fix(producer): preserve partition order in RoundRobinPartitioner acro…
jim0987795064 Jul 15, 2025
20c7ab2
Add the logic of abortOnNewBatch to KafkaProducer and add edge case t…
jim0987795064 Aug 3, 2025
546d0cd
refactor: Remove redundant code of RoundRobinPartitionerTest
jim0987795064 Aug 3, 2025
0abcc57
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-99…
jim0987795064 Aug 3, 2025
KafkaProducer.java
@@ -1124,12 +1124,30 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();

// A custom RoundRobinPartitioner may take advantage of the onNewBatch callback.
boolean abortOnNewBatch = false;
if (partitionerPlugin.get() instanceof RoundRobinPartitioner) {
abortOnNewBatch = true;
}

// Append the record to the accumulator. Note, that the actual partition may be
// calculated there and can be accessed via appendCallbacks.topicPartition.
RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster);
serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster, abortOnNewBatch);
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

// Notify the RoundRobinPartitioner that the previous batch is full, and request it to return prevPartition to the idle queue.
if (result.abortOnNewBatch) {
int prevPartition = partition;
((RoundRobinPartitioner) partitionerPlugin.get()).onNewBatch(record.topic(), cluster, prevPartition);
Contributor:

Hmm, onNewBatch() is already removed in trunk and Partitioner.partition() should be called only once for each record in the producer. So, the current RoundRobinPartitioner should work in trunk, right?

Member:

That makes sense. We should fix it in the 3.9 branch only

Contributor:

For 3.9, should we just cherry pick #17620?

Member:

That is an interesting story

#17620 was not backported to 3.9, and the fix in 4.0 was removed along with the deprecated method. In other words, the fix never made it into any release ...

I will backport #17620 to 3.9

Contributor:

4.0 shouldn't have this issue since https://github.com/apache/kafka/pull/18282/files removed the extra partition() call, right?

Member:

Yes, the bug no longer exists in 4.x.

I just find it interesting that the final fix in 4.0 was simply to remove the extra partition() call

partition = partition(record, serializedKey, serializedValue, cluster);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster, false);
}

// Add the partition to the transaction (if in progress) after it has been successfully
// appended to the accumulator. We cannot do it before because the partition may be
// unknown. Note that the `Sender` will refuse to dequeue
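For context on why this abort-and-retry path exists at all: under the KIP-480 flow the producer could call the partitioner a second time whenever the first append had to open a new batch, so a purely counter-based round robin advances twice for one record and skews or skips partitions (KAFKA-9965). A minimal standalone illustration of that skew, written in plain Java rather than against the Kafka API, with the "two partitioner calls per record" pattern as a deliberate simplification:

import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSkipDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        int numPartitions = 4; // illustrative partition count

        for (int record = 0; record < 8; record++) {
            // First call: the partition the producer initially targeted, then abandoned
            // because a new batch had to be created for it.
            int abandoned = counter.getAndIncrement() % numPartitions;
            // Second call: the partition actually used for the retried append.
            int chosen = counter.getAndIncrement() % numPartitions;
            System.out.println("record " + record + ": abandoned=" + abandoned + ", chosen=" + chosen);
        }
        // With 4 partitions the chosen sequence is 1, 3, 1, 3, ... so partitions 0 and 2
        // never receive a record. The queue-based hand-back in this PR replays the
        // abandoned partition on the retry instead of advancing the counter again.
    }
}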
RoundRobinPartitioner.java
@@ -20,9 +20,14 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

@@ -33,9 +38,15 @@
* to distribute the writes to all partitions equally. This
* is the behaviour regardless of record key hash.
*
* The "Round-Robin" partitioner - MODIFIED TO WORK PROPERLY WITH STICKY PARTITIONING (KIP-480)
* <p>
* This partitioning strategy can be used when user wants to distribute the writes to all
* partitions equally. This is the behaviour regardless of record key hash.
*/
public class RoundRobinPartitioner implements Partitioner {
private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinPartitioner.class);
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Queue<Integer>> topicPartitionQueueMap = new ConcurrentHashMap<>();

public void configure(Map<String, ?> configs) {}

@@ -51,15 +62,25 @@ public void configure(Map<String, ?> configs) {}
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);
Integer queuedPartition = partitionQueue.poll();
if (queuedPartition != null) {
LOGGER.trace("Partition chosen from queue: {}", queuedPartition);
return queuedPartition;
} else {
// no partitions are available, give a non-available partition
int numPartitions = cluster.partitionsForTopic(topic).size();
return Utils.toPositive(nextValue) % numPartitions;
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
int partition = availablePartitions.get(part).partition();
LOGGER.trace("Partition chosen: {}", partition);
return partition;
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
}

@@ -68,5 +89,25 @@ private int nextValue(String topic) {
return counter.getAndIncrement();
}

private Queue<Integer> partitionQueueComputeIfAbsent(String topic) {
return topicPartitionQueueMap.computeIfAbsent(topic, k -> new ConcurrentLinkedQueue<>());
}

public void close() {}

/**
* Notifies the partitioner that a new batch is about to be created.
* When using the RoundRobinPartitioner,
* this method helps preserve partition order across batches in multi-threaded scenarios.
*
* @param topic The topic name
* @param cluster The current cluster metadata
* @param prevPartition The partition previously selected for the record that triggered a new
* batch
*/
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
LOGGER.trace("New batch so enqueuing partition {} for topic {}", prevPartition, topic);
Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);
partitionQueue.add(prevPartition);
}
}
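A small usage sketch of the modified partitioner in isolation. This is a hypothetical harness, not part of the diff; it assumes the queue-backed onNewBatch added above and builds the cluster metadata directly, the same way the unit tests below do:

import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

import java.util.Collections;
import java.util.List;

public class RoundRobinQueueSketch {
    public static void main(String[] args) {
        Node node = new Node(0, "localhost", 9092);
        Node[] nodes = new Node[] {node};
        List<PartitionInfo> partitions = List.of(
                new PartitionInfo("demo", 0, node, nodes, nodes),
                new PartitionInfo("demo", 1, node, nodes, nodes),
                new PartitionInfo("demo", 2, node, nodes, nodes));
        Cluster cluster = new Cluster("clusterId", List.of(node), partitions,
                Collections.emptySet(), Collections.emptySet());

        RoundRobinPartitioner partitioner = new RoundRobinPartitioner();

        // Counter-driven selection while the queue is empty: expected 0, then 1.
        int first = partitioner.partition("demo", null, null, null, null, cluster);
        int second = partitioner.partition("demo", null, null, null, null, cluster);

        // The producer aborted the append for `second` to open a new batch,
        // so it hands the partition back instead of letting it be skipped.
        partitioner.onNewBatch("demo", cluster, second);

        // The retried append drains the queue and gets the same partition again,
        // then the counter resumes: expected 1, then 2.
        int retried = partitioner.partition("demo", null, null, null, null, cluster);
        int next = partitioner.partition("demo", null, null, null, null, cluster);

        System.out.printf("first=%d second=%d retried=%d next=%d%n", first, second, retried, next);
        partitioner.close();
    }
}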
RecordAccumulator.java
@@ -281,7 +281,8 @@ public RecordAppendResult append(String topic,
AppendCallbacks callbacks,
long maxTimeToBlock,
long nowMs,
Cluster cluster) throws InterruptedException {
Cluster cluster,
boolean abortOnNewBatch) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));

// We keep track of the number of appending thread to make sure we do not miss batches in
@@ -325,6 +326,12 @@ public RecordAppendResult append(String topic,
}
}

// we don't have an in-progress record batch; try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, 0, true);
}

if (buffer == null) {
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
@@ -398,7 +405,7 @@ private RecordAppendResult appendNewBatch(String topic,
dq.addLast(batch);
incomplete.add(batch);

return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes());
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes(), false);
}

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
@@ -434,7 +441,7 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, H
last.closeForRecordAppends();
} else {
int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, appendedBytes);
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, appendedBytes, false);
}
}
return null;
@@ -1197,15 +1204,19 @@ public static final class RecordAppendResult {
public final boolean batchIsFull;
public final boolean newBatchCreated;
public final int appendedBytes;
public final boolean abortOnNewBatch;


public RecordAppendResult(FutureRecordMetadata future,
boolean batchIsFull,
boolean newBatchCreated,
int appendedBytes) {
int appendedBytes,
boolean abortOnNewBatch) {
this.future = future;
this.batchIsFull = batchIsFull;
this.newBatchCreated = newBatchCreated;
this.appendedBytes = appendedBytes;
this.abortOnNewBatch = abortOnNewBatch;
}
}

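Taken together, the accumulator changes define a small contract: when the caller passes abortOnNewBatch=true and the record cannot join an already open batch, append() returns early with a null future and abortOnNewBatch set, and the caller retries exactly once with abortOnNewBatch=false. A toy model of that contract in plain Java (not Kafka code; the batch bookkeeping is deliberately simplified):

import java.util.ArrayList;
import java.util.List;

public class AbortOnNewBatchContract {

    record AppendResult(boolean appended, boolean abortOnNewBatch) {}

    private static final int BATCH_SIZE = 2;
    private List<String> openBatch; // null means no in-progress batch

    AppendResult append(String value, boolean abortOnNewBatch) {
        if (openBatch != null && openBatch.size() < BATCH_SIZE) {
            openBatch.add(value);                 // joined the open batch
            return new AppendResult(true, false);
        }
        if (abortOnNewBatch) {
            return new AppendResult(false, true); // nothing buffered; caller must retry
        }
        openBatch = new ArrayList<>();            // allocate a new batch
        openBatch.add(value);
        return new AppendResult(true, false);
    }

    public static void main(String[] args) {
        AbortOnNewBatchContract accumulator = new AbortOnNewBatchContract();
        AppendResult result = accumulator.append("r1", true); // no open batch yet, so aborted
        if (result.abortOnNewBatch()) {
            // In the producer this is where onNewBatch() and the second partition() call happen.
            result = accumulator.append("r1", false);
        }
        System.out.println("appended=" + result.appended());
    }
}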
KafkaProducerTest.java
@@ -1512,7 +1512,8 @@ public void testSendNotAllowedInPreparedTransactionState() throws Exception {
any(),
anyLong(),
anyLong(),
any()
any(),
eq(false)
);
}
}
@@ -2770,7 +2771,8 @@ private <T> FutureRecordMetadata expectAppend(
any(RecordAccumulator.AppendCallbacks.class), // 6 <--
anyLong(),
anyLong(),
any()
any(),
eq(false)
)).thenAnswer(invocation -> {
RecordAccumulator.AppendCallbacks callbacks =
(RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
@@ -2779,7 +2781,8 @@
futureRecordMetadata,
false,
false,
0);
0,
false);
});

return futureRecordMetadata;
RoundRobinPartitionerTest.java
@@ -43,9 +43,9 @@ public void testRoundRobinWithUnavailablePartitions() {
// Intentionally make the partition list not in partition order to test the edge
// cases.
List<PartitionInfo> partitions = asList(
new PartitionInfo("test", 1, null, NODES, NODES),
new PartitionInfo("test", 2, NODES[1], NODES, NODES),
new PartitionInfo("test", 0, NODES[0], NODES, NODES));
new PartitionInfo("test", 1, null, NODES, NODES),
new PartitionInfo("test", 2, NODES[1], NODES, NODES),
new PartitionInfo("test", 0, NODES[0], NODES, NODES));
// When there are some unavailable partitions, we want to make sure that (1) we
// always pick an available partition,
// and (2) the available partitions are selected in a round robin way.
@@ -70,11 +70,13 @@ public void testRoundRobinWithKeyBytes() {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
List<PartitionInfo> allPartitions = asList(
new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES),
new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());
Collections.emptySet(), Collections.emptySet());

final Map<Integer, Integer> partitionCount = new HashMap<>();

@@ -96,4 +98,45 @@
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
}

@Test
public void testRoundRobinWithAbortOnNewBatch() throws Exception {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(
new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 2, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 3, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 4, NODES[0], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[1], NODES, NODES),
new PartitionInfo(topicB, 1, NODES[1], NODES, NODES));

Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1]), allPartitions,
Collections.<String>emptySet(), Collections.<String>emptySet());

RoundRobinPartitioner partitioner = new RoundRobinPartitioner();

// Verify that partition selection still works correctly when queue is empty.
assertEquals(0, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(1, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster));

// Abort for new batch - previous partition should be returned on subsequent call
// Simulate three threads producing to two topics, with race condition in producer
partitioner.onNewBatch(topicA, testCluster, 0);
partitioner.onNewBatch(topicA, testCluster, 1);
partitioner.onNewBatch(topicB, testCluster, 0);
assertEquals(0, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(1, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster));

// Verify that partition selection still works correctly after call to onNewBatch.
assertEquals(2, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(3, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(4, partitioner.partition(topicA, null, null, null, null, testCluster));
assertEquals(1, partitioner.partition(topicB, null, null, null, null, testCluster));

Reviewer:

It's great that this test verifies that the next partition selected matches the enqueued value for each topic. Consider adding test cases for some edge cases, like an empty queue and error handling.

Contributor Author:

Hello @ash-at-github,
Thanks for this suggestion. I've added a test case for the empty queue scenario.
Let me know if you have more questions.

assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster));
}
}