File tree Expand file tree Collapse file tree 2 files changed +9
-1
lines changed Expand file tree Collapse file tree 2 files changed +9
-1
lines changed Original file line number Diff line number Diff line change @@ -102,6 +102,10 @@ class UnsupportedCodecError(KafkaError):
102102 pass
103103
104104
105+ class TransactionAbortedError (KafkaError ):
106+ pass
107+
108+
105109class BrokerResponseError (KafkaError ):
106110 errno = None
107111 message = None
Original file line number Diff line number Diff line change @@ -166,7 +166,11 @@ def run_once(self):
166166 self ._client .poll (timeout_ms = self .config ['retry_backoff_ms' ])
167167 return
168168 elif self ._transaction_manager .has_abortable_error ():
169- self ._accumulator .abort_undrained_batches (self ._transaction_manager .last_error )
169+ # Attempt to get the last error that caused this abort.
170+ # If there was no error, but we are still aborting,
171+ # then this is most likely a case where there was no fatal error.
172+ exception = self ._transaction_manager .last_error or Errors .TransactionAbortedError ()
173+ self ._accumulator .abort_undrained_batches (exception )
170174
171175 except Errors .SaslAuthenticationFailedError as e :
172176 # This is already logged as error, but propagated here to perform any clean ups.
You can’t perform that action at this time.
0 commit comments