@@ -236,6 +236,7 @@ def __init__(self, host, port, afi, **configs):
236236 self ._sock_afi = afi
237237 self ._sock_addr = None
238238 self ._api_versions = None
239+ self ._throttle_time = None
239240
240241 self .config = copy .copy (self .DEFAULT_CONFIG )
241242 for key in self .config :
@@ -851,6 +852,27 @@ def blacked_out(self):
851852 return self .connection_delay () > 0
852853 return False
853854
855+ def throttled (self ):
856+ """
857+ Return True if we are connected but currently throttled.
858+ """
859+ if self .state is not ConnectionStates .CONNECTED :
860+ return False
861+ return self .throttle_delay () > 0
862+
863+ def throttle_delay (self ):
864+ """
865+ Return the number of milliseconds to wait until connection is no longer throttled.
866+ """
867+ if self ._throttle_time is not None :
868+ remaining_ms = (self ._throttle_time - time .time ()) * 1000
869+ if remaining_ms > 0 :
870+ return remaining_ms
871+ else :
872+ self ._throttle_time = None
873+ return 0
874+ return 0
875+
854876 def connection_delay (self ):
855877 """
856878 Return the number of milliseconds to wait, based on the connection
@@ -976,6 +998,9 @@ def send(self, request, blocking=True, request_timeout_ms=None):
976998 elif not self .connected ():
977999 return future .failure (Errors .KafkaConnectionError (str (self )))
9781000 elif not self .can_send_more ():
1001+ # very small race here, but prefer it over breaking abstraction to check self._throttle_time
1002+ if self .throttled ():
1003+ return future .failure (Errors .ThrottlingQuotaExceededError (str (self )))
9791004 return future .failure (Errors .TooManyInFlightRequests (str (self )))
9801005 return self ._send (request , blocking = blocking , request_timeout_ms = request_timeout_ms )
9811006
@@ -1063,8 +1088,26 @@ def send_pending_requests_v2(self):
10631088 self .close (error = error )
10641089 return False
10651090
1091+ def _maybe_throttle (self , response ):
1092+ throttle_time_ms = getattr (response , 'throttle_time_ms' , 0 )
1093+ if self ._sensors :
1094+ self ._sensors .throttle_time .record (throttle_time_ms )
1095+ if not throttle_time_ms :
1096+ if self ._throttle_time is not None :
1097+ self ._throttle_time = None
1098+ return
1099+ # Client side throttling enabled in v2.0 brokers
1100+ # prior to that throttling (if present) was managed broker-side
1101+ if self .config ['api_version' ] is not None and self .config ['api_version' ] >= (2 , 0 ):
1102+ throttle_time = time .time () + throttle_time_ms / 1000
1103+ self ._throttle_time = max (throttle_time , self ._throttle_time or 0 )
1104+ log .warning ("%s: %s throttled by broker (%d ms)" , self ,
1105+ response .__class__ .__name__ , throttle_time_ms )
1106+
10661107 def can_send_more (self ):
1067- """Return True unless there are max_in_flight_requests_per_connection."""
1108+ """Check for throttling / quota violations and max in-flight-requests"""
1109+ if self .throttle_delay () > 0 :
1110+ return False
10681111 max_ifrs = self .config ['max_in_flight_requests_per_connection' ]
10691112 return len (self .in_flight_requests ) < max_ifrs
10701113
@@ -1097,6 +1140,7 @@ def recv(self):
10971140 self ._sensors .request_time .record (latency_ms )
10981141
10991142 log .debug ('%s Response %d (%s ms): %s' , self , correlation_id , latency_ms , response )
1143+ self ._maybe_throttle (response )
11001144 responses [i ] = (response , future )
11011145
11021146 return responses
@@ -1399,6 +1443,16 @@ def __init__(self, metrics, metric_group_prefix, node_id):
13991443 'The maximum request latency in ms.' ),
14001444 Max ())
14011445
1446+ throttle_time = metrics .sensor ('throttle-time' )
1447+ throttle_time .add (metrics .metric_name (
1448+ 'throttle-time-avg' , metric_group_name ,
1449+ 'The average throttle time in ms.' ),
1450+ Avg ())
1451+ throttle_time .add (metrics .metric_name (
1452+ 'throttle-time-max' , metric_group_name ,
1453+ 'The maximum throttle time in ms.' ),
1454+ Max ())
1455+
14021456 # if one sensor of the metrics has been registered for the connection,
14031457 # then all other sensors should have been registered; and vice versa
14041458 node_str = 'node-{0}' .format (node_id )
@@ -1450,9 +1504,23 @@ def __init__(self, metrics, metric_group_prefix, node_id):
14501504 'The maximum request latency in ms.' ),
14511505 Max ())
14521506
1507+ throttle_time = metrics .sensor (
1508+ node_str + '.throttle' ,
1509+ parents = [metrics .get_sensor ('throttle-time' )])
1510+ throttle_time .add (metrics .metric_name (
1511+ 'throttle-time-avg' , metric_group_name ,
1512+ 'The average throttle time in ms.' ),
1513+ Avg ())
1514+ throttle_time .add (metrics .metric_name (
1515+ 'throttle-time-max' , metric_group_name ,
1516+ 'The maximum throttle time in ms.' ),
1517+ Max ())
1518+
1519+
14531520 self .bytes_sent = metrics .sensor (node_str + '.bytes-sent' )
14541521 self .bytes_received = metrics .sensor (node_str + '.bytes-received' )
14551522 self .request_time = metrics .sensor (node_str + '.latency' )
1523+ self .throttle_time = metrics .sensor (node_str + '.throttle' )
14561524
14571525
14581526def _address_family (address ):
0 commit comments