@@ -204,8 +204,9 @@ def __init__(self, **configs):
204204 # these properties need to be set on top of the initialization pipeline
205205 # because they are used when __del__ method is called
206206 self ._closed = False
207- self ._wake_r , self ._wake_w = socket .socketpair ()
208207 self ._selector = self .config ['selector' ]()
208+ self ._init_wakeup_socketpair ()
209+ self ._wake_lock = threading .Lock ()
209210
210211 self .cluster = ClusterMetadata (** self .config )
211212 self ._topics = set () # empty set will fetch all topic metadata
@@ -217,9 +218,6 @@ def __init__(self, **configs):
217218 self ._refresh_on_disconnects = True
218219 self ._last_bootstrap = 0
219220 self ._bootstrap_fails = 0
220- self ._wake_r .setblocking (False )
221- self ._wake_w .settimeout (self .config ['wakeup_timeout_ms' ] / 1000.0 )
222- self ._wake_lock = threading .Lock ()
223221
224222 self ._lock = threading .RLock ()
225223
@@ -228,7 +226,6 @@ def __init__(self, **configs):
228226 # lock above.
229227 self ._pending_completion = collections .deque ()
230228
231- self ._selector .register (self ._wake_r , selectors .EVENT_READ )
232229 self ._idle_expiry_manager = IdleConnectionManager (self .config ['connections_max_idle_ms' ])
233230 self ._sensors = None
234231 if self .config ['metrics' ]:
@@ -243,6 +240,25 @@ def __init__(self, **configs):
243240 check_timeout = self .config ['api_version_auto_timeout_ms' ] / 1000
244241 self .config ['api_version' ] = self .check_version (timeout = check_timeout )
245242
243+ def _init_wakeup_socketpair (self ):
244+ self ._wake_r , self ._wake_w = socket .socketpair ()
245+ self ._wake_r .setblocking (False )
246+ self ._wake_w .settimeout (self .config ['wakeup_timeout_ms' ] / 1000.0 )
247+ self ._waking = False
248+ self ._selector .register (self ._wake_r , selectors .EVENT_READ )
249+
250+ def _close_wakeup_socketpair (self ):
251+ if self ._wake_r is not None :
252+ try :
253+ self ._selector .unregister (self ._wake_r )
254+ except KeyError :
255+ pass
256+ self ._wake_r .close ()
257+ if self ._wake_w is not None :
258+ self ._wake_w .close ()
259+ self ._wake_r = None
260+ self ._wake_w = None
261+
246262 def _can_bootstrap (self ):
247263 effective_failures = self ._bootstrap_fails // self ._num_bootstrap_hosts
248264 backoff_factor = 2 ** effective_failures
@@ -416,9 +432,8 @@ def connected(self, node_id):
416432 def _close (self ):
417433 if not self ._closed :
418434 self ._closed = True
419- self ._wake_r .close ()
420- self ._wake_w .close ()
421435 self ._selector .close ()
436+ self ._close_wakeup_socketpair ()
422437
423438 def close (self , node_id = None ):
424439 """Close one or all broker connections.
@@ -944,22 +959,34 @@ def check_version(self, node_id=None, timeout=2, strict=False):
944959 raise Errors .NoBrokersAvailable ()
945960
946961 def wakeup (self ):
962+ if self ._waking or self ._wake_w is None :
963+ return
947964 with self ._wake_lock :
948965 try :
949966 self ._wake_w .sendall (b'x' )
950- except socket .timeout :
967+ self ._waking = True
968+ except socket .timeout as e :
951969 log .warning ('Timeout to send to wakeup socket!' )
952- raise Errors .KafkaTimeoutError ()
953- except socket .error :
954- log .warning ('Unable to send to wakeup socket!' )
970+ raise Errors .KafkaTimeoutError (e )
971+ except socket .error as e :
972+ log .warning ('Unable to send to wakeup socket! %s' , e )
973+ raise e
955974
956975 def _clear_wake_fd (self ):
957976 # reading from wake socket should only happen in a single thread
958- while True :
959- try :
960- self ._wake_r .recv (1024 )
961- except socket .error :
962- break
977+ with self ._wake_lock :
978+ self ._waking = False
979+ while True :
980+ try :
981+ if not self ._wake_r .recv (1024 ):
982+ # Non-blocking socket returns empty on error
983+ log .warning ("Error reading wakeup socket. Rebuilding socketpair." )
984+ self ._close_wakeup_socketpair ()
985+ self ._init_wakeup_socketpair ()
986+ break
987+ except socket .error :
988+ # Non-blocking socket raises when socket is ok but no data available to read
989+ break
963990
964991 def _maybe_close_oldest_connection (self ):
965992 expired_connection = self ._idle_expiry_manager .poll_expired_connection ()
0 commit comments