
Parallel Consumer is 30 times slower than Normal Consumer #884


Description

@gmreads

What am I doing wrong?
Total Messages: 4.5 Million, with lz4 compression. 10GB total size with avg payload 18Kb.

Brokers: 1, Topic: 1, Partitions: 1, Replication: 1

Lag Reduction Speed

Raw Consumer -> 30,000-40,000 msg/sec
Parallel Consumer -> ~1,000 msg/sec

Script used to find lag reduction speed
while true; do
  now=$(date)
  lag=$(/bin/kafka-consumer-groups --bootstrap-server localhost:29092 --describe --group pcg3 \
        | grep -w "^[^ ]\+[ \t]\+test\.topic\.1[ \t]\+" \
        | awk '{sum+=$6} END {print sum}')
  # Skip the first iteration, while $prev is still unset
  if [ -n "$prev" ] && [ "$prev" -ne 0 ]; then
    rate=$(( (prev - lag) / 5 ))
    echo "$now PreviousLag: $prev, CurrentLag: $lag, Rate: $rate msg/sec"
  fi
  prev=$lag
  sleep 5
done

Minimal Example

package main;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static pl.tlinkowski.unij.api.UniLists.of;

public class KafkaConsumerExample {

    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    static {
        System.setProperty("org.slf4j.simpleLogger.log.org.apache.kafka", "INFO");
    }

    public static void main(String[] args) {

        // consumeKafkaRecords("single1", "test.topic.1");
        startParallelRecordConsumption("pcg3", "test.topic.1");
    }

    private static void startParallelRecordConsumption(String groupId, String topic) {
        ParallelStreamProcessor<String, String> parallelConsumer = setupParallelConsumer(groupId, topic);
        parallelConsumer.poll(records -> {
            // Process the record
            try {
                JSONObject json = JSON.parseObject(records.getSingleConsumerRecord().value());
            } catch (Exception e) {
                e.printStackTrace();
            } 
        });
    }

    private static KafkaConsumer<String, String> getKafkaConsumer(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        props.put("group.id", groupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest"); // Read from beginning of topic
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return new KafkaConsumer<>(props);
    }

    private static ParallelStreamProcessor<String, String> setupParallelConsumer(String groupId, String topic) {
        // tag::exampleSetup[]
        Consumer<String, String> kafkaConsumer = getKafkaConsumer(groupId); // <1>

        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(KEY) // <2>
                .maxConcurrency(1000) // <3>
                .consumer(kafkaConsumer)
                .producer(null)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS)
                .build();

        ParallelStreamProcessor<String, String> eosStreamProcessor = ParallelStreamProcessor
                .createEosStreamProcessor(options);

        eosStreamProcessor.subscribe(of(topic)); // <4>

        return eosStreamProcessor;
        // end::exampleSetup[]
    }

    private static void consumeKafkaRecords(String groupId, String topic) {
        KafkaConsumer<String, String> consumer = getKafkaConsumer(groupId);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        JSONObject json = JSON.parseObject(record.value());
                    } catch (Exception e) {
                        System.err.println("Error processing record: " + e.getMessage());
                        e.printStackTrace();
                    }
                }

                try {
                    consumer.commitSync();
                } catch (Exception e) {
                    System.err.println("Error while committing offset: " + e.getMessage());
                    throw e;
                }

            }
        } catch (OutOfMemoryError oom) {
            System.err.println("Caught OutOfMemoryError: " + oom.getMessage());
            oom.printStackTrace();
            // Optionally: cleanup, logging, or graceful shutdown
        } catch (Exception e) {
            System.err.println("Caught OutOfMemoryError: exceptionnnn ");
            e.printStackTrace();
        } finally {
            System.err.println("Caught OutOfMemoryError: finallllyyyy");
            consumer.close();
        }
    }

}
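
For comparison, a variant of setupParallelConsumer with UNORDERED processing is sketched below. It is untested at this scale, reuses getKafkaConsumer from the class above, and is only meant to show how one would check whether per-key ordering on a single partition is the limiting factor; it is not a recommended configuration.

    // Hypothetical variant for comparison only: identical to setupParallelConsumer
    // above except that the processing order is UNORDERED instead of KEY.
    private static ParallelStreamProcessor<String, String> setupUnorderedParallelConsumer(String groupId, String topic) {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer(groupId);

        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED) // no per-key serialization
                .maxConcurrency(1000)
                .consumer(kafkaConsumer)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS)
                .build();

        ParallelStreamProcessor<String, String> processor = ParallelStreamProcessor
                .createEosStreamProcessor(options);
        processor.subscribe(of(topic));
        return processor;
    }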

Kafka consumer config

[main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:29092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-pcg3-1
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = pcg3
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
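
For reference, the fetch and poll values above (max.poll.records = 500, max.partition.fetch.bytes = 1048576, fetch.min.bytes = 1) are all Kafka defaults. If the limiting factor turns out to be how many records each poll hands over, a variant of getKafkaConsumer could raise them explicitly, as in the sketch below; the specific numbers are illustrative assumptions, not measured settings.

    // Hypothetical variant of getKafkaConsumer() with the fetch/poll defaults raised.
    // The values are guesses for this workload and would need to be validated.
    private static KafkaConsumer<String, String> getTunedKafkaConsumer(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);              // default 500
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 8388608);  // default 1048576
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576);            // default 1
        return new KafkaConsumer<>(props);
    }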

Originally posted by @gmreads in #883
