Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,13 @@ def _fail_batch(self, batch, partition_response):
# about the previously committed message. Note that this will discard the producer id and sequence
# numbers for all existing partitions.
self._transaction_manager.reset_producer_id()
elif isinstance(exception, Errors.UnknownProducerIdError):
# If we get an UnknownProducerId for a partition, then the broker has no state for that producer. It will
# therefore accept a write with sequence number 0. We reset the sequence number for the partition here so
# that the producer can continue after aborting the transaction. All inflight-requests to this partition
# will also fail with an UnknownProducerId error, so the sequence will remain at 0. Note that if the
# broker supports bumping the epoch, we will later reset all sequence numbers after calling InitProducerId
self._transaction_manager.reset_sequence_for_partition(batch.topic_partition)
elif isinstance(exception, (Errors.ClusterAuthorizationFailedError,
Errors.TransactionalIdAuthorizationFailedError,
Errors.ProducerFencedError,
Expand Down
4 changes: 4 additions & 0 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ def increment_sequence_number(self, tp, increment):
else:
self._sequence_numbers[tp] += increment

def reset_sequence_for_partition(self, tp):
with self._lock:
self._sequence_numbers.pop(tp, None)

def next_request_handler(self, has_incomplete_batches):
with self._lock:
if self._new_partitions_in_transaction:
Expand Down
Loading