@@ -399,13 +399,23 @@ def _should_recycle_connection(self, conn):
399399 return False
400400
401401 def _maybe_connect (self , node_id ):
402- """Idempotent non-blocking connection attempt to the given node id."""
402+ """Idempotent non-blocking connection attempt to the given node id.
403+
404+ Returns True if connection object exists and is connected / connecting
405+ """
403406 with self ._lock :
404407 conn = self ._conns .get (node_id )
405408
409+ # Check if existing connection should be recreated because host/port changed
410+ if conn is not None and self ._should_recycle_connection (conn ):
411+ self ._conns .pop (node_id ).close ()
412+ conn = None
413+
406414 if conn is None :
407415 broker = self .cluster .broker_metadata (node_id )
408- assert broker , 'Broker id %s not in current metadata' % (node_id ,)
416+ if broker is None :
417+ log .debug ('Broker id %s not in current metadata' , node_id )
418+ return False
409419
410420 log .debug ("Initiating connection to node %s at %s:%s" ,
411421 node_id , broker .host , broker .port )
@@ -417,16 +427,11 @@ def _maybe_connect(self, node_id):
417427 ** self .config )
418428 self ._conns [node_id ] = conn
419429
420- # Check if existing connection should be recreated because host/port changed
421- elif self ._should_recycle_connection (conn ):
422- self ._conns .pop (node_id )
423- return False
424-
425430 elif conn .connected ():
426431 return True
427432
428433 conn .connect ()
429- return conn .connected ()
434+ return not conn .disconnected ()
430435
431436 def ready (self , node_id , metadata_priority = True ):
432437 """Check whether a node is connected and ok to send more requests.
@@ -621,7 +626,10 @@ def poll(self, timeout_ms=None, future=None):
621626
622627 # Attempt to complete pending connections
623628 for node_id in list (self ._connecting ):
624- self ._maybe_connect (node_id )
629+ # False return means no more connection progress is possible
630+ # Connected nodes will update _connecting via state_change callback
631+ if not self ._maybe_connect (node_id ):
632+ self ._connecting .remove (node_id )
625633
626634 # If we got a future that is already done, don't block in _poll
627635 if future is not None and future .is_done :
@@ -965,7 +973,12 @@ def check_version(self, node_id=None, timeout=None, strict=False):
965973 if try_node is None :
966974 self ._lock .release ()
967975 raise Errors .NoBrokersAvailable ()
968- self ._maybe_connect (try_node )
976+ if not self ._maybe_connect (try_node ):
977+ if try_node == node_id :
978+ raise Errors .NodeNotReadyError ("Connection failed to %s" % node_id )
979+ else :
980+ continue
981+
969982 conn = self ._conns [try_node ]
970983
971984 # We will intentionally cause socket failures
0 commit comments