Skip to content

Commit d0b5500

Browse files
peter-luciaMSeal
andauthored
Add close() to producer (#2039)
* Add close() to producer * Updated Producer.c * Update tests, log warning * Updated logging * WIP * Updated Producer.c * Updated tests * Restore pyproject.toml * Cleanup * Delete uv.lock * Updated test_Producer.py * Updated Producer.c and test_Producer.py * Updated test_Producer.py * Added flush call within producer close * Test commit * Adjusted to reuse exit logic * Removing extra unused defines * Resolved linting fixes --------- Co-authored-by: Matthew Seal <mseal@confluent.io>
1 parent f7249c1 commit d0b5500

File tree

3 files changed

+87
-23
lines changed

3 files changed

+87
-23
lines changed

DEVELOPER.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,24 @@ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
5555

5656
```bash
5757
python3 -c "import confluent_kafka; print('Setup successful!')"
58-
```
58+
59+
#### Local Setup with UV
60+
61+
Alternative setup instructions tested with python 3.11
62+
63+
```bash
64+
# Modify pyproject.toml to require python version >=3.11
65+
# This fixes the cel-python dependency conflict
66+
uv venv --python 3.11
67+
source .venv/bin/activate
68+
69+
uv sync --extra dev --extra tests
70+
uv pip install trivup setuptools
71+
pytest tests/
72+
73+
# When making changes, change project.version in pyproject.toml before re-running:
74+
uv sync --extra dev --extra tests
75+
```
5976

6077
<!-- markdownlint-enable MD029 -->
6178

src/confluent_kafka/src/Producer.c

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,37 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
420420
return cfl_PyInt_FromInt(qlen);
421421
}
422422

423+
424+
static PyObject *Producer_close(Handle *self, PyObject *args, PyObject *kwargs) {
425+
rd_kafka_resp_err_t err;
426+
CallState cs;
427+
428+
if (!self->rk)
429+
Py_RETURN_TRUE;
430+
431+
CallState_begin(self, &cs);
432+
433+
/* Flush any pending messages (wait indefinitely to ensure delivery) */
434+
err = rd_kafka_flush(self->rk, -1);
435+
436+
/* Destroy the producer (even if flush had issues) */
437+
rd_kafka_destroy(self->rk);
438+
self->rk = NULL;
439+
440+
if (!CallState_end(self, &cs))
441+
return NULL;
442+
443+
/* If flush failed, warn but don't suppress original exception */
444+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
445+
PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
446+
"Producer flush failed during close: %s",
447+
rd_kafka_err2str(err));
448+
}
449+
450+
Py_RETURN_TRUE;
451+
}
452+
453+
423454
/**
424455
* @brief Validate arguments and parse all messages in the batch
425456
* @param self Producer handle
@@ -906,27 +937,7 @@ static PyObject *Producer_exit (Handle *self, PyObject *args) {
906937
&exc_type, &exc_value, &exc_traceback))
907938
return NULL;
908939

909-
/* Cleanup: flush pending messages and destroy producer */
910-
if (self->rk) {
911-
CallState_begin(self, &cs);
912-
913-
/* Flush any pending messages (wait indefinitely to ensure delivery) */
914-
err = rd_kafka_flush(self->rk, -1);
915-
916-
/* Destroy the producer (even if flush had issues) */
917-
rd_kafka_destroy(self->rk);
918-
self->rk = NULL;
919-
920-
if (!CallState_end(self, &cs))
921-
return NULL;
922-
923-
/* If flush failed, warn but don't suppress original exception */
924-
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
925-
PyErr_WarnFormat(PyExc_RuntimeWarning, 1,
926-
"Producer flush failed during context exit: %s",
927-
rd_kafka_err2str(err));
928-
}
929-
}
940+
Producer_close(self, (PyObject *)NULL, (PyObject *)NULL);
930941

931942
/* Return None to propagate any exceptions from the with block */
932943
Py_RETURN_NONE;
@@ -983,7 +994,15 @@ static PyMethodDef Producer_methods[] = {
983994
" :rtype: int\n"
984995
"\n"
985996
},
986-
997+
{ "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS,
998+
".. py:function:: close()\n"
999+
"\n"
1000+
" Request to close the producer on demand.\n"
1001+
"\n"
1002+
" :rtype: bool\n"
1003+
" :returns: True if producer close requested successfully, False otherwise\n"
1004+
"\n"
1005+
},
9871006
{ "flush", (PyCFunction)Producer_flush, METH_VARARGS|METH_KEYWORDS,
9881007
".. py:function:: flush([timeout])\n"
9891008
"\n"

tests/test_Producer.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ def on_delivery(err, msg):
5555
except KafkaException as e:
5656
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT)
5757

58+
assert p.close(), "Failed to validate that producer was closed."
59+
5860

5961
def test_produce_timestamp():
6062
""" Test produce() with timestamp arg """
@@ -239,6 +241,8 @@ def test_transaction_api():
239241
assert ex.value.args[0].fatal() is False
240242
assert ex.value.args[0].txn_requires_abort() is False
241243

244+
assert p.close(), "The producer was not closed"
245+
242246

243247
def test_purge():
244248
"""
@@ -274,6 +278,8 @@ def on_delivery(err, msg):
274278
p.flush(0.002)
275279
assert cb_detector["on_delivery_called"]
276280

281+
assert p.close(), "The producer was not closed"
282+
277283

278284
def test_producer_bool_value():
279285
"""
@@ -283,6 +289,7 @@ def test_producer_bool_value():
283289

284290
p = Producer({})
285291
assert bool(p)
292+
assert p.close(), "The producer was not fully closed"
286293

287294

288295
def test_produce_batch_basic_types_and_data():
@@ -1395,3 +1402,24 @@ def __init__(self, config):
13951402

13961403
# Test __len__() - should return 0 for closed producer (safe, no crash)
13971404
assert len(producer) == 0
1405+
1406+
1407+
def test_producer_close():
1408+
"""
1409+
Ensures the producer close can be requested on demand
1410+
"""
1411+
conf = {
1412+
'debug': 'all',
1413+
'socket.timeout.ms': 10,
1414+
'error_cb': error_cb,
1415+
'message.timeout.ms': 10
1416+
}
1417+
producer = Producer(conf)
1418+
cb_detector = {"on_delivery_called": False}
1419+
1420+
def on_delivery(err, msg):
1421+
cb_detector["on_delivery_called"] = True
1422+
1423+
producer.produce('mytopic', value='somedata', key='a key', callback=on_delivery)
1424+
assert producer.close(), "The producer could not be closed on demand"
1425+
assert cb_detector["on_delivery_called"], "The delivery callback should have been called by flushing during close"

0 commit comments

Comments
 (0)