@@ -366,35 +366,46 @@ def connect_blocking(self, timeout=float('inf')):
366366
367367 def connect (self ):
368368 """Attempt to connect and return ConnectionState"""
369+ if self .config ["socks5_proxy" ] is not None and self ._socks5_proxy is None :
370+ self ._socks5_proxy = Socks5Wrapper (self .config ["socks5_proxy" ], self .afi )
371+
369372 if self .state is ConnectionStates .DISCONNECTED and not self .blacked_out ():
370373 self .state = ConnectionStates .CONNECTING
371374 self .last_attempt = time .time ()
372- next_lookup = self ._next_afi_sockaddr ()
373- if not next_lookup :
374- self .close (Errors .KafkaConnectionError ('DNS failure' ))
375- return self .state
376- else :
375+ if self .config ["socks5_proxy" ] is None or not self ._socks5_proxy .use_remote_lookup ():
376+ next_lookup = self ._next_afi_sockaddr ()
377+ if not next_lookup :
378+ self .close (Errors .KafkaConnectionError ('DNS failure' ))
379+ return self .state
380+
377381 log .debug ('%s: creating new socket' , self )
378382 assert self ._sock is None
379383 self ._sock_afi , self ._sock_addr = next_lookup
380384 try :
381385 if self .config ["socks5_proxy" ] is not None :
382- self ._socks5_proxy = Socks5Wrapper (self .config ["socks5_proxy" ], self .afi )
383386 self ._sock = self ._socks5_proxy .socket (self ._sock_afi , socket .SOCK_STREAM )
384387 else :
385388 self ._sock = socket .socket (self ._sock_afi , socket .SOCK_STREAM )
386389 except (socket .error , OSError ) as e :
387390 self .close (e )
388391 return self .state
392+ else :
393+ self ._sock = self ._socks5_proxy .socket (None , socket .SOCK_STREAM )
394+ self ._sock_afi = None
395+ self ._sock_addr = [self .host .encode ('ascii' ), self .port ]
389396
390397 for option in self .config ['socket_options' ]:
391398 log .debug ('%s: setting socket option %s' , self , option )
392399 self ._sock .setsockopt (* option )
393400
394401 self ._sock .setblocking (False )
395402 self .config ['state_change_callback' ](self .node_id , self ._sock , self )
403+ if self ._sock_afi != None :
404+ family_str = AFI_NAMES [self ._sock_afi ]
405+ else :
406+ family_str = "n.a."
396407 log .info ('%s: connecting to %s:%d [%s %s]' , self , self .host ,
397- self .port , self ._sock_addr , AFI_NAMES [ self . _sock_afi ] )
408+ self .port , self ._sock_addr , family_str )
398409
399410 if self .state is ConnectionStates .CONNECTING :
400411 # in non-blocking mode, use repeated calls to socket.connect_ex
@@ -862,7 +873,7 @@ def connection_delay(self):
862873 large number to handle slow/stalled connections.
863874 """
864875 if self .disconnected () or self .connecting ():
865- if len (self ._gai ) > 0 :
876+ if len (self ._gai ) > 0 or self . _socks5_proxy . use_remote_lookup () :
866877 return 0
867878 else :
868879 time_waited = time .time () - self .last_attempt
@@ -1291,9 +1302,13 @@ def check_version(self, timeout=2, **kwargs):
12911302 return self ._api_version
12921303
12931304 def __str__ (self ):
1305+ if self ._sock_afi != None :
1306+ family_str = AFI_NAMES [self ._sock_afi ]
1307+ else :
1308+ family_str = "n.a."
12941309 return "<BrokerConnection client_id=%s, node_id=%s host=%s:%d %s [%s %s]>" % (
12951310 self .config ['client_id' ], self .node_id , self .host , self .port , self .state ,
1296- AFI_NAMES [ self . _sock_afi ] , self ._sock_addr )
1311+ family_str , self ._sock_addr )
12971312
12981313
12991314class BrokerConnectionMetrics (object ):
0 commit comments