diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c4ace64b0e52b..28467689fa555 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1124,12 +1124,30 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
             ensureValidRecordSize(serializedSize);
             long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
 
+            // A custom RoundRobinPartitioner may take advantage of the onNewBatch callback.
+            boolean abortOnNewBatch = false;
+            if (partitionerPlugin.get() instanceof RoundRobinPartitioner) {
+                abortOnNewBatch = true;
+            }
+
             // Append the record to the accumulator.  Note, that the actual partition may be
             // calculated there and can be accessed via appendCallbacks.topicPartition.
             RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
-                    serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster);
+                    serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster, abortOnNewBatch);
             assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;
 
+            // Notify the RoundRobinPartitioner that the previous batch is full and ask it to return prevPartition to its idle queue, then re-partition and retry the append.
+            if (result.abortOnNewBatch) {
+                int prevPartition = partition;
+                ((RoundRobinPartitioner) partitionerPlugin.get()).onNewBatch(record.topic(), cluster, prevPartition);
+                partition = partition(record, serializedKey, serializedValue, cluster);
+                if (log.isTraceEnabled()) {
+                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
+                }
+                result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
+                        serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster, false);
+            }
+
             // Add the partition to the transaction (if in progress) after it has been successfully
             // appended to the accumulator. We cannot do it before because the partition may be
             // unknown. Note that the `Sender` will refuse to dequeue
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
index c736756ab4231..fe57afec58a94 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
@@ -20,9 +20,14 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.utils.Utils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -33,9 +38,15 @@
  * to distribute the writes to all partitions equally. This
  * is the behaviour regardless of record key hash.
  *
+ * <p>
+ * This implementation is modified to work properly with the sticky partitioning changes
+ * introduced by KIP-480: when an append would have created a new batch, the previously
+ * selected partition is re-queued (see {@link #onNewBatch(String, Cluster, int)}) so that
+ * the round-robin distribution is preserved across batches.
  */
 public class RoundRobinPartitioner implements Partitioner {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinPartitioner.class);
     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Queue<Integer>> topicPartitionQueueMap = new ConcurrentHashMap<>();
 
     public void configure(Map<String, ?> configs) {}
 
@@ -51,15 +62,25 @@ public void configure(Map<String, ?> configs) {}
      */
     @Override
     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-        int nextValue = nextValue(topic);
-        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
-        if (!availablePartitions.isEmpty()) {
-            int part = Utils.toPositive(nextValue) % availablePartitions.size();
-            return availablePartitions.get(part).partition();
+        Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);
+        Integer queuedPartition = partitionQueue.poll();
+        if (queuedPartition != null) {
+            LOGGER.trace("Partition chosen from queue: {}", queuedPartition);
+            return queuedPartition;
         } else {
-            // no partitions are available, give a non-available partition
-            int numPartitions = cluster.partitionsForTopic(topic).size();
-            return Utils.toPositive(nextValue) % numPartitions;
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+            int numPartitions = partitions.size();
+            int nextValue = nextValue(topic);
+            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+            if (!availablePartitions.isEmpty()) {
+                int part = Utils.toPositive(nextValue) % availablePartitions.size();
+                int partition = availablePartitions.get(part).partition();
+                LOGGER.trace("Partition chosen: {}", partition);
+                return partition;
+            } else {
+                // no partitions are available, give a non-available partition
+                return Utils.toPositive(nextValue) % numPartitions;
+            }
         }
     }
 
@@ -68,5 +89,25 @@ private int nextValue(String topic) {
         return counter.getAndIncrement();
     }
 
+    private Queue<Integer> partitionQueueComputeIfAbsent(String topic) {
+        return topicPartitionQueueMap.computeIfAbsent(topic, k -> new ConcurrentLinkedQueue<>());
+    }
+
     public void close() {}
+
+    /**
+     * Notifies the partitioner that a new batch is about to be created.
+     * When using the RoundRobinPartitioner,
+     * this method helps preserve partition order across batches in multi-threaded scenarios.
+     *
+     * @param topic The topic name
+     * @param cluster The current cluster metadata
+     * @param prevPartition The partition previously selected for the record that triggered a new
+     *                      batch
+     */
+    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+        LOGGER.trace("New batch so enqueuing partition {} for topic {}", prevPartition, topic);
+        Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);
+        partitionQueue.add(prevPartition);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 35cda5e51634b..bca0b40f42e17 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -281,7 +281,8 @@ public RecordAppendResult append(String topic,
                                      AppendCallbacks callbacks,
                                      long maxTimeToBlock,
                                      long nowMs,
-                                     Cluster cluster) throws InterruptedException {
+                                     Cluster cluster,
+                                     boolean abortOnNewBatch) throws InterruptedException {
         TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
 
         // We keep track of the number of appending thread to make sure we do not miss batches in
@@ -325,6 +326,12 @@ public RecordAppendResult append(String topic,
                 }
             }
 
+            // We don't have an in-progress record batch; if asked to abort on new batch creation, return so the caller can re-partition and retry.
+            if (abortOnNewBatch) {
+                // Return a result that will cause another call to append.
+                return new RecordAppendResult(null, false, false, 0, true);
+            }
+
             if (buffer == null) {
                 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
                         RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
@@ -398,7 +405,7 @@ private RecordAppendResult appendNewBatch(String topic,
             dq.addLast(batch);
             incomplete.add(batch);
 
-            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes());
+            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes(), false);
         }
 
     private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
@@ -434,7 +441,7 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, H
                 last.closeForRecordAppends();
             } else {
                 int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
-                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, appendedBytes);
+                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, appendedBytes, false);
             }
         }
         return null;
@@ -1197,15 +1204,19 @@ public static final class RecordAppendResult {
         public final boolean batchIsFull;
         public final boolean newBatchCreated;
         public final int appendedBytes;
+        public final boolean abortOnNewBatch;
 
         public RecordAppendResult(FutureRecordMetadata future,
                                   boolean batchIsFull,
                                   boolean newBatchCreated,
-                                  int appendedBytes) {
+                                  int appendedBytes,
+                                  boolean abortOnNewBatch) {
             this.future = future;
             this.batchIsFull = batchIsFull;
             this.newBatchCreated = newBatchCreated;
             this.appendedBytes = appendedBytes;
+            this.abortOnNewBatch = abortOnNewBatch;
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index a52d3c6f0b202..99ddbd1945c72 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1512,7 +1512,8 @@ public void testSendNotAllowedInPreparedTransactionState() throws Exception { any(), anyLong(), anyLong(), - any() + any(), + eq(false) ); } } @@ -2770,7 +2771,8 @@ private FutureRecordMetadata expectAppend( any(RecordAccumulator.AppendCallbacks.class), // 6 <-- anyLong(), anyLong(), - any() + any(), + eq(false) )).thenAnswer(invocation -> { RecordAccumulator.AppendCallbacks callbacks = (RecordAccumulator.AppendCallbacks) invocation.getArguments()[6]; @@ -2779,7 +2781,8 @@ private FutureRecordMetadata expectAppend( futureRecordMetadata, false, false, - 0); + 0, + false); }); return futureRecordMetadata; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java index 8c6d0a33d21b3..ec3f16fe3e90c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RoundRobinPartitionerTest.java @@ -43,9 +43,9 @@ public void testRoundRobinWithUnavailablePartitions() { // Intentionally make the partition list not in partition order to test the edge // cases. List partitions = asList( - new PartitionInfo("test", 1, null, NODES, NODES), - new PartitionInfo("test", 2, NODES[1], NODES, NODES), - new PartitionInfo("test", 0, NODES[0], NODES, NODES)); + new PartitionInfo("test", 1, null, NODES, NODES), + new PartitionInfo("test", 2, NODES[1], NODES, NODES), + new PartitionInfo("test", 0, NODES[0], NODES, NODES)); // When there are some unavailable partitions, we want to make sure that (1) we // always pick an available partition, // and (2) the available partitions are selected in a round robin way. 
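Taken together, the KafkaProducer, RecordAccumulator and RoundRobinPartitioner changes above implement a simple hand-off: when an append would open a new batch, the accumulator returns early with abortOnNewBatch set, the producer gives the previous partition back to the partitioner via onNewBatch, and the retried append reuses it. The following is a minimal, self-contained sketch of that re-queue behaviour for illustration only; the class and field names (RoundRobinRequeueSketch, counters, requeued) are invented here and the partition handling is simplified compared to the real producer code.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinRequeueSketch {

    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Queue<Integer>> requeued = new ConcurrentHashMap<>();

    /** Prefer a partition handed back by onNewBatch; otherwise continue the round-robin rotation. */
    int partition(String topic, int numPartitions) {
        Integer queued = requeued.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).poll();
        if (queued != null) {
            return queued;
        }
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger()).getAndIncrement();
        // Mask the sign bit (like Utils.toPositive) so the counter stays usable after overflow.
        return (next & Integer.MAX_VALUE) % numPartitions;
    }

    /** Called when an append was aborted because it would have created a new batch. */
    void onNewBatch(String topic, int prevPartition) {
        // Hand the partition back so the retried append reuses it and the rotation is not skewed.
        requeued.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(prevPartition);
    }

    public static void main(String[] args) {
        RoundRobinRequeueSketch partitioner = new RoundRobinRequeueSketch();
        System.out.println(partitioner.partition("t", 3)); // 0
        System.out.println(partitioner.partition("t", 3)); // 1
        partitioner.onNewBatch("t", 1);                     // append to partition 1 was aborted
        System.out.println(partitioner.partition("t", 3)); // 1 again, taken from the queue
        System.out.println(partitioner.partition("t", 3)); // 2, rotation continues
    }
}
```

In the patch itself, the equivalent of onNewBatch is only invoked when RecordAppendResult.abortOnNewBatch is true and the configured partitioner is a RoundRobinPartitioner.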
@@ -70,11 +70,13 @@ public void testRoundRobinWithKeyBytes() { final String topicA = "topicA"; final String topicB = "topicB"; - List allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES), - new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES), - new PartitionInfo(topicB, 0, NODES[0], NODES, NODES)); + List allPartitions = asList( + new PartitionInfo(topicA, 0, NODES[0], NODES, NODES), + new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), + new PartitionInfo(topicA, 2, NODES[2], NODES, NODES), + new PartitionInfo(topicB, 0, NODES[0], NODES, NODES)); Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions, - Collections.emptySet(), Collections.emptySet()); + Collections.emptySet(), Collections.emptySet()); final Map partitionCount = new HashMap<>(); @@ -96,4 +98,45 @@ public void testRoundRobinWithKeyBytes() { assertEquals(10, partitionCount.get(1).intValue()); assertEquals(10, partitionCount.get(2).intValue()); } + + @Test + public void testRoundRobinWithAbortOnNewBatch() throws Exception { + final String topicA = "topicA"; + final String topicB = "topicB"; + + List allPartitions = asList( + new PartitionInfo(topicA, 0, NODES[0], NODES, NODES), + new PartitionInfo(topicA, 1, NODES[0], NODES, NODES), + new PartitionInfo(topicA, 2, NODES[0], NODES, NODES), + new PartitionInfo(topicA, 3, NODES[0], NODES, NODES), + new PartitionInfo(topicA, 4, NODES[0], NODES, NODES), + new PartitionInfo(topicB, 0, NODES[1], NODES, NODES), + new PartitionInfo(topicB, 1, NODES[1], NODES, NODES)); + + Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1]), allPartitions, + Collections.emptySet(), Collections.emptySet()); + + RoundRobinPartitioner partitioner = new RoundRobinPartitioner(); + + // Verify that partition selection still works correctly when queue is empty. + assertEquals(0, partitioner.partition(topicA, null, null, null, null, testCluster)); + assertEquals(1, partitioner.partition(topicA, null, null, null, null, testCluster)); + assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster)); + + // Abort for new batch - previous partition should be returned on subsequent call + // Simulate three threads producing to two topics, with race condition in producer + partitioner.onNewBatch(topicA, testCluster, 0); + partitioner.onNewBatch(topicA, testCluster, 1); + partitioner.onNewBatch(topicB, testCluster, 0); + assertEquals(0, partitioner.partition(topicA, null, null, null, null, testCluster)); + assertEquals(1, partitioner.partition(topicA, null, null, null, null, testCluster)); + assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster)); + + // Verify that partition selection still works correctly after call to onNewBatch. 
+ assertEquals(2, partitioner.partition(topicA, null, null, null, null, testCluster)); + assertEquals(3, partitioner.partition(topicA, null, null, null, null, testCluster)); + assertEquals(4, partitioner.partition(topicA, null, null, null, null, testCluster)); + assertEquals(1, partitioner.partition(topicB, null, null, null, null, testCluster)); + assertEquals(0, partitioner.partition(topicB, null, null, null, null, testCluster)); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index ce01460e6edb0..03dcf6bca94c2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -160,18 +160,18 @@ public void testDrainBatches() throws Exception { // initial data - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); - accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); - accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); - accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); + accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained Map> batches1 = accum.drain(metadataCache, Set.of(node1, node2), (int) batchSize, 0); verifyTopicPartitionInBatches(batches1, tp1, tp3); // add record for tp1, tp3 - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); - accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4 @@ -183,18 +183,18 @@ public void testDrainBatches() throws Exception { verifyTopicPartitionInBatches(batches3, tp1, tp3); // add record for tp2, tp3, tp4 and mute the tp4 - accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); - accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); - accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, 
time.milliseconds(), cluster); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); + accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); accum.mutePartition(tp4); // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted) Map> batches4 = accum.drain(metadataCache, Set.of(node1, node2), (int) batchSize, 0); verifyTopicPartitionInBatches(batches4, tp2, tp3); // add record for tp1, tp2, tp3, and unmute tp4 - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); - accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); - accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); + accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); accum.unmutePartition(tp4); // set maxSize as a max value, so that the all partitions in 2 nodes should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4] Map> batches5 = accum.drain(metadataCache, Set.of(node1, node2), Integer.MAX_VALUE, 0); @@ -228,7 +228,7 @@ public void testFull() throws Exception { int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { // append to the first batch - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster(), false); Deque partitionBatches = accum.getDeque(tp1); assertEquals(1, partitionBatches.size()); @@ -239,7 +239,7 @@ public void testFull() throws Exception { // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster(), false); Deque partitionBatches = accum.getDeque(tp1); assertEquals(2, partitionBatches.size()); Iterator partitionBatchesIterator = partitionBatches.iterator(); @@ -274,7 +274,7 @@ private void testAppendLarge(Compression compression) throws Exception { byte[] value = new byte[2 * batchSize]; RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compression, 0); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster(), false); assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, 
"Our partition's leader should be ready"); Deque batches = accum.getDeque(tp1); @@ -312,7 +312,7 @@ private void testAppendLargeOldMessageFormat(Compression compression) throws Exc RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compression, 0); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster()); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster(), false); assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); Deque batches = accum.getDeque(tp1); @@ -336,7 +336,7 @@ public void testLinger() throws Exception { int lingerMs = 10; RecordAccumulator accum = createTestRecordAccumulator( 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, Compression.NONE, lingerMs); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready"); time.sleep(10); assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); @@ -359,7 +359,7 @@ public void testPartialDrain() throws Exception { List partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { for (int i = 0; i < appends; i++) - accum.append(tp.topic(), tp.partition(), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(tp.topic(), tp.partition(), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); } assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Partition's leader should be ready"); @@ -380,7 +380,7 @@ public void testStressfulSituation() throws Exception { threads.add(new Thread(() -> { for (int j = 0; j < msgs; j++) { try { - accum.append(topic, j % numParts, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, j % numParts, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); } catch (Exception e) { e.printStackTrace(); } @@ -423,7 +423,7 @@ public void testNextReadyCheckDelay() throws Exception { // Partition on node1 only for (int i = 0; i < appends; i++) - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); assertEquals(lingerMs, result.nextReadyCheckDelayMs, "Next check time should be the linger time"); @@ -432,14 +432,14 @@ public void testNextReadyCheckDelay() throws Exception { // Add partition on node2 only for (int i = 0; i < appends; i++) - accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, 
partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); result = accum.ready(metadataCache, time.milliseconds()); assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); assertEquals(lingerMs / 2, result.nextReadyCheckDelayMs, "Next check time should be defined by node1, half remaining linger time"); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) - accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); result = accum.ready(metadataCache, time.milliseconds()); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable @@ -463,7 +463,7 @@ public void testRetryBackoff() throws Exception { new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); long now = time.milliseconds(); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now + lingerMs + 1); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); Map> batches = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); @@ -475,7 +475,7 @@ public void testRetryBackoff() throws Exception { accum.reenqueue(batches.get(0).get(0), now); // Put message for partition 1 into accumulator - accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); result = accum.ready(metadataCache, now + lingerMs + 1); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); @@ -529,7 +529,7 @@ public void testExponentialRetryBackoff() throws Exception { long now = time.milliseconds(); long initial = now; - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); // No backoff for initial attempt Map> batches = drainAndCheckBatchAmount(metadataCache, node1, accum, now + lingerMs + 1, 1); @@ -590,7 +590,7 @@ public void testExponentialRetryBackoffLeaderChange() throws Exception { long now = time.milliseconds(); long initial = now; - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); // No backoff for initial attempt Map> batches = drainAndCheckBatchAmount(metadataCache, node1, accum, now + lingerMs + 1, 1); @@ -647,7 +647,7 @@ public void testFlush() throws Exception { 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, Compression.NONE, lingerMs); for (int i = 0; i < 100; i++) { - accum.append(topic, i % 3, 0L, key, 
value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, i % 3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); assertTrue(accum.hasIncomplete()); } RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); @@ -683,7 +683,7 @@ private void delayedInterrupt(final Thread thread, final long delayMs) { public void testAwaitFlushComplete() throws Exception { RecordAccumulator accum = createTestRecordAccumulator( 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, Compression.NONE, Integer.MAX_VALUE); - accum.append(topic, 0, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, 0, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); accum.beginFlush(); assertTrue(accum.flushInProgress()); @@ -716,7 +716,7 @@ public void setPartition(int partition) { } } for (int i = 0; i < numRecords; i++) - accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, time.milliseconds(), cluster, false); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertFalse(result.readyNodes.isEmpty()); Map> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); @@ -761,7 +761,7 @@ public void setPartition(int partition) { } } for (int i = 0; i < numRecords; i++) - accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, time.milliseconds(), cluster, false); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertFalse(result.readyNodes.isEmpty()); Map> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, @@ -800,7 +800,7 @@ private void doExpireBatchSingle(int deliveryTimeoutMs) throws InterruptedExcept for (Boolean mute: muteStates) { if (time.milliseconds() < System.currentTimeMillis()) time.setCurrentTimeMs(System.currentTimeMillis()); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partition should be ready."); time.sleep(lingerMs); @@ -849,11 +849,11 @@ public void testExpiredBatches() throws InterruptedException { // Test batches not in retry for (int i = 0; i < appends; i++) { - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); } // Make the batches ready due to batch full - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster, false); Set readyNodes = 
accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); // Advance the clock to expire the batch. @@ -883,7 +883,7 @@ public void testExpiredBatches() throws InterruptedException { // Test batches in retry. // Create a retried batch - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster, false); time.sleep(lingerMs); readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); @@ -907,7 +907,7 @@ public void testExpiredBatches() throws InterruptedException { assertEquals(0, expiredBatches.size(), "All batches should have been expired."); // Test that when being throttled muted batches are expired before the throttle time is over. - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster, false); time.sleep(lingerMs); readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); @@ -940,7 +940,7 @@ public void testMutedPartitions() throws InterruptedException { batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, Compression.NONE, 10); int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(0, accum.ready(metadataCache, now).readyNodes.size(), "No partitions should be ready."); } time.sleep(2000); @@ -987,9 +987,9 @@ public void testRecordsDrainedWhenTransactionCompleting() throws Exception { Mockito.when(transactionManager.isCompleting()).thenReturn(false); accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, - time.milliseconds(), cluster); + time.milliseconds(), cluster, false); accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, - time.milliseconds(), cluster); + time.milliseconds(), cluster, false); assertTrue(accumulator.hasUndrained()); RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(metadataCache, time.milliseconds()); @@ -1104,7 +1104,7 @@ public void testSplitFrequency() throws InterruptedException { int dice = random.nextInt(100); byte[] value = (dice < goodCompRatioPercentage) ? 
bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100); - accum.append(topic, partition1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster, false); BatchDrainedResult result = completeOrSplitBatches(accum, batchSize); numSplit += result.numSplit; numBatches += result.numBatches; @@ -1127,7 +1127,7 @@ public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedExce RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, Compression.NONE, lingerMs); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); Set readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; Map> drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue(drained.isEmpty()); @@ -1142,7 +1142,7 @@ public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedExce //assertTrue(accum.soonToExpireInFlightBatches().isEmpty()); // Queue another batch and advance clock such that batch expiry time is earlier than request timeout. - accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster); + accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster, false); time.sleep(lingerMs * 4); // Now drain and check that accumulator picked up the drained batch because its expiry is soon. @@ -1167,7 +1167,7 @@ public void testExpiredBatchesRetry() throws InterruptedException { // Test batches in retry. for (Boolean mute : muteStates) { - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster, false); time.sleep(lingerMs); readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); @@ -1220,7 +1220,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { // Produce small record, we should switch to first partition. accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, value, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(partition1, partition.get()); assertEquals(1, mockRandom.get()); @@ -1229,28 +1229,28 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { // because of incomplete batch. byte[] largeValue = new byte[batchSize]; accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(partition1, partition.get()); assertEquals(1, mockRandom.get()); // Produce large record, we should switch to next partition as we complete // previous batch and exceeded sticky limit. 
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(partition2, partition.get()); assertEquals(2, mockRandom.get()); // Produce large record, we should switch to next partition as we complete // previous batch and exceeded sticky limit. accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(partition3, partition.get()); assertEquals(3, mockRandom.get()); // Produce large record, we should switch to next partition as we complete // previous batch and exceeded sticky limit. accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(partition1, partition.get()); assertEquals(4, mockRandom.get()); } @@ -1282,7 +1282,7 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic, for (int c = queueSizes[i]; c-- > 0; ) { // Add large records to each partition, so that each record creates a batch. accum.append(topic, i, 0L, null, largeValue, Record.EMPTY_HEADERS, - null, maxBlockTimeMs, time.milliseconds(), cluster); + null, maxBlockTimeMs, time.milliseconds(), cluster, false); } assertEquals(queueSizes[i], accum.getDeque(new TopicPartition(topic, i)).size()); } @@ -1307,7 +1307,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { // Prime built-in partitioner so that it'd switch on every record, as switching only // happens after the "sticky" limit is exceeded. accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); // Issue a certain number of partition calls to validate that the partitions would be // distributed with frequencies that are reciprocal to the queue sizes. The number of @@ -1319,7 +1319,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { for (int i = 0; i < numberOfIterations; i++) { accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); ++frequencies[partition.get()]; } @@ -1336,11 +1336,11 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { // Do one append, because partition gets switched after append. accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); for (int c = 10; c-- > 0; ) { accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, - callbacks, maxBlockTimeMs, time.milliseconds(), cluster); + callbacks, maxBlockTimeMs, time.milliseconds(), cluster, false); assertEquals(partition3, partition.get()); } @@ -1360,7 +1360,7 @@ public void testBuiltInPartitionerFractionalBatches() throws Exception { // Produce about 2/3 of the batch size. 
for (int recCount = batchSize * 2 / 3 / valSize; recCount-- > 0; ) { accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0, null, value, Record.EMPTY_HEADERS, - null, maxBlockTimeMs, time.milliseconds(), cluster); + null, maxBlockTimeMs, time.milliseconds(), cluster, false); } // Advance the time to make the batch ready. @@ -1403,7 +1403,7 @@ public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedExcept // Create 1 batch(batchA) to be produced to partition1. long now = time.milliseconds(); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, now, cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, now, cluster, false); // 1st attempt(not a retry) to produce batchA, it should be ready & drained to be produced. { @@ -1523,7 +1523,7 @@ private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSi CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f); // Append 20 records of 100 bytes size with poor compression ratio should make the batch too big. for (int i = 0; i < numRecords; i++) { - accum.append(topic, partition1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster, false); } RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 6b2d50a52cc85..0d5573b4311b4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -428,7 +428,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { expiryCallbackCount.incrementAndGet(); try { accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, - Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster()); + Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster(), false); } catch (InterruptedException e) { throw new RuntimeException("Unexpected interruption", e); } @@ -439,7 +439,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { final long nowMs = time.milliseconds(); for (int i = 0; i < messagesPerBatch; i++) - accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, null, callbacks, maxBlockTimeMs, nowMs, metadataCache.cluster()); + accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, null, callbacks, maxBlockTimeMs, nowMs, metadataCache.cluster(), false); // Advance the clock to expire the first batch. 
time.sleep(10000); @@ -2423,9 +2423,9 @@ private void testSplitBatchAndSend(TransactionManager txnManager, long nowMs = time.milliseconds(); Cluster cluster = TestUtils.singletonCluster(); Future f1 = - accumulator.append(tpId.topic(), tpId.partition(), 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, nowMs, cluster).future; + accumulator.append(tpId.topic(), tpId.partition(), 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, nowMs, cluster, false).future; Future f2 = - accumulator.append(tpId.topic(), tpId.partition(), 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, nowMs, cluster).future; + accumulator.append(tpId.topic(), tpId.partition(), 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, nowMs, cluster, false).future; sender.runOnce(); // connect sender.runOnce(); // send produce request @@ -3705,7 +3705,7 @@ private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws Inter private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException { return accumulator.append(tp.topic(), tp.partition(), timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS, - null, MAX_BLOCK_TIMEOUT, time.milliseconds(), TestUtils.singletonCluster()).future; + null, MAX_BLOCK_TIMEOUT, time.milliseconds(), TestUtils.singletonCluster(), false).future; } @SuppressWarnings("deprecation") diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 494c715df79b6..00cbb786fabf0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -765,7 +765,7 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), thi Future responseFuture1 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, time.milliseconds(), - TestUtils.singletonCluster()).future; + TestUtils.singletonCluster(), false).future; sender.runOnce(); assertEquals(1, transactionManager.sequenceNumber(tp0)); @@ -796,7 +796,7 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), thi Future responseFuture2 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, time.milliseconds(), - TestUtils.singletonCluster()).future; + TestUtils.singletonCluster(), false).future; sender.runOnce(); sender.runOnce(); assertEquals(0, transactionManager.firstInFlightSequence(tp0)); @@ -3990,7 +3990,7 @@ public void testTransactionAbortableExceptionInTxnOffsetCommit() { private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException { final long nowMs = time.milliseconds(); return accumulator.append(tp.topic(), tp.partition(), nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, - null, MAX_BLOCK_TIMEOUT, nowMs, TestUtils.singletonCluster()).future; + null, MAX_BLOCK_TIMEOUT, nowMs, TestUtils.singletonCluster(), false).future; } private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTransactionResult,
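For completeness, the partitioner exercised by these tests is enabled through the standard partitioner.class producer setting. Below is a minimal usage sketch; the broker address and topic name are placeholders and are not taken from this patch.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Route records round-robin across partitions, ignoring the key hash.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // With the patched partitioner, records should spread evenly across partitions
                // even when some sends trigger creation of a new batch.
                producer.send(new ProducerRecord<>("demo-topic", "value-" + i));
            }
        }
    }
}
```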