Description
I have a question about how parallel-consumer behaves when it batches records.
I have a Spring Boot application that uses parallel-consumer with batching.
I am simulating an error scenario in which one message in a batch fails during processing (inside the user function).
I am using {concurrency: 3, batching: 3}, i.e. a maximum concurrency of 3 and a batch size of 3.
According to the documentation, the whole batch is retried if one record in the batch fails.
That does happen, but the messages from the failed batch are always retried together, in the same batch as the bad record.
The documentation says, "There is no guarantee that the messages will be retried again in the same batch."
I assumed that the good messages would eventually go through on later attempts (by being randomly grouped into different batches), and that only the bad message would keep failing. The results show the opposite.
In every retry attempt, the same messages are batched together again. All of them fail every time, because they are always grouped with the bad one.
In my opinion this is a bug: the good messages can never get through, even on retry. One bad record causes every record that was once batched with it to fail too, because they keep being batched with it.
Quoting from the documentation:
https://github.com/confluentinc/parallel-consumer#batching
"If an exception is thrown while processing the batch, all messages in the batch will be returned to the queue, to be retried with the standard retry system. There is no guarantee that the messages will be retried again in the same batch."
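To make the observed pattern concrete, here is a simplified, hypothetical model of "return the whole batch to the queue and retry". It is not parallel-consumer's code; it only assumes that (a) every member of a failed batch is re-queued with the same retry due time, (b) the next batch takes up to batchSize due records in queue order, and (c) no other records are waiting. Assumption (c) matches the log below: the other six records were already inserted by 16:36:47.435, before the first retry at 16:36:48.439. Under these assumptions, with only three pending records that all become due together and a batch size of 3, any builder that fills batches up to the limit picks the same three again, whatever their order. The names `RetryRegroupingModel`, `simulate` and `RETRY_DELAY_MILLIS` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of whole-batch retry; NOT parallel-consumer's implementation.
 * Assumes: failed batch members are re-queued with the same due time, the next batch
 * takes up to batchSize due records in queue order, and no other records are pending.
 */
class RetryRegroupingModel {

    record Pending(String key, long dueAtMillis) {}

    static final long RETRY_DELAY_MILLIS = 1_000; // assumed fixed retry delay

    /** Returns the keys of each attempted batch, for at most the given number of rounds. */
    static List<List<String>> simulate(List<String> failedBatch, String badKey,
                                       int batchSize, int rounds) {
        List<Pending> queue = new ArrayList<>();
        for (String key : failedBatch) {
            queue.add(new Pending(key, RETRY_DELAY_MILLIS));
        }
        List<List<String>> attempts = new ArrayList<>();
        for (int i = 0; i < rounds && !queue.isEmpty(); i++) {
            // Jump to the moment the earliest pending record becomes due.
            long now = queue.stream().mapToLong(Pending::dueAtMillis).min().getAsLong();
            List<Pending> batch = queue.stream()
                    .filter(p -> p.dueAtMillis() <= now)
                    .limit(batchSize)
                    .toList();
            queue.removeAll(batch);
            List<String> keys = batch.stream().map(Pending::key).toList();
            attempts.add(keys);
            if (keys.contains(badKey)) {
                // The whole batch fails, so every member goes back with the same due time.
                for (Pending p : batch) {
                    queue.add(new Pending(p.key(), now + RETRY_DELAY_MILLIS));
                }
            }
            // Otherwise the batch succeeded and its records leave the queue for good.
        }
        return attempts;
    }

    public static void main(String[] args) {
        List<String> failed = List.of("ABC100003", "ABC100002", "ABC100004");
        System.out.println(simulate(failed, "ABC100002", 3, 3)); // same trio every round
        System.out.println(simulate(failed, "ABC100002", 1, 4)); // good records get through
    }
}
```

With a batch size of 3 the model repeats [ABC100003, ABC100002, ABC100004] on every round, as in the log. With a batch size of 1 the two good records succeed and only ABC100002 keeps coming back, which is the outcome the report expected. In this model, reordering alone cannot separate the records; only a smaller retry batch (or other pending work to mix in) can.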
Test result:
- The test consists of 9 messages consumed by the consumer.
- The user function throws an exception for the one message with key ABC100002.
- Messages were batched as expected. All batches were processed successfully except the one containing ABC100002.
- That batch contained 3 messages in total, with keys [ABC100003, ABC100002, ABC100004].
- On every retry, these three were batched together again, so all three failed every time.
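A hypothetical sketch of the failing step in this test, with record keys reduced to plain strings (the real user function receives Kafka records from parallel-consumer; the names `BatchInsertSketch`, `insertBatch` and `insertWithFallback` are made up for illustration). `insertBatch` fails the whole batch whenever it contains ABC100002, mirroring the simulated failure. `insertWithFallback` shows one possible application-side mitigation, which is an assumption and not a parallel-consumer feature: when the batch insert fails, it inserts each record on its own and returns only the keys that still fail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the test's user function; keys replace Kafka records. */
class BatchInsertSketch {

    static final String BAD_KEY = "ABC100002"; // key that triggers the simulated failure

    /** Simulated DB insert: the whole batch fails if it contains the bad key. */
    static void insertBatch(List<String> keys) {
        if (keys.contains(BAD_KEY)) {
            throw new RuntimeException("Simulating failure for testing purpose");
        }
        // A real implementation would write all keys in one DB statement here.
    }

    /**
     * Assumed mitigation, not a library feature: if the batch insert fails,
     * insert each record individually and return the keys that still fail.
     */
    static List<String> insertWithFallback(List<String> keys, Consumer<List<String>> insert) {
        try {
            insert.accept(keys);
            return List.of();
        } catch (RuntimeException batchFailure) {
            List<String> failed = new ArrayList<>();
            for (String key : keys) {
                try {
                    insert.accept(List.of(key));
                } catch (RuntimeException recordFailure) {
                    failed.add(key); // only this record is reported as failed
                }
            }
            return failed;
        }
    }

    public static void main(String[] args) {
        List<String> batch = List.of("ABC100003", "ABC100002", "ABC100004");
        System.out.println(insertWithFallback(batch, BatchInsertSketch::insertBatch));
    }
}
```

Run on the failing batch from the test, this prints [ABC100002]: ABC100003 and ABC100004 are inserted individually. If the application then throws for the remaining key, the documentation quoted above says all messages in the batch are returned to the queue, so the two good records would be processed again; the inserts would need to be idempotent for that to be safe.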
LOG:
[2025-08-15T16:36:47.414Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_fd55226b-3527-411c-a475-19cd584d6f3e] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:47.414Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_8343dfb2-37bf-4706-8eb6-f61ddf4ced0e] - Started processing batch of consumer records. Record count: 1
[2025-08-15T16:36:47.414Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d7b0a4fd-3659-41cb-a775-0a95b3d72995] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:47.414Z] INFO [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_8343dfb2-37bf-4706-8eb6-f61ddf4ced0e] - Keys: [ABC100001]
[2025-08-15T16:36:47.414Z] INFO [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_fd55226b-3527-411c-a475-19cd584d6f3e] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:47.414Z] INFO [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d7b0a4fd-3659-41cb-a775-0a95b3d72995] - Keys: [ABC100006, ABC100005, ABC100007]
[2025-08-15T16:36:47.418Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_8343dfb2-37bf-4706-8eb6-f61ddf4ced0e] - Transformed kafka consumer records to DB entities. Record count: 1
[2025-08-15T16:36:47.418Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d7b0a4fd-3659-41cb-a775-0a95b3d72995] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:47.418Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_fd55226b-3527-411c-a475-19cd584d6f3e] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:47.427Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_8343dfb2-37bf-4706-8eb6-f61ddf4ced0e] - Inserting batch of records in DB. Record count: 1
[2025-08-15T16:36:47.427Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d7b0a4fd-3659-41cb-a775-0a95b3d72995] - Inserting batch of records in DB. Record count: 3
[2025-08-15T16:36:47.427Z] ERROR [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_fd55226b-3527-411c-a475-19cd584d6f3e] - Error inserting Kafka events batch
java.lang.RuntimeException: Simulating failure for testing purpose
at com.abc.service.EventPersistenceService.insertKafkaEventsBatch(EventPersistenceService.java:116)
at com.abc.service.EventPersistenceService.persistKafkaEvents(EventPersistenceService.java:63)
at com.abc.auditor.consumer.ParallelConsumer.lambda$pollRecords$1(ParallelConsumer.java:84)
at io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun(UserFunctions.java:61)
at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$poll$0(ParallelEoSStreamProcessor.java:54)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$runUserFunctionInternal$15(AbstractParallelEoSStreamProcessor.java:1382)
at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:69)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunctionInternal(AbstractParallelEoSStreamProcessor.java:1382)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunction(AbstractParallelEoSStreamProcessor.java:1335)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$submitWorkToPoolInner$13(AbstractParallelEoSStreamProcessor.java:1023)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
[2025-08-15T16:36:47.428Z] ERROR [pc-pool-1-thread-2] - Exception when processing consumer record batch.
java.lang.RuntimeException: Failed to insert Kafka events batch
at com.abc.service.EventPersistenceService.insertKafkaEventsBatch(EventPersistenceService.java:144)
at com.abc.service.EventPersistenceService.persistKafkaEvents(EventPersistenceService.java:63)
at com.abc.auditor.consumer.ParallelConsumer.lambda$pollRecords$1(ParallelConsumer.java:84)
at io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun(UserFunctions.java:61)
at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$poll$0(ParallelEoSStreamProcessor.java:54)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$runUserFunctionInternal$15(AbstractParallelEoSStreamProcessor.java:1382)
at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:69)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunctionInternal(AbstractParallelEoSStreamProcessor.java:1382)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.runUserFunction(AbstractParallelEoSStreamProcessor.java:1335)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$submitWorkToPoolInner$13(AbstractParallelEoSStreamProcessor.java:1023)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Simulating failure for testing purpose
at com.abc.service.EventPersistenceService.insertKafkaEventsBatch(EventPersistenceService.java:116)
... 13 common frames omitted
:::
[2025-08-15T16:36:47.430Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_26e91bdd-b896-4ff6-aeb3-82d920a9b051] - Started processing batch of consumer records. Record count: 2
[2025-08-15T16:36:47.430Z] INFO [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_26e91bdd-b896-4ff6-aeb3-82d920a9b051] - Keys: [ABC100008, ABC100009]
[2025-08-15T16:36:47.430Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_26e91bdd-b896-4ff6-aeb3-82d920a9b051] - Transformed kafka consumer records to DB entities. Record count: 2
[2025-08-15T16:36:47.431Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_26e91bdd-b896-4ff6-aeb3-82d920a9b051] - Inserting batch of records in DB. Record count: 2
[2025-08-15T16:36:47.432Z] INFO [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_8343dfb2-37bf-4706-8eb6-f61ddf4ced0e] - Completed successful insert of records in DB. Record count: 1
[2025-08-15T16:36:47.433Z] INFO [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d7b0a4fd-3659-41cb-a775-0a95b3d72995] - Completed successful insert of records in DB. Record count: 3
[2025-08-15T16:36:47.435Z] INFO [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_26e91bdd-b896-4ff6-aeb3-82d920a9b051] - Completed successful insert of records in DB. Record count: 2
[2025-08-15T16:36:47.435Z] INFO [pc-pool-1-thread-1] - pl.tlinkowski.unij.service.api.collect.UnmodifiableListFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableListFactory (priority=40)
[2025-08-15T16:36:47.439Z] INFO [pc-broker-poll] - pl.tlinkowski.unij.service.api.collect.UnmodifiableMapFactory service: selected pl.tlinkowski.unij.service.collect.jdk8.Jdk8UnmodifiableMapFactory (priority=40)
[2025-08-15T16:36:48.439Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_398f6b49-dd96-4c91-9487-e9ef7dbda5d7] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:48.439Z] INFO [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_398f6b49-dd96-4c91-9487-e9ef7dbda5d7] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:48.440Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_398f6b49-dd96-4c91-9487-e9ef7dbda5d7] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:48.441Z] ERROR [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_398f6b49-dd96-4c91-9487-e9ef7dbda5d7] - Error inserting Kafka events batch
java.lang.RuntimeException: Simulating failure for testing purpose
...
[2025-08-15T16:36:49.443Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_341775df-3b8d-48b5-bb72-5201299e2f2d] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:49.444Z] INFO [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_341775df-3b8d-48b5-bb72-5201299e2f2d] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:49.444Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_341775df-3b8d-48b5-bb72-5201299e2f2d] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:49.445Z] ERROR [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_341775df-3b8d-48b5-bb72-5201299e2f2d] - Error inserting Kafka events batch
...
[2025-08-15T16:36:50.449Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_6a75b99d-9a71-45f3-a427-7d7f8c0b7a4f] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:50.449Z] INFO [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_6a75b99d-9a71-45f3-a427-7d7f8c0b7a4f] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:50.449Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_6a75b99d-9a71-45f3-a427-7d7f8c0b7a4f] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:50.450Z] ERROR [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_6a75b99d-9a71-45f3-a427-7d7f8c0b7a4f] - Error inserting Kafka events batch
java.lang.RuntimeException: Simulating failure for testing purpose
...
[2025-08-15T16:36:51.453Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_44d64ccd-9ef5-42a4-a95d-d78ce457bad6] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:51.453Z] INFO [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_44d64ccd-9ef5-42a4-a95d-d78ce457bad6] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:51.453Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_44d64ccd-9ef5-42a4-a95d-d78ce457bad6] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:51.454Z] ERROR [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_44d64ccd-9ef5-42a4-a95d-d78ce457bad6] - Error inserting Kafka events batch
java.lang.RuntimeException: Simulating failure for testing purpose
...
[2025-08-15T16:36:52.457Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_3a012a90-4eae-4d98-8df6-c4a4fec29c3f] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:52.457Z] INFO [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_3a012a90-4eae-4d98-8df6-c4a4fec29c3f] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:52.457Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_3a012a90-4eae-4d98-8df6-c4a4fec29c3f] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:52.459Z] ERROR [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_3a012a90-4eae-4d98-8df6-c4a4fec29c3f] - Error inserting Kafka events batch
java.lang.RuntimeException: Simulating failure for testing purpose
...
[2025-08-15T16:36:53.461Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_b00b6a20-7451-4a0b-a981-3c2307fe9a9c] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:53.462Z] INFO [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_b00b6a20-7451-4a0b-a981-3c2307fe9a9c] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:53.462Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_b00b6a20-7451-4a0b-a981-3c2307fe9a9c] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:53.463Z] ERROR [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_b00b6a20-7451-4a0b-a981-3c2307fe9a9c] - Error inserting Kafka events batch
java.lang.RuntimeException: Simulating failure for testing purpose
...
[2025-08-15T16:36:54.466Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_36a7cd50-39f0-4a89-9c7e-23bc7ae91e0c] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:54.466Z] INFO [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_36a7cd50-39f0-4a89-9c7e-23bc7ae91e0c] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:54.466Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_36a7cd50-39f0-4a89-9c7e-23bc7ae91e0c] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:54.468Z] ERROR [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_36a7cd50-39f0-4a89-9c7e-23bc7ae91e0c] - Error inserting Kafka events batch
...
[2025-08-15T16:36:55.471Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_0c8191ea-a0c7-4c6d-b7d5-fb7fa20b8600] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:55.471Z] INFO [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_0c8191ea-a0c7-4c6d-b7d5-fb7fa20b8600] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:55.471Z] DEBUG [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_0c8191ea-a0c7-4c6d-b7d5-fb7fa20b8600] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:55.472Z] ERROR [pc-pool-1-thread-1] [trace-id: pc-pool-1-thread-1_0c8191ea-a0c7-4c6d-b7d5-fb7fa20b8600] - Error inserting Kafka events batch
...
[2025-08-15T16:36:56.474Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d61e5d7a-3d99-49cb-8bcb-d2c6e6b75885] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:56.474Z] INFO [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d61e5d7a-3d99-49cb-8bcb-d2c6e6b75885] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:56.475Z] DEBUG [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d61e5d7a-3d99-49cb-8bcb-d2c6e6b75885] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:56.476Z] ERROR [pc-pool-1-thread-3] [trace-id: pc-pool-1-thread-3_d61e5d7a-3d99-49cb-8bcb-d2c6e6b75885] - Error inserting Kafka events batch
...
[2025-08-15T16:36:57.478Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_252ec2d9-3956-4374-b5f6-d9cc3f8fc412] - Started processing batch of consumer records. Record count: 3
[2025-08-15T16:36:57.478Z] INFO [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_252ec2d9-3956-4374-b5f6-d9cc3f8fc412] - Keys: [ABC100003, ABC100002, ABC100004]
[2025-08-15T16:36:57.478Z] DEBUG [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_252ec2d9-3956-4374-b5f6-d9cc3f8fc412] - Transformed kafka consumer records to DB entities. Record count: 3
[2025-08-15T16:36:57.479Z] ERROR [pc-pool-1-thread-2] [trace-id: pc-pool-1-thread-2_252ec2d9-3956-4374-b5f6-d9cc3f8fc412] - Error inserting Kafka events batch
......
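To check the retry pattern in the log above mechanically, a small hypothetical helper (`RetryLogCheck` is not part of parallel-consumer) can pull out every "Keys:" line that contains a given key and compute the time between attempts. It assumes the log line format shown above: an ISO-8601 UTC timestamp in the first bracket and the key list after "- Keys:".

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for the log format shown above; not part of parallel-consumer. */
class RetryLogCheck {

    // e.g. "[2025-08-15T16:36:48.439Z] INFO [...] [trace-id: ...] - Keys: [ABC100003, ABC100002, ABC100004]"
    private static final Pattern KEYS_LINE =
            Pattern.compile("^\\[(\\S+)\\].*- Keys: \\[(.*)\\]$");

    record Attempt(Instant at, String keys) {}

    /** Returns every batch attempt whose key list contains the given key, in log order. */
    static List<Attempt> attemptsFor(List<String> logLines, String key) {
        List<Attempt> attempts = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = KEYS_LINE.matcher(line);
            if (m.matches() && m.group(2).contains(key)) {
                attempts.add(new Attempt(Instant.parse(m.group(1)), m.group(2)));
            }
        }
        return attempts;
    }

    /** Milliseconds between consecutive attempts. */
    static List<Long> gapsMillis(List<Attempt> attempts) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < attempts.size(); i++) {
            gaps.add(Duration.between(attempts.get(i - 1).at(), attempts.get(i).at()).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Usage sketch: pipe the application log into this program on stdin.
        List<String> lines = new BufferedReader(new InputStreamReader(System.in)).lines().toList();
        List<Attempt> attempts = attemptsFor(lines, args.length > 0 ? args[0] : "ABC100002");
        attempts.forEach(a -> System.out.println(a.at() + " -> [" + a.keys() + "]"));
        System.out.println("gaps (ms): " + gapsMillis(attempts));
    }
}
```

Applied to the log above, every attempt for ABC100002 carries the same key list, and consecutive attempts are roughly one second apart.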