@@ -341,13 +341,13 @@ def _jaas_config(self):
341341
342342 elif self .sasl_mechanism == 'PLAIN' :
343343 jaas_config = (
344- 'org.apache.kafka.common.security.plain.PlainLoginModule required\n '
345- ' username="{user}" password="{password}" user_{user}="{password}";\n '
344+ 'org.apache.kafka.common.security.plain.PlainLoginModule required'
345+ ' username="{user}" password="{password}" user_{user}="{password}";\n '
346346 )
347347 elif self .sasl_mechanism in ("SCRAM-SHA-256" , "SCRAM-SHA-512" ):
348348 jaas_config = (
349- 'org.apache.kafka.common.security.scram.ScramLoginModule required\n '
350- ' username="{user}" password="{password}";\n '
349+ 'org.apache.kafka.common.security.scram.ScramLoginModule required'
350+ ' username="{user}" password="{password}";\n '
351351 )
352352 else :
353353 raise ValueError ("SASL mechanism {} currently not supported" .format (self .sasl_mechanism ))
@@ -605,7 +605,8 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
605605 if num_partitions is None else num_partitions ,
606606 '--replication-factor' , self .replicas \
607607 if replication_factor is None \
608- else replication_factor )
608+ else replication_factor ,
609+ * self ._cli_connect_args ())
609610 if env_kafka_version () >= (0 , 10 ):
610611 args .append ('--if-not-exists' )
611612 env = self .kafka_run_class_env ()
@@ -618,16 +619,23 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
618619 self .out (stderr )
619620 raise RuntimeError ("Failed to create topic %s" % (topic_name ,))
620621
622+ def _cli_connect_args (self ):
623+ if env_kafka_version () < (3 , 0 , 0 ):
624+ return ['--zookeeper' , '%s:%s/%s' % (self .zookeeper .host , self .zookeeper .port , self .zk_chroot )]
625+ else :
626+ args = ['--bootstrap-server' , '%s:%s' % (self .host , self .port )]
627+ if self .sasl_enabled :
628+ command_conf = self .tmp_dir .join ("sasl_command.conf" )
629+ self .render_template (self .test_resource ("sasl_command.conf" ), command_conf , vars (self ))
630+ args .append ('--command-config' )
631+ args .append (command_conf .strpath )
632+ return args
633+
621634 def get_topic_names (self ):
622- args = self .run_script ('kafka-topics.sh' ,
623- '--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
624- self .zookeeper .port ,
625- self .zk_chroot ),
626- '--list'
627- )
635+ cmd = self .run_script ('kafka-topics.sh' , '--list' , * self ._cli_connect_args ())
628636 env = self .kafka_run_class_env ()
629637 env .pop ('KAFKA_LOG4J_OPTS' )
630- proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
638+ proc = subprocess .Popen (cmd , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
631639 stdout , stderr = proc .communicate ()
632640 if proc .returncode != 0 :
633641 self .out ("Failed to list topics!" )
0 commit comments