From 707ac1f7f8138ad6d6230fa271f9a746d0ddeb07 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 3 Jul 2025 12:03:28 -0700 Subject: [PATCH] KafkaProducer: Handle UnknownProducerIdError --- kafka/producer/sender.py | 7 +++++++ kafka/producer/transaction_manager.py | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 09b9a0f10..869689e9b 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -473,6 +473,13 @@ def _fail_batch(self, batch, partition_response): # about the previously committed message. Note that this will discard the producer id and sequence # numbers for all existing partitions. self._transaction_manager.reset_producer_id() + elif isinstance(exception, Errors.UnknownProducerIdError): + # If we get an UnknownProducerId for a partition, then the broker has no state for that producer. It will + # therefore accept a write with sequence number 0. We reset the sequence number for the partition here so + # that the producer can continue after aborting the transaction. All inflight-requests to this partition + # will also fail with an UnknownProducerId error, so the sequence will remain at 0. Note that if the + # broker supports bumping the epoch, we will later reset all sequence numbers after calling InitProducerId + self._transaction_manager.reset_sequence_for_partition(batch.topic_partition) elif isinstance(exception, (Errors.ClusterAuthorizationFailedError, Errors.TransactionalIdAuthorizationFailedError, Errors.ProducerFencedError, diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 5d69ddc97..a44d7d9b3 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -330,6 +330,10 @@ def increment_sequence_number(self, tp, increment): else: self._sequence_numbers[tp] += increment + def reset_sequence_for_partition(self, tp): + with self._lock: + self._sequence_numbers.pop(tp, None) + def next_request_handler(self, has_incomplete_batches): with self._lock: if self._new_partitions_in_transaction: