@@ -236,6 +236,7 @@ def __init__(self, host, port, afi, **configs):
236236 self ._sock_afi = afi
237237 self ._sock_addr = None
238238 self ._api_versions = None
239+ self ._throttle_time = None
239240
240241 self .config = copy .copy (self .DEFAULT_CONFIG )
241242 for key in self .config :
@@ -851,6 +852,22 @@ def blacked_out(self):
851852 return self .connection_delay () > 0
852853 return False
853854
855+ def throttled (self ):
856+ """
857+ Return True if we are connected but currently throttled.
858+ """
859+ if self .state is ConnectionStates .CONNECTED :
860+ return self ._throttle_time is not None and self ._throttle_time > time .time ()
861+ return False
862+
863+ def throttle_delay (self ):
864+ """
865+ Return the number of milliseconds to wait until connection is no longer throttled.
866+ """
867+ if self ._throttle_time is not None :
868+ return max (0 , time .time () - self ._throttle_time ) * 1000
869+ return 0
870+
854871 def connection_delay (self ):
855872 """
856873 Return the number of milliseconds to wait, based on the connection
@@ -976,6 +993,8 @@ def send(self, request, blocking=True, request_timeout_ms=None):
976993 elif not self .connected ():
977994 return future .failure (Errors .KafkaConnectionError (str (self )))
978995 elif not self .can_send_more ():
996+ if self ._throttle_time :
997+ return future .failure (Errors .ThrottlingQuotaExceededError (str (self )))
979998 return future .failure (Errors .TooManyInFlightRequests (str (self )))
980999 return self ._send (request , blocking = blocking , request_timeout_ms = request_timeout_ms )
9811000
@@ -1063,8 +1082,28 @@ def send_pending_requests_v2(self):
10631082 self .close (error = error )
10641083 return False
10651084
1085+ def _maybe_throttle (self , response ):
1086+ throttle_time_ms = getattr (response , 'throttle_time_ms' , 0 )
1087+ if self ._sensors :
1088+ self ._sensors .throttle_time .record (throttle_time_ms )
1089+ if not throttle_time_ms :
1090+ if self ._throttle_time is not None :
1091+ self ._throttle_time = None
1092+ return
1093+ # Client side throttling enabled in v2.0 brokers
1094+ # prior to that throttling (if present) was managed broker-side
1095+ if not self .config ['api_version' ] or self .config ['api_version' ] >= (2 , 0 ):
1096+ throttle_time = time .time () + throttle_time_ms / 1000
1097+ self ._throttle_time = max (throttle_time , self ._throttle_time or 0 )
1098+ log .warning ("%s throttled by broker (%d ms)" , response .__name__ , throttle_time_ms )
1099+
10661100 def can_send_more (self ):
1067- """Return True unless there are max_in_flight_requests_per_connection."""
1101+ """Check for throttling / quota violations and max in-flight-requests"""
1102+ if self ._throttle_time is not None :
1103+ if self ._throttle_time > time .time ():
1104+ return False
1105+ # Reset throttle_time if needed
1106+ self ._throttle_time = None
10681107 max_ifrs = self .config ['max_in_flight_requests_per_connection' ]
10691108 return len (self .in_flight_requests ) < max_ifrs
10701109
@@ -1097,6 +1136,7 @@ def recv(self):
10971136 self ._sensors .request_time .record (latency_ms )
10981137
10991138 log .debug ('%s Response %d (%s ms): %s' , self , correlation_id , latency_ms , response )
1139+ self ._maybe_throttle (response )
11001140 responses [i ] = (response , future )
11011141
11021142 return responses
@@ -1399,6 +1439,16 @@ def __init__(self, metrics, metric_group_prefix, node_id):
13991439 'The maximum request latency in ms.' ),
14001440 Max ())
14011441
1442+ throttle_time = metrics .sensor ('throttle-time' )
1443+ throttle_time .add (metrics .metric_name (
1444+ 'throttle-time-avg' , metric_group_name ,
1445+ 'The average throttle time in ms.' ),
1446+ Avg ())
1447+ throttle_time .add (metrics .metric_name (
1448+ 'throttle-time-max' , metric_group_name ,
1449+ 'The maximum throttle time in ms.' ),
1450+ Max ())
1451+
14021452 # if one sensor of the metrics has been registered for the connection,
14031453 # then all other sensors should have been registered; and vice versa
14041454 node_str = 'node-{0}' .format (node_id )
@@ -1450,9 +1500,23 @@ def __init__(self, metrics, metric_group_prefix, node_id):
14501500 'The maximum request latency in ms.' ),
14511501 Max ())
14521502
1503+ throttle_time = metrics .sensor (
1504+ node_str + '.throttle' ,
1505+ parents = [metrics .get_sensor ('throttle-time' )])
1506+ throttle_time .add (metrics .metric_name (
1507+ 'throttle-time-avg' , metric_group_name ,
1508+ 'The average throttle time in ms.' ),
1509+ Avg ())
1510+ throttle_time .add (metrics .metric_name (
1511+ 'throttle-time-max' , metric_group_name ,
1512+ 'The maximum throttle time in ms.' ),
1513+ Max ())
1514+
1515+
14531516 self .bytes_sent = metrics .sensor (node_str + '.bytes-sent' )
14541517 self .bytes_received = metrics .sensor (node_str + '.bytes-received' )
14551518 self .request_time = metrics .sensor (node_str + '.latency' )
1519+ self .throttle_time = metrics .sensor (node_str + '.throttle' )
14561520
14571521
14581522def _address_family (address ):
0 commit comments