diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index c80197c3a5a1a..dead53dce118e 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1213,12 +1213,9 @@ def create_topic(self, topic_cfg, node=None): self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg) - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() or\ - (topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server()) - cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_topics_cmd)s --create --topic %(topic)s " % { - 'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), + 'kafka_topics_cmd': self.kafka_topics_cmd_with_optional_security_settings(DEV_BRANCH,force_use_zk_connection=False), 'topic': topic_cfg.get("topic"), } if 'replica-assignment' in topic_cfg: @@ -1252,11 +1249,9 @@ def delete_topic(self, topic, node=None): node = self.nodes[0] self.logger.info("Deleting topic %s" % topic) - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() - cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --delete" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic) + (self.kafka_topics_cmd_with_optional_security_settings(DEV_BRANCH, force_use_zk_connection=False), topic) self.logger.info("Running topic delete command...\n%s" % cmd) node.account.ssh(cmd) @@ -1287,11 +1282,10 @@ def describe_under_replicated_partitions(self): """ node = self.nodes[0] - force_use_zk_connection = not node.version.topic_command_supports_bootstrap_server() cmd = fix_opts_for_new_jvm(node) cmd += "%s --describe --under-replicated-partitions" % \ - self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection) + self.kafka_topics_cmd_with_optional_security_settings(DEV_BRANCH, force_use_zk_connection=False) self.logger.debug("Running topic command to describe under-replicated partitions\n%s" % cmd) output = "" @@ -1307,11 +1301,10 @@ def describe_topic(self, topic, node=None, offline_nodes=[]): if node is None: node = self.nodes[0] - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --describe" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection, offline_nodes=offline_nodes), topic) + (self.kafka_topics_cmd_with_optional_security_settings(DEV_BRANCH, force_use_zk_connection=False, offline_nodes=offline_nodes), topic) self.logger.info("Running topic describe command...\n%s" % cmd) output = "" @@ -1323,10 +1316,8 @@ def list_topics(self, node=None): if node is None: node = self.nodes[0] - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() - cmd = fix_opts_for_new_jvm(node) - cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection)) + cmd += "%s --list" % (self.kafka_topics_cmd_with_optional_security_settings(DEV_BRANCH, force_use_zk_connection=False)) for line in node.account.ssh_capture(cmd): if not line.startswith("SLF4J"): yield line.rstrip() @@ -1674,11 +1665,9 @@ def topic_id(self, topic): if self.all_nodes_support_topic_ids(): node = self.nodes[0] - force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() - cmd = fix_opts_for_new_jvm(node) cmd += "%s --topic %s --describe" % \ - (self.kafka_topics_cmd_with_optional_security_settings(node, force_use_zk_connection), topic) + (self.kafka_topics_cmd_with_optional_security_settings(DEV_BRANCH, force_use_zk_connection=False), topic) self.logger.debug( "Querying topic ID by using describe topic command ...\n%s" % cmd