Skip to content

Commit 47a2be9

Browse files
committed
Support Metadata v8 in admin client
1 parent f1870ac commit 47a2be9

File tree

1 file changed

+49
-34
lines changed

1 file changed

+49
-34
lines changed

kafka/admin/client.py

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from kafka.vendor import six
1212

1313
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
14-
ACLResourcePatternType
14+
ACLResourcePatternType, valid_acl_operations
1515
from kafka.client_async import KafkaClient, selectors
1616
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0, ConsumerProtocol_v0
1717
import kafka.errors as Errors
@@ -252,30 +252,32 @@ def _validate_timeout(self, timeout_ms):
252252

253253
def _refresh_controller_id(self, timeout_ms=30000):
254254
"""Determine the Kafka cluster controller."""
255-
version = self._client.api_version(MetadataRequest, max_version=6)
256-
if 1 <= version <= 6:
257-
timeout_at = time.time() + timeout_ms / 1000
258-
while time.time() < timeout_at:
259-
response = self.send_request(MetadataRequest[version]())
260-
controller_id = response.controller_id
261-
if controller_id == -1:
262-
log.warning("Controller ID not available, got -1")
263-
time.sleep(1)
264-
continue
265-
# verify the controller is new enough to support our requests
266-
controller_version = self._client.check_version(node_id=controller_id)
267-
if controller_version < (0, 10, 0):
268-
raise IncompatibleBrokerVersion(
269-
"The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
270-
.format(controller_version))
271-
self._controller_id = controller_id
272-
return
273-
else:
274-
raise Errors.NodeNotReadyError('controller')
275-
else:
255+
version = self._client.api_version(MetadataRequest, max_version=8)
256+
if version == 0:
276257
raise UnrecognizedBrokerVersion(
277258
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
278259
.format(version))
260+
# use defaults for allow_auto_topic_creation / include_authorized_operations in v6+
261+
request = MetadataRequest[version]()
262+
263+
timeout_at = time.time() + timeout_ms / 1000
264+
while time.time() < timeout_at:
265+
response = self.send_request(request)
266+
controller_id = response.controller_id
267+
if controller_id == -1:
268+
log.warning("Controller ID not available, got -1")
269+
time.sleep(1)
270+
continue
271+
# verify the controller is new enough to support our requests
272+
controller_version = self._client.check_version(node_id=controller_id)
273+
if controller_version < (0, 10, 0):
274+
raise IncompatibleBrokerVersion(
275+
"The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
276+
.format(controller_version))
277+
self._controller_id = controller_id
278+
return
279+
else:
280+
raise Errors.NodeNotReadyError('controller')
279281

280282
def _find_coordinator_id_request(self, group_id):
281283
"""Send a FindCoordinatorRequest to a broker.
@@ -540,11 +542,20 @@ def delete_topics(self, topics, timeout_ms=None):
540542
)
541543
)
542544

545+
def _process_metadata_response(self, metadata_response):
546+
obj = metadata_response.to_object()
547+
if 'authorized_operations' in obj:
548+
obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations'])))
549+
for t in obj['topics']:
550+
if 'authorized_operations' in t:
551+
t['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(t['authorized_operations'])))
552+
return obj
553+
543554
def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
544555
"""
545556
topics == None means "get all topics"
546557
"""
547-
version = self._client.api_version(MetadataRequest, max_version=5)
558+
version = self._client.api_version(MetadataRequest, max_version=8)
548559
if version <= 3:
549560
if auto_topic_creation:
550561
raise IncompatibleBrokerVersion(
@@ -553,13 +564,20 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
553564
.format(self.config['api_version']))
554565

555566
request = MetadataRequest[version](topics=topics)
556-
elif version <= 5:
567+
elif version <= 7:
557568
request = MetadataRequest[version](
558569
topics=topics,
559570
allow_auto_topic_creation=auto_topic_creation
560571
)
572+
else:
573+
request = MetadataRequest[version](
574+
topics=topics,
575+
allow_auto_topic_creation=auto_topic_creation,
576+
include_cluster_authorized_operations=True,
577+
include_topic_authorized_operations=True,
578+
)
561579

562-
return self.send_request(request)
580+
return self._process_metadata_response(self.send_request(request))
563581

564582
def list_topics(self):
565583
"""Retrieve a list of all topic names in the cluster.
@@ -568,8 +586,7 @@ def list_topics(self):
568586
A list of topic name strings.
569587
"""
570588
metadata = self._get_cluster_metadata(topics=None)
571-
obj = metadata.to_object()
572-
return [t['topic'] for t in obj['topics']]
589+
return [t['topic'] for t in metadata['topics']]
573590

574591
def describe_topics(self, topics=None):
575592
"""Fetch metadata for the specified topics or all topics if None.
@@ -582,8 +599,7 @@ def describe_topics(self, topics=None):
582599
A list of dicts describing each topic (including partition info).
583600
"""
584601
metadata = self._get_cluster_metadata(topics=topics)
585-
obj = metadata.to_object()
586-
return obj['topics']
602+
return metadata['topics']
587603

588604
def describe_cluster(self):
589605
"""
@@ -595,9 +611,8 @@ def describe_cluster(self):
595611
A dict with cluster-wide metadata, excluding topic details.
596612
"""
597613
metadata = self._get_cluster_metadata()
598-
obj = metadata.to_object()
599-
obj.pop('topics') # We have 'describe_topics' for this
600-
return obj
614+
metadata.pop('topics') # We have 'describe_topics' for this
615+
return metadata
601616

602617
@staticmethod
603618
def _convert_describe_acls_response_to_acls(describe_response):
@@ -1094,11 +1109,11 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
10941109
partitions = set(partitions)
10951110
topics = set(tp.topic for tp in partitions)
10961111

1097-
response = self._get_cluster_metadata(topics=topics).to_object()
1112+
metadata = self._get_cluster_metadata(topics=topics)
10981113

10991114
leader2partitions = defaultdict(list)
11001115
valid_partitions = set()
1101-
for topic in response.get("topics", ()):
1116+
for topic in metadata.get("topics", ()):
11021117
for partition in topic.get("partitions", ()):
11031118
t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
11041119
if t2p in partitions:

0 commit comments

Comments
 (0)