@@ -137,7 +137,7 @@ private void run() {
}

LOGGER.info("Creating consumers...");
int consumers = consumerService.createConsumers(topics, config.consumersConfig());
int consumers = consumerService.createConsumers(topics, config.consumersConfig(), stats);
consumerService.start(this::messageReceived, config.maxConsumeRecordRate);
LOGGER.info("Created {} consumers, took {} ms", consumers, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

@@ -223,11 +223,10 @@ private void messageSent(int size, long sendTimeNanos, Exception exception) {
}
}

private void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) {
private void messageReceived(TopicPartition topicPartition) {
if (preparing && config.awaitTopicReady && (config.catchupTopicPrefix == null || config.catchupTopicPrefix.isEmpty())) {
readyPartitions.add(topicPartition);
}
stats.messageReceived(payload.length, sendTimeNanos);
}

private void waitTopicsReady(boolean hasConsumer) {
@@ -34,6 +34,7 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.ThreadUtils;
@@ -92,10 +93,10 @@ public ConsumerService(String bootstrapServer, Properties properties) {
* @param config consumer configuration
* @return the number of consumers created
*/
public int createConsumers(List<Topic> topics, ConsumersConfig config) {
public int createConsumers(List<Topic> topics, ConsumersConfig config, Stats stats) {
int count = 0;
for (int g = 0; g < config.groupsPerTopic; g++) {
Group group = new Group(g, config.consumersPerGroup, topics, config);
Group group = new Group(g, config.consumersPerGroup, topics, config, stats);
groups.add(group);
count += group.consumerCount();
}
@@ -104,40 +105,36 @@ public int createConsumers(List<Topic> topics, ConsumersConfig config) {

public void start(ConsumerCallback callback, int pollRate) {
BlockingBucket bucket = rateLimitBucket(pollRate);
ConsumerCallback callbackWithRateLimit = (tp, p, st) -> {
callback.messageReceived(tp, p, st);
bucket.consume(1);
};
CompletableFuture.allOf(
groups.stream()
.map(group -> group.start(callbackWithRateLimit))
.map(group -> group.start(callback, bucket))
.toArray(CompletableFuture[]::new)
).join();
}
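Note: the shared BlockingBucket created from pollRate is handed to every consumer, and each consumer draws records.count() tokens from it after handling a poll batch (see pollRecords below), so a poll presumably blocks until the batch fits within the configured consume rate.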

public void pause() {
groups.forEach(Group::pause);
}

/**
* Resume all consumer groups
*/
public void resume() {
groups.forEach(Group::resume);
}

/**
* Resume only a percentage of consumer groups
*
*
* @param percentage The percentage of consumers to resume (0-100)
*/
public void resume(int percentage) {
int size = groups.size();
int consumersToResume = (int) Math.ceil(size * (percentage / 100.0));
consumersToResume = Math.max(1, Math.min(size, consumersToResume)); // Ensure at least 1 and at most size

LOGGER.info("Resuming {}% of consumers ({} out of {})", percentage, consumersToResume, size);

for (int i = 0; i < consumersToResume; i++) {
groups.get(i).resume();
}
@@ -177,7 +174,7 @@ public void resumeTopics(Collection<String> topics) {

/**
* Reset consumer offsets for catch-up reading.
*
*
* @param startMillis The timestamp to start seeking from
* @param intervalMillis The interval between group starts
* @param percentage The percentage of consumers to activate (0-100)
@@ -187,21 +184,21 @@ public void resetOffset(long startMillis, long intervalMillis, int percentage) {
int size = groups.size();
int consumersToActivate = (int) Math.ceil(size * (percentage / 100.0));
consumersToActivate = Math.max(1, Math.min(size, consumersToActivate)); // Ensure at least 1 and at most size

LOGGER.info("Activating {}% of consumers ({} out of {})", percentage, consumersToActivate, size);

for (int i = 0; i < consumersToActivate; i++) {
Group group = groups.get(i);
group.seek(timestamp.getAndAdd(intervalMillis));
LOGGER.info("Reset consumer group offsets: {}/{}", i + 1, consumersToActivate);
}

// Keep the remaining consumers paused
if (consumersToActivate < size) {
LOGGER.info("Keeping {} consumer groups paused during catch-up", size - consumersToActivate);
}
}
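For example, resetOffset(startMillis, 60_000, 50) over 4 consumer groups seeks the first Math.ceil(4 * 0.5) = 2 groups to startMillis and startMillis + 60_000 respectively, and keeps the remaining 2 groups paused during catch-up.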

/**
* Reset all consumer offsets (100% of consumers)
* @param startMillis The timestamp to start seeking from
@@ -238,10 +235,8 @@ public interface ConsumerCallback {
* Called when a message is received.
*
* @param topicPartition the topic partition of the received message
* @param payload the received message payload
* @param sendTimeNanos the time in nanoseconds when the message was sent
*/
void messageReceived(TopicPartition topicPartition, byte[] payload, long sendTimeNanos) throws InterruptedException;
void messageReceived(TopicPartition topicPartition) throws InterruptedException;
}
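A minimal sketch of wiring the slimmed-down callback, assuming a ConsumerService instance and a hypothetical readyPartitions set are in scope; payload size and end-to-end latency are no longer visible here, since Stats is updated inside the consumer:

// Hypothetical readiness-tracking set, not part of this PR
// (uses java.util.Set and java.util.concurrent.ConcurrentHashMap).
Set<TopicPartition> readyPartitions = ConcurrentHashMap.newKeySet();
// The callback only signals which partition delivered a message.
ConsumerService.ConsumerCallback callback = readyPartitions::add;
consumerService.start(callback, config.maxConsumeRecordRate);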

public static class ConsumersConfig {
@@ -263,22 +258,22 @@ private class Group implements AutoCloseable {
private final int index;
private final Map<Topic, List<Consumer>> consumers = new HashMap<>();

public Group(int index, int consumersPerGroup, List<Topic> topics, ConsumersConfig config) {
public Group(int index, int consumersPerGroup, List<Topic> topics, ConsumersConfig config, Stats stats) {
this.index = index;

Properties common = toProperties(config);
for (Topic topic : topics) {
List<Consumer> topicConsumers = new ArrayList<>();
for (int c = 0; c < consumersPerGroup; c++) {
Consumer consumer = newConsumer(topic, common);
Consumer consumer = newConsumer(topic, common, stats);
topicConsumers.add(consumer);
}
consumers.put(topic, topicConsumers);
}
}

public CompletableFuture<Void> start(ConsumerCallback callback) {
consumers().forEach(consumer -> consumer.start(callback));
public CompletableFuture<Void> start(ConsumerCallback callback, BlockingBucket bucket) {
consumers().forEach(consumer -> consumer.start(callback, bucket));

// wait for all consumers to join the group
return CompletableFuture.allOf(consumers()
@@ -336,11 +331,11 @@ private Properties toProperties(ConsumersConfig config) {
return properties;
}

private Consumer newConsumer(Topic topic, Properties common) {
private Consumer newConsumer(Topic topic, Properties common, Stats stats) {
Properties properties = new Properties();
properties.putAll(common);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId(topic));
return new Consumer(properties, topic.name);
return new Consumer(properties, topic.name, stats);
}

private Stream<Consumer> consumers() {
@@ -369,16 +364,17 @@ private static class Consumer {
private final CompletableFuture<Void> started = new CompletableFuture<>();
private boolean paused = false;
private volatile boolean closing = false;
private final Stats stats;

public Consumer(Properties properties, String topic) {
public Consumer(Properties properties, String topic, Stats stats) {
this.consumer = new KafkaConsumer<>(properties);
this.executor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("perf-consumer", false));

this.stats = stats;
consumer.subscribe(List.of(topic), subscribeListener());
}

public void start(ConsumerCallback callback) {
this.task = this.executor.submit(() -> pollRecords(consumer, callback));
public void start(ConsumerCallback callback, BlockingBucket bucket) {
this.task = this.executor.submit(() -> pollRecords(consumer, callback, bucket));
}

public CompletableFuture<Void> started() {
@@ -408,18 +404,28 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
};
}

private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback) {
private void pollRecords(KafkaConsumer<String, byte[]> consumer, ConsumerCallback callback, BlockingBucket bucket) {
while (!closing) {
try {
while (paused) {
Thread.sleep(PAUSE_INTERVAL);
}
ConsumerRecords<String, byte[]> records = consumer.poll(POLL_TIMEOUT);
int numMessages = records.count();
if (numMessages == 0) {
continue;
}
ConsumerRecord<String, byte[]> firstRecord = records.iterator().next();
Collaborator review comment: The first record sendTimeNanos could be set in the for loop
Header header = firstRecord.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS);
long bytes = 0;
long sendTimeNanos = Longs.fromByteArray(header.value());
for (ConsumerRecord<String, byte[]> record : records) {
long sendTimeNanos = Longs.fromByteArray(record.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS).value());
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
callback.messageReceived(topicPartition, record.value(), sendTimeNanos);
bytes += record.value().length;
callback.messageReceived(topicPartition);
}
stats.messageReceived(numMessages, bytes, sendTimeNanos);
bucket.consume(records.count());
} catch (InterruptException | InterruptedException e) {
// ignore, as we are closing
} catch (Exception e) {
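A sketch of how the collaborator's suggestion might look, with the send-time header read from the first record inside the loop instead of through records.iterator().next() (an illustration only, not the merged code):

long bytes = 0;
long sendTimeNanos = -1L;
for (ConsumerRecord<String, byte[]> record : records) {
    if (sendTimeNanos < 0) {
        // First record of the batch: read the send-time header once.
        sendTimeNanos = Longs.fromByteArray(record.headers().lastHeader(HEADER_KEY_SEND_TIME_NANOS).value());
    }
    bytes += record.value().length;
    callback.messageReceived(new TopicPartition(record.topic(), record.partition()));
}
stats.messageReceived(numMessages, bytes, sendTimeNanos);
bucket.consume(numMessages);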
@@ -70,12 +70,12 @@ public void messageFailed() {
totalMessagesSendFailed.increment();
}

public void messageReceived(long bytes, long sendTimeNanos) {
public void messageReceived(long numMessages, long bytes, long sendTimeNanos) {
long latencyMicros = TimeUnit.NANOSECONDS.toMicros(StatsCollector.currentNanos() - sendTimeNanos);
messagesReceived.increment();
messagesReceived.add(numMessages);
bytesReceived.add(bytes);
endToEndLatencyMicros.recordValue(latencyMicros);
totalMessagesReceived.increment();
totalMessagesReceived.add(numMessages);
totalBytesReceived.add(bytes);
totalEndToEndLatencyMicros.recordValue(latencyMicros);
maxSendTimeNanos.updateAndGet(current -> Math.max(current, sendTimeNanos));
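For example, a poll that returns 500 records totalling 512_000 bytes results in a single stats.messageReceived(500, 512_000, sendTimeNanos) call: the message and byte counters advance by the whole batch, while endToEndLatencyMicros records one sample per batch, computed from the first record's send-time header.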