diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java
index 4c8b4e1ca2..0a1c34753c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java
@@ -137,7 +137,7 @@ private void run() {
         }
 
         LOGGER.info("Creating consumers...");
-        int consumers = consumerService.createConsumers(topics, config.consumersConfig());
+        int consumers = consumerService.createConsumers(topics, config.consumersConfig(), stats);
         consumerService.start(this::messageReceived, config.maxConsumeRecordRate);
         LOGGER.info("Created {} consumers, took {} ms", consumers, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
 
@@ -223,11 +223,10 @@ private void messageSent(int size, long sendTimeNanos, Exception exception) {
         }
     }
 
-    private void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) {
+    private void messageReceived(TopicPartition topicPartition) {
        if (preparing && config.awaitTopicReady && (config.catchupTopicPrefix == null || config.catchupTopicPrefix.isEmpty())) {
             readyPartitions.add(topicPartition);
         }
-        stats.messageReceived(payload.length, sendTimeNanos);
     }
 
     private void waitTopicsReady(boolean hasConsumer) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java
index 0d182239dc..c4b2c437bf 100644
--- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.ThreadUtils;
@@ -92,10 +93,10 @@ public ConsumerService(String bootstrapServer, Properties properties) {
      * @param config consumer configuration
      * @return the number of consumers created
      */
-    public int createConsumers(List<Topic> topics, ConsumersConfig config) {
+    public int createConsumers(List<Topic> topics, ConsumersConfig config, Stats stats) {
         int count = 0;
         for (int g = 0; g < config.groupsPerTopic; g++) {
-            Group group = new Group(g, config.consumersPerGroup, topics, config);
+            Group group = new Group(g, config.consumersPerGroup, topics, config, stats);
             groups.add(group);
             count += group.consumerCount();
         }
@@ -104,13 +105,9 @@
 
     public void start(ConsumerCallback callback, int pollRate) {
         BlockingBucket bucket = rateLimitBucket(pollRate);
-        ConsumerCallback callbackWithRateLimit = (tp, p, st) -> {
-            callback.messageReceived(tp, p, st);
-            bucket.consume(1);
-        };
         CompletableFuture.allOf(
             groups.stream()
-                .map(group -> group.start(callbackWithRateLimit))
+                .map(group -> group.start(callback, bucket))
                 .toArray(CompletableFuture[]::new)
         ).join();
     }
@@ -118,26 +115,26 @@
     public void pause() {
         groups.forEach(Group::pause);
     }
-    
+
     /**
      * Resume all consumer groups
      */
     public void resume() {
         groups.forEach(Group::resume);
     }
-    
+
     /**
      * Resume only a percentage of consumer groups
-     * 
+     *
      * @param percentage The percentage of consumers to resume (0-100)
      */
     public void resume(int percentage) {
         int size = groups.size();
         int consumersToResume = (int) Math.ceil(size * (percentage / 100.0));
         consumersToResume = Math.max(1, Math.min(size, consumersToResume)); // Ensure at least 1 and at most size
-        
+
         LOGGER.info("Resuming {}% of consumers ({} out of {})", percentage, consumersToResume, size);
-        
+
         for (int i = 0; i < consumersToResume; i++) {
             groups.get(i).resume();
         }
@@ -177,7 +174,7 @@ public void resumeTopics(Collection<String> topics) {
 
     /**
      * Reset consumer offsets for catch-up reading.
-     * 
+     *
      * @param startMillis    The timestamp to start seeking from
      * @param intervalMillis The interval between group starts
      * @param percentage     The percentage of consumers to activate (0-100)
@@ -187,21 +184,21 @@ public void resetOffset(long startMillis, long intervalMillis, int percentage) {
         int size = groups.size();
         int consumersToActivate = (int) Math.ceil(size * (percentage / 100.0));
         consumersToActivate = Math.max(1, Math.min(size, consumersToActivate)); // Ensure at least 1 and at most size
-        
+
         LOGGER.info("Activating {}% of consumers ({} out of {})", percentage, consumersToActivate, size);
-        
+
         for (int i = 0; i < consumersToActivate; i++) {
             Group group = groups.get(i);
             group.seek(timestamp.getAndAdd(intervalMillis));
             LOGGER.info("Reset consumer group offsets: {}/{}", i + 1, consumersToActivate);
         }
-        
+
         // Keep the remaining consumers paused
         if (consumersToActivate < size) {
             LOGGER.info("Keeping {} consumer groups paused during catch-up", size - consumersToActivate);
         }
     }
-    
+
     /**
      * Reset all consumer offsets (100% consumers)
     * @param startMillis The timestamp to start seeking from
@@ -238,10 +235,8 @@ public interface ConsumerCallback {
          * Called when a message is received.
         *
         * @param topicPartition the topic partition of the received message
-         * @param payload        the received message payload
-         * @param sendTimeNanos  the time in nanoseconds when the message was sent
         */
-        void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) throws InterruptedException;
+        void messageReceived(TopicPartition topicPartition) throws InterruptedException;
     }
 
     public static class ConsumersConfig {
@@ -263,22 +258,22 @@ private class Group implements AutoCloseable {
 
         private final int index;
         private final Map<Topic, List<Consumer>> consumers = new HashMap<>();
 
-        public Group(int index, int consumersPerGroup, List<Topic> topics, ConsumersConfig config) {
+        public Group(int index, int consumersPerGroup, List<Topic> topics, ConsumersConfig config, Stats stats) {
             this.index = index;
             Properties common = toProperties(config);
             for (Topic topic : topics) {
                 List<Consumer> topicConsumers = new ArrayList<>();
                 for (int c = 0; c < consumersPerGroup; c++) {
-                    Consumer consumer = newConsumer(topic, common);
+                    Consumer consumer = newConsumer(topic, common, stats);
                     topicConsumers.add(consumer);
                 }
                 consumers.put(topic, topicConsumers);
             }
         }
 
-        public CompletableFuture<Void> start(ConsumerCallback callback) {
-            consumers().forEach(consumer -> consumer.start(callback));
+        public CompletableFuture<Void> start(ConsumerCallback callback, BlockingBucket bucket) {
+            consumers().forEach(consumer -> consumer.start(callback, bucket));
 
             // wait for all consumers to join the group
             return CompletableFuture.allOf(consumers()
@@ -336,11 +331,11 @@ private Properties toProperties(ConsumersConfig config) {
             return properties;
         }
 
-        private Consumer newConsumer(Topic topic, Properties common) {
+        private Consumer newConsumer(Topic topic, Properties common, Stats stats) {
             Properties properties = new Properties();
             properties.putAll(common);
             properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId(topic));
-            return new Consumer(properties, topic.name);
+            return new Consumer(properties, topic.name, stats);
         }
 
         private Stream<Consumer> consumers() {
@@ -369,16 +364,17 @@ private static class Consumer {
         private final CompletableFuture<Void> started = new CompletableFuture<>();
         private boolean paused = false;
         private volatile boolean closing = false;
+        private final Stats stats;
 
-        public Consumer(Properties properties, String topic) {
+        public Consumer(Properties properties, String topic, Stats stats) {
             this.consumer = new KafkaConsumer<>(properties);
             this.executor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("perf-consumer", false));
-
+            this.stats = stats;
             consumer.subscribe(List.of(topic), subscribeListener());
         }
 
-        public void start(ConsumerCallback callback) {
-            this.task = this.executor.submit(() -> pollRecords(consumer, callback));
+        public void start(ConsumerCallback callback, BlockingBucket bucket) {
+            this.task = this.executor.submit(() -> pollRecords(consumer, callback, bucket));
         }
 
        public CompletableFuture<Void> started() {
@@ -408,18 +404,28 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             };
         }
 
-        private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback) {
+        private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback, BlockingBucket bucket) {
             while (!closing) {
                 try {
                     while (paused) {
                         Thread.sleep(PAUSE_INTERVAL);
                     }
                     ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT);
+                    int numMessages = records.count();
+                    if (numMessages == 0) {
+                        continue;
+                    }
+                    ConsumerRecord<String, byte[]> firstRecord = records.iterator().next();
+                    Header header = firstRecord.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS);
+                    long bytes = 0;
+                    long sendTimeNanos = Longs.fromByteArray(header.value());
                     for (ConsumerRecord<String, byte[]> record : records) {
-                        long sendTimeNanos = Longs.fromByteArray(record.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS).value());
                         TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
-                        callback.messageReceived(topicPartition, record.value(), sendTimeNanos);
+                        bytes += record.value().length;
+                        callback.messageReceived(topicPartition);
                     }
+                    stats.messageReceived(numMessages, bytes, sendTimeNanos);
+                    bucket.consume(records.count());
                 } catch (InterruptException | InterruptedException e) {
                     // ignore, as we are closing
                 } catch (Exception e) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/Stats.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/Stats.java
index c2bec26a13..f0058504c0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/Stats.java
+++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/Stats.java
@@ -70,12 +70,12 @@ public void messageFailed() {
         totalMessagesSendFailed.increment();
     }
 
-    public void messageReceived(long bytes, long sendTimeNanos) {
+    public void messageReceived(long numMessages, long bytes, long sendTimeNanos) {
         long latencyMicros = TimeUnit.NANOSECONDS.toMicros(StatsCollector.currentNanos() - sendTimeNanos);
-        messagesReceived.increment();
+        messagesReceived.add(numMessages);
         bytesReceived.add(bytes);
         endToEndLatencyMicros.recordValue(latencyMicros);
-        totalMessagesReceived.increment();
+        totalMessagesReceived.add(numMessages);
         totalBytesReceived.add(bytes);
         totalEndToEndLatencyMicros.recordValue(latencyMicros);
         maxSendTimeNanos.updateAndGet(current -> Math.max(current, sendTimeNanos));
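
Reviewer note: the change above replaces per-record stats accounting with per-poll-batch accounting. The send-time header is read once per batch, the record count and bytes are summed locally inside the poll loop, and Stats is updated in a single call, so LongAdder.add(n) replaces n separate increment() calls on the hot path. A minimal, self-contained sketch of that pattern follows; BatchStats and the sample values are illustrative stand-ins rather than the tool's actual classes, and the real Stats additionally records end-to-end latency into histogram recorders.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Illustrative stand-in for the tool's Stats class: one update per poll batch.
public class BatchStats {
    private final LongAdder messagesReceived = new LongAdder();
    private final LongAdder bytesReceived = new LongAdder();
    private volatile long lastLatencyMicros;

    // Mirrors the shape of the new Stats.messageReceived(numMessages, bytes, sendTimeNanos):
    // LongAdder.add(n) applies a whole batch in one call instead of n increments.
    public void messageReceived(long numMessages, long bytes, long sendTimeNanos) {
        lastLatencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendTimeNanos);
        messagesReceived.add(numMessages);
        bytesReceived.add(bytes);
    }

    public static void main(String[] args) {
        BatchStats stats = new BatchStats();
        // Simulate one poll() returning two records of 100 and 200 bytes,
        // stamped with a single send time (as taken from the first record's header above).
        long sendTimeNanos = System.nanoTime();
        byte[][] batch = {new byte[100], new byte[200]};
        long bytes = 0;
        for (byte[] value : batch) {
            bytes += value.length;
        }
        stats.messageReceived(batch.length, bytes, sendTimeNanos);
        System.out.printf("%d msgs, %d bytes, ~%d us e2e%n",
            stats.messagesReceived.sum(), stats.bytesReceived.sum(), stats.lastLatencyMicros);
    }
}

Two trade-offs implied by the diff itself: end-to-end latency is now sampled from the first record of each batch (one histogram sample per poll) rather than per record, and the loop assumes every record carries HEADER_KEY_SEND_TIME_NANOS, since header.value() is dereferenced without a null check.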