Skip to content

Commit 07b68df

Browse files
committed
Added flush call within producer close
1 parent 51de10f commit 07b68df

File tree

2 files changed

+26
-15
lines changed

2 files changed

+26
-15
lines changed

src/confluent_kafka/src/Producer.c

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -423,26 +423,21 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
423423

424424
static PyObject *Producer_close(Handle *self, PyObject *args, PyObject *kwargs) {
425425

426+
double tmout = 1; // Default timeout is 1 second for close to clear rather than indefinitely
427+
static char *kws[] = { "timeout", NULL };
428+
rd_kafka_resp_err_t err;
426429
CallState cs;
427430

431+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
432+
return NULL;
433+
428434
if (!self->rk)
429435
Py_RETURN_TRUE;
430436

431437
CallState_begin(self, &cs);
432438

433-
/* Warn if there are pending messages */
434-
int outq_len = rd_kafka_outq_len(self->rk);
435-
if (outq_len > 0) {
436-
const char msg[150];
437-
sprintf(msg, "There are %d message(s) still in producer queue! "
438-
"Use flush() or wait for delivery.", outq_len);
439-
rd_kafka_log_print(
440-
self->rk,
441-
CK_LOG_WARNING,
442-
"CLOSWARN",
443-
msg
444-
);
445-
}
439+
// Flush any remaining messages before closing if possible
440+
err = rd_kafka_flush(self->rk, cfl_timeout_ms(tmout));
446441
rd_kafka_destroy(self->rk);
447442
rd_kafka_log_print(self->rk, CK_LOG_INFO, "CLOSEINF", "Producer destroy requested");
448443

@@ -1020,10 +1015,11 @@ static PyMethodDef Producer_methods[] = {
10201015
"\n"
10211016
},
10221017
{ "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS,
1023-
".. py:function:: close()\n"
1018+
".. py:function:: close([timeout])\n"
10241019
"\n"
1025-
" Request to close the producer on demand.\n"
1020+
" Request to close the producer on demand with an optional timeout.\n"
10261021
"\n"
1022+
" :param: float timeout: Maximum time to block (default 1 second). (Seconds)\n"
10271023
" :rtype: bool\n"
10281024
" :returns: True if producer close requested successfully, False otherwise\n"
10291025
"\n"

tests/test_Producer.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,3 +1417,18 @@ def test_producer_close():
14171417
msg = {"test": "test"}
14181418
producer.produce(json.dumps(msg))
14191419
assert producer.close(), "The producer could not be closed on demand"
1420+
1421+
def test_producer_close_with_timeout():
1422+
"""
1423+
Ensures the producer close can be requested on demand
1424+
"""
1425+
conf = {
1426+
'debug': 'all',
1427+
'socket.timeout.ms': 10,
1428+
'error_cb': error_cb,
1429+
'message.timeout.ms': 10
1430+
}
1431+
producer = Producer(conf)
1432+
msg = {"test": "test"}
1433+
producer.produce(json.dumps(msg))
1434+
assert producer.close(0.1), "The producer could not be closed on demand with timeout"

0 commit comments

Comments
 (0)