Skip to content

Commit 5415336

Browse files
committed
only do client-side throttling if api_version is set
1 parent 0a5c41a commit 5415336

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

kafka/conn.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,8 @@ def send(self, request, blocking=True, request_timeout_ms=None):
998998
elif not self.connected():
999999
return future.failure(Errors.KafkaConnectionError(str(self)))
10001000
elif not self.can_send_more():
1001-
if self._throttle_time:
1001+
# very small race here, but prefer it over breaking abstraction to check self._throttle_time
1002+
if self.throttled():
10021003
return future.failure(Errors.ThrottlingQuotaExceededError(str(self)))
10031004
return future.failure(Errors.TooManyInFlightRequests(str(self)))
10041005
return self._send(request, blocking=blocking, request_timeout_ms=request_timeout_ms)
@@ -1097,7 +1098,7 @@ def _maybe_throttle(self, response):
10971098
return
10981099
# Client side throttling enabled in v2.0 brokers
10991100
# prior to that throttling (if present) was managed broker-side
1100-
if not self.config['api_version'] or self.config['api_version'] >= (2, 0):
1101+
if self.config['api_version'] is not None and self.config['api_version'] >= (2, 0):
11011102
throttle_time = time.time() + throttle_time_ms / 1000
11021103
self._throttle_time = max(throttle_time, self._throttle_time or 0)
11031104
log.warning("%s: %s throttled by broker (%d ms)", self,

0 commit comments

Comments
 (0)