Skip to content

Commit 9da1713

Browse files
committed
Support DescribeGroupsRequest v3 in admin client; always request authorized operations when available
1 parent 42681e6 commit 9da1713

File tree

1 file changed

+49
-57
lines changed

1 file changed

+49
-57
lines changed

kafka/admin/client.py

Lines changed: 49 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,7 +1214,7 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
12141214
# describe delegation_token protocol not yet implemented
12151215
# Note: send the request to the least_loaded_node()
12161216

1217-
def _describe_consumer_groups_request(self, group_id, include_authorized_operations=False):
1217+
def _describe_consumer_groups_request(self, group_id):
12181218
"""Send a DescribeGroupsRequest to the group's coordinator.
12191219
12201220
Arguments:
@@ -1225,74 +1225,69 @@ def _describe_consumer_groups_request(self, group_id, include_authorized_operati
12251225
"""
12261226
version = self._client.api_version(DescribeGroupsRequest, max_version=3)
12271227
if version <= 2:
1228-
if include_authorized_operations:
1229-
raise IncompatibleBrokerVersion(
1230-
"include_authorized_operations requests "
1231-
"DescribeGroupsRequest >= v3, which is not "
1232-
"supported by Kafka {}".format(version)
1233-
)
12341228
# Note: KAFKA-6788 A potential optimization is to group the
12351229
# request per coordinator and send one request with a list of
12361230
# all consumer groups. Java still hasn't implemented this
12371231
# because the error checking is hard to get right when some
12381232
# groups error and others don't.
12391233
request = DescribeGroupsRequest[version](groups=(group_id,))
1240-
elif version <= 3:
1234+
else:
12411235
request = DescribeGroupsRequest[version](
12421236
groups=(group_id,),
1243-
include_authorized_operations=include_authorized_operations
1237+
include_authorized_operations=True
12441238
)
12451239
return request
12461240

12471241
def _describe_consumer_groups_process_response(self, response):
12481242
"""Process a DescribeGroupsResponse into a group description."""
1249-
if response.API_VERSION <= 3:
1250-
assert len(response.groups) == 1
1251-
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
1252-
if isinstance(response_field, Array):
1253-
described_groups_field_schema = response_field.array_of
1254-
described_group = getattr(response, response_name)[0]
1255-
described_group_information_list = []
1256-
protocol_type_is_consumer = False
1257-
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
1258-
if group_information_name == 'protocol_type':
1259-
protocol_type = described_group_information
1260-
protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type)
1261-
if isinstance(group_information_field, Array):
1262-
member_information_list = []
1263-
member_schema = group_information_field.array_of
1264-
for members in described_group_information:
1265-
member_information = []
1266-
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
1267-
if protocol_type_is_consumer:
1268-
if member_name == 'member_metadata' and member:
1269-
member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member))
1270-
elif member_name == 'member_assignment' and member:
1271-
member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member))
1272-
else:
1273-
member_information.append(member)
1274-
member_info_tuple = MemberInformation._make(member_information)
1275-
member_information_list.append(member_info_tuple)
1276-
described_group_information_list.append(member_information_list)
1277-
else:
1278-
described_group_information_list.append(described_group_information)
1279-
# Version 3 of the DescribeGroups API introduced the "authorized_operations" field.
1280-
# This will cause the namedtuple to fail.
1281-
# Therefore, appending a placeholder of None in it.
1282-
if response.API_VERSION <=2:
1283-
described_group_information_list.append(None)
1284-
group_description = GroupInformation._make(described_group_information_list)
1285-
error_code = group_description.error_code
1286-
error_type = Errors.for_code(error_code)
1287-
# Java has the note: KAFKA-6789, we can retry based on the error code
1288-
if error_type is not Errors.NoError:
1289-
raise error_type(
1290-
"DescribeGroupsResponse failed with response '{}'."
1291-
.format(response))
1292-
else:
1243+
if response.API_VERSION > 3:
12931244
raise NotImplementedError(
12941245
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
12951246
.format(response.API_VERSION))
1247+
1248+
assert len(response.groups) == 1
1249+
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
1250+
if isinstance(response_field, Array):
1251+
described_groups_field_schema = response_field.array_of
1252+
described_group = getattr(response, response_name)[0]
1253+
described_group_information_list = []
1254+
protocol_type_is_consumer = False
1255+
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
1256+
if group_information_name == 'protocol_type':
1257+
protocol_type = described_group_information
1258+
protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type)
1259+
if isinstance(group_information_field, Array):
1260+
member_information_list = []
1261+
member_schema = group_information_field.array_of
1262+
for members in described_group_information:
1263+
member_information = []
1264+
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
1265+
if protocol_type_is_consumer:
1266+
if member_name == 'member_metadata' and member:
1267+
member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member))
1268+
elif member_name == 'member_assignment' and member:
1269+
member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member))
1270+
else:
1271+
member_information.append(member)
1272+
member_info_tuple = MemberInformation._make(member_information)
1273+
member_information_list.append(member_info_tuple)
1274+
described_group_information_list.append(member_information_list)
1275+
else:
1276+
described_group_information_list.append(described_group_information)
1277+
# Version 3 of the DescribeGroups API introduced the "authorized_operations" field.
1278+
if response.API_VERSION >= 3:
1279+
described_group_information_list[-1] = list(map(lambda acl: acl.name, valid_acl_operations(described_group_information_list[-1])))
1280+
else:
1281+
# TODO: Fix GroupInformation defaults
1282+
described_group_information_list.append([])
1283+
group_description = GroupInformation._make(described_group_information_list)
1284+
error_code = group_description.error_code
1285+
error_type = Errors.for_code(error_code)
1286+
# Java has the note: KAFKA-6789, we can retry based on the error code
1287+
if error_type is not Errors.NoError:
1288+
raise error_type(
1289+
"DescribeGroupsResponse failed with response '{}'."
1290+
.format(response))
12961291
return group_description
12971292

12981293
def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
@@ -1311,9 +1306,6 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
13111306
useful for avoiding extra network round trips if you already know
13121307
the group coordinator. This is only useful when all the group_ids
13131308
have the same coordinator, otherwise it will error. Default: None.
1314-
include_authorized_operations (bool, optional): Whether or not to include
1315-
information about the operations a group is allowed to perform.
1316-
Only supported on API version >= v3. Default: False.
13171309
13181310
Returns:
13191311
A list of group descriptions. For now the group descriptions
@@ -1327,7 +1319,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
13271319
groups_coordinators = self._find_coordinator_ids(group_ids)
13281320

13291321
requests = [
1330-
(self._describe_consumer_groups_request(group_id, include_authorized_operations), coordinator_id)
1322+
(self._describe_consumer_groups_request(group_id), coordinator_id)
13311323
for group_id, coordinator_id in groups_coordinators.items()
13321324
]
13331325
return self.send_requests(requests, response_fn=self._describe_consumer_groups_process_response)

0 commit comments

Comments
 (0)