Skip to content

Commit 691933c

Browse files
authored
Update Docker Compose and AdminClient to support new Kafka version and consumer group types (#63)
1 parent 59f2689 commit 691933c

File tree

4 files changed

+38
-35
lines changed

4 files changed

+38
-35
lines changed

examples/docker-compose.yml

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,62 @@
1-
version: '3.6'
1+
version: '3.8'
22
services:
3-
zookeeper:
4-
image: confluentinc/cp-zookeeper:7.3.3
5-
hostname: zookeeper
6-
container_name: zookeeper
7-
ports:
8-
- '2181:2181'
9-
environment:
10-
ZOOKEEPER_CLIENT_PORT: 2181
11-
healthcheck:
12-
test: ['CMD-SHELL', 'nc -zv localhost 2181 && exit 0 || exit 1']
13-
143
broker:
15-
image: confluentinc/cp-enterprise-kafka:7.3.3
4+
image: confluentinc/cp-server:7.8.0
165
hostname: broker
176
container_name: broker
18-
depends_on:
19-
- zookeeper
207
ports:
218
- '9092:9092'
229
- '29092:29092'
2310
healthcheck:
24-
test: ['CMD-SHELL', 'nc -zv localhost 9092 && exit 0 || exit 1']
11+
test: ["CMD-SHELL", "nc -z localhost 9092"]
12+
interval: 10s
13+
timeout: 5s
14+
retries: 5
2515
environment:
26-
KAFKA_BROKER_ID: 1
27-
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
28-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
16+
KAFKA_NODE_ID: 1
17+
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
18+
KAFKA_PROCESS_ROLES: 'broker,controller'
19+
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:9093'
20+
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:9093,PLAINTEXT_HOST://0.0.0.0:9092'
2921
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
22+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
23+
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
3024
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
3125
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
3226

3327
schema-registry:
34-
image: confluentinc/cp-schema-registry:7.3.3
28+
image: confluentinc/cp-schema-registry:7.8.0
3529
hostname: schema-registry
3630
container_name: schema-registry
3731
depends_on:
38-
- zookeeper
39-
- broker
32+
broker:
33+
condition: service_healthy
4034
ports:
4135
- '8081:8081'
4236
healthcheck:
43-
test: ['CMD-SHELL', 'nc -zv localhost 8081 && exit 0 || exit 1']
37+
test: ["CMD-SHELL", "nc -z localhost 8081"]
38+
interval: 10s
39+
timeout: 5s
40+
retries: 5
4441
environment:
4542
SCHEMA_REGISTRY_HOST_NAME: schema-registry
4643
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
4744
SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
48-
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
4945

5046
rest-proxy:
51-
image: confluentinc/cp-kafka-rest:7.3.3
47+
image: confluentinc/cp-kafka-rest:7.8.0
5248
depends_on:
53-
- zookeeper
54-
- broker
55-
- schema-registry
49+
broker:
50+
condition: service_healthy
51+
schema-registry:
52+
condition: service_healthy
5653
ports:
5754
- 8082:8082
5855
healthcheck:
59-
test: ['CMD-SHELL', 'nc -zv localhost 8082 && exit 0 || exit 1']
56+
test: ["CMD-SHELL", "nc -z localhost 8082"]
57+
interval: 10s
58+
timeout: 5s
59+
retries: 5
6060
hostname: rest-proxy
6161
container_name: rest-proxy
6262
environment:

examples/test_adminclient.robot

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,16 @@ AdminClient List Consumer Groups
3232

3333
${admin_client_id}= Create Admin Client
3434
${states}= Create List ${CONSUMER_GROUP_STATE_STABLE}
35-
${groups}= List Groups ${admin_client_id} states=${states}
35+
${types}= Create List ${CONSUMER_GROUP_TYPE_CLASSIC}
36+
${groups}= List Groups ${admin_client_id} states=${states} types=${types}
3637
Log ${groups}
3738
Log ${groups.valid}
3839
FOR ${group} IN @{groups.valid}
3940
Log ${group.group_id}
4041
IF "${group_id}" == "${group.group_id}"
4142
Log ${group.group_id}
4243
Log ${group.state}
44+
Log ${group.type}
4345
Pass Execution "Consumer found in list"
4446
END
4547
END

src/ConfluentKafkaLibrary/admin_client.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def create_admin_client(
2525
self.admin_clients[group_id] = admin_client
2626
return group_id
2727

28-
def list_groups(self, group_id, states=None, request_timeout=10):
28+
def list_groups(self, group_id, states=None, types=None, request_timeout=10):
2929
"""List consumer groups.
3030
- ``states`` (list(ConsumerGroupState)): filter consumer groups which are currently in these states.
3131
For example usage see 'AdminClient List Consumer Groups' at
@@ -34,9 +34,10 @@ def list_groups(self, group_id, states=None, request_timeout=10):
3434
- ``request_timeout`` (int): Maximum response time before timing out.
3535
Default: `10`.
3636
"""
37-
if states is None:
38-
states = []
39-
future = self.admin_clients[group_id].list_consumer_groups(request_timeout=request_timeout, states=set(states))
37+
states = states or []
38+
types = types or []
39+
40+
future = self.admin_clients[group_id].list_consumer_groups(request_timeout=request_timeout, states=set(states), types=set(types))
4041
return future.result()
4142

4243
def describe_groups(self, group_id, group_ids, request_timeout=10, **kwargs):
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION = '2.10.0.post1'
1+
VERSION = '2.10.0.post2'

0 commit comments

Comments
 (0)