Skip to content

Commit 993f3bf

Browse files
committed
invoke consume in pollRecords
1 parent 0cbf052 commit 993f3bf

File tree

1 file changed

+7
-15
lines changed

1 file changed

+7
-15
lines changed

tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import java.util.concurrent.Future;
6060
import java.util.concurrent.TimeUnit;
6161
import java.util.concurrent.atomic.AtomicLong;
62-
import java.util.concurrent.atomic.LongAdder;
6362
import java.util.stream.Collectors;
6463
import java.util.stream.Stream;
6564

@@ -105,17 +104,9 @@ public int createConsumers(List<Topic> topics, ConsumersConfig config) {
105104

106105
public void start(ConsumerCallback callback, int pollRate) {
107106
BlockingBucket bucket = rateLimitBucket(pollRate);
108-
LongAdder counter = new LongAdder();
109-
ConsumerCallback callbackWithRateLimit = (tp, p, st) -> {
110-
callback.messageReceived(tp, p, st);
111-
counter.increment();
112-
if ((counter.sum()) % 1000 == 0) {
113-
bucket.consume(1);
114-
}
115-
};
116107
CompletableFuture.allOf(
117108
groups.stream()
118-
.map(group -> group.start(callbackWithRateLimit))
109+
.map(group -> group.start(callback, bucket))
119110
.toArray(CompletableFuture[]::new)
120111
).join();
121112
}
@@ -282,8 +273,8 @@ public Group(int index, int consumersPerGroup, List<Topic> topics, ConsumersConf
282273
}
283274
}
284275

285-
public CompletableFuture<Void> start(ConsumerCallback callback) {
286-
consumers().forEach(consumer -> consumer.start(callback));
276+
public CompletableFuture<Void> start(ConsumerCallback callback, BlockingBucket bucket) {
277+
consumers().forEach(consumer -> consumer.start(callback, bucket));
287278

288279
// wait for all consumers to join the group
289280
return CompletableFuture.allOf(consumers()
@@ -382,8 +373,8 @@ public Consumer(Properties properties, String topic) {
382373
consumer.subscribe(List.of(topic), subscribeListener());
383374
}
384375

385-
public void start(ConsumerCallback callback) {
386-
this.task = this.executor.submit(() -> pollRecords(consumer, callback));
376+
public void start(ConsumerCallback callback, BlockingBucket bucket) {
377+
this.task = this.executor.submit(() -> pollRecords(consumer, callback, bucket));
387378
}
388379

389380
public CompletableFuture<Void> started() {
@@ -413,7 +404,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
413404
};
414405
}
415406

416-
private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback) {
407+
private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback, BlockingBucket bucket) {
417408
while (!closing) {
418409
try {
419410
while (paused) {
@@ -425,6 +416,7 @@ private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallbac
425416
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
426417
callback.messageReceived(topicPartition, record.value(), sendTimeNanos);
427418
}
419+
bucket.consume(records.count());
428420
} catch (InterruptException | InterruptedException e) {
429421
// ignore, as we are closing
430422
} catch (Exception e) {

0 commit comments

Comments
 (0)