Skip to content

Commit 552c5d0

Browse files
committed
gc
1 parent 87dd5a1 commit 552c5d0

File tree

1 file changed

+48
-5
lines changed

1 file changed

+48
-5
lines changed

src/confluent_kafka/admin/__init__.py

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ def _make_topics_result(f: concurrent.futures.Future, futmap: Dict[str, concurre
135135
Map per-topic results to per-topic futures in futmap.
136136
The result value of each (successful) future is None.
137137
"""
138+
import gc
139+
gc_was_enabled = gc.isenabled()
140+
gc.disable()
138141
try:
139142
result = f.result()
140143
for topic, error in result.items():
@@ -152,6 +155,9 @@ def _make_topics_result(f: concurrent.futures.Future, futmap: Dict[str, concurre
152155
# Request-level exception, raise the same for all topics
153156
for topic, fut in futmap.items():
154157
fut.set_exception(e)
158+
finally:
159+
if gc_was_enabled:
160+
gc.enable()
155161

156162
@staticmethod
157163
def _make_resource_result(f: concurrent.futures.Future,
@@ -160,6 +166,9 @@ def _make_resource_result(f: concurrent.futures.Future,
160166
Map per-resource results to per-resource futures in futmap.
161167
The result value of each (successful) future is a ConfigResource.
162168
"""
169+
import gc
170+
gc_was_enabled = gc.isenabled()
171+
gc.disable()
163172
try:
164173
result = f.result()
165174
for resource, configs in result.items():
@@ -178,6 +187,9 @@ def _make_resource_result(f: concurrent.futures.Future,
178187
# Request-level exception, raise the same for all resources
179188
for resource, fut in futmap.items():
180189
fut.set_exception(e)
190+
finally:
191+
if gc_was_enabled:
192+
gc.enable()
181193

182194
@staticmethod
183195
def _make_list_consumer_groups_result(f: concurrent.futures.Future, futmap: Any) -> None:
@@ -189,8 +201,10 @@ def _make_consumer_groups_result(f: concurrent.futures.Future,
189201
"""
190202
Map per-group results to per-group futures in futmap.
191203
"""
204+
import gc
205+
gc_was_enabled = gc.isenabled()
206+
gc.disable()
192207
try:
193-
194208
results = f.result()
195209
futmap_values = list(futmap.values())
196210
len_results = len(results)
@@ -208,6 +222,9 @@ def _make_consumer_groups_result(f: concurrent.futures.Future,
208222
# Request-level exception, raise the same for all groups
209223
for _, fut in futmap.items():
210224
fut.set_exception(e)
225+
finally:
226+
if gc_was_enabled:
227+
gc.enable()
211228

212229
@staticmethod
213230
def _make_consumer_group_offsets_result(f: concurrent.futures.Future,
@@ -216,8 +233,15 @@ def _make_consumer_group_offsets_result(f: concurrent.futures.Future,
216233
Map per-group results to per-group futures in futmap.
217234
The result value of each (successful) future is ConsumerGroupTopicPartitions.
218235
"""
219-
try:
236+
import gc
220237

238+
# Disable GC during callback to prevent AdminClient destruction from librdkafka thread.
239+
# This callback runs in librdkafka's background thread, and if GC runs here, it may
240+
# try to destroy AdminClient objects, which librdkafka forbids from its own threads.
241+
gc_was_enabled = gc.isenabled()
242+
gc.disable()
243+
244+
try:
221245
results = f.result()
222246
futmap_values = list(futmap.values())
223247
len_results = len(results)
@@ -235,6 +259,10 @@ def _make_consumer_group_offsets_result(f: concurrent.futures.Future,
235259
# Request-level exception, raise the same for all groups
236260
for _, fut in futmap.items():
237261
fut.set_exception(e)
262+
finally:
263+
# Re-enable GC if it was enabled before
264+
if gc_was_enabled:
265+
gc.enable()
238266

239267
@staticmethod
240268
def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent.futures.Future]) -> None:
@@ -243,6 +271,9 @@ def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent
243271
For create_acls the result value of each (successful) future is None.
244272
For delete_acls the result value of each (successful) future is the list of deleted AclBindings.
245273
"""
274+
import gc
275+
gc_was_enabled = gc.isenabled()
276+
gc.disable()
246277
try:
247278
results = f.result()
248279
futmap_values = list(futmap.values())
@@ -261,12 +292,17 @@ def _make_acls_result(f: concurrent.futures.Future, futmap: Dict[Any, concurrent
261292
# Request-level exception, raise the same for all the AclBindings or AclBindingFilters
262293
for resource, fut in futmap.items():
263294
fut.set_exception(e)
295+
finally:
296+
if gc_was_enabled:
297+
gc.enable()
264298

265299
@staticmethod
266300
def _make_futmap_result_from_list(f: concurrent.futures.Future,
267301
futmap: Dict[Any, concurrent.futures.Future]) -> None:
302+
import gc
303+
gc_was_enabled = gc.isenabled()
304+
gc.disable()
268305
try:
269-
270306
results = f.result()
271307
futmap_values = list(futmap.values())
272308
len_results = len(results)
@@ -284,9 +320,15 @@ def _make_futmap_result_from_list(f: concurrent.futures.Future,
284320
# Request-level exception, raise the same for all topics
285321
for _, fut in futmap.items():
286322
fut.set_exception(e)
323+
finally:
324+
if gc_was_enabled:
325+
gc.enable()
287326

288327
@staticmethod
289328
def _make_futmap_result(f: concurrent.futures.Future, futmap: Dict[str, concurrent.futures.Future]) -> None:
329+
import gc
330+
gc_was_enabled = gc.isenabled()
331+
gc.disable()
290332
try:
291333
results = f.result()
292334
len_results = len(results)
@@ -306,6 +348,9 @@ def _make_futmap_result(f: concurrent.futures.Future, futmap: Dict[str, concurre
306348
except Exception as e:
307349
for _, fut in futmap.items():
308350
fut.set_exception(e)
351+
finally:
352+
if gc_was_enabled:
353+
gc.enable()
309354

310355
@staticmethod
311356
def _create_future() -> concurrent.futures.Future:
@@ -1126,7 +1171,6 @@ def list_consumer_group_offsets( # type: ignore[override]
11261171
:raises TypeException: Invalid input.
11271172
:raises ValueException: Invalid input.
11281173
"""
1129-
11301174
AdminClient._check_list_consumer_group_offsets_request(list_consumer_group_offsets_request)
11311175

11321176
f, futmap = AdminClient._make_futures(
@@ -1163,7 +1207,6 @@ def alter_consumer_group_offsets( # type: ignore[override]
11631207
:raises TypeException: Invalid input.
11641208
:raises ValueException: Invalid input.
11651209
"""
1166-
11671210
AdminClient._check_alter_consumer_group_offsets_request(alter_consumer_group_offsets_request)
11681211

11691212
f, futmap = AdminClient._make_futures([request.group_id for request in alter_consumer_group_offsets_request],

0 commit comments

Comments
 (0)