Skip to content

Raise RuntimeError for all consumer methods after close #262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
Py_ssize_t pos = 0;
rd_kafka_resp_err_t err;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OO", kws,
&tlist, &on_assign, &on_revoke))
return NULL;
Expand Down Expand Up @@ -178,6 +184,12 @@ static PyObject *Consumer_unsubscribe (Handle *self,

rd_kafka_resp_err_t err;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

err = rd_kafka_unsubscribe(self->rk);
if (err) {
cfl_PyErr_Format(err,
Expand All @@ -195,6 +207,12 @@ static PyObject *Consumer_assign (Handle *self, PyObject *tlist) {
rd_kafka_topic_partition_list_t *c_parts;
rd_kafka_resp_err_t err;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!(c_parts = py_to_c_parts(tlist)))
return NULL;

Expand All @@ -219,6 +237,12 @@ static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) {

rd_kafka_resp_err_t err;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

self->u.Consumer.rebalance_assigned++;

err = rd_kafka_assign(self->rk, NULL);
Expand All @@ -239,6 +263,12 @@ static PyObject *Consumer_assignment (Handle *self, PyObject *args,
rd_kafka_topic_partition_list_t *c_parts;
rd_kafka_resp_err_t err;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

err = rd_kafka_assignment(self->rk, &c_parts);
if (err) {
cfl_PyErr_Format(err,
Expand All @@ -265,6 +295,12 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
int async = 1;
static char *kws[] = { "message", "offsets", "async",NULL };

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kws,
&msg, &offsets, &async_o))
return NULL;
Expand Down Expand Up @@ -342,6 +378,11 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
rd_kafka_topic_partition_list_t *c_offsets;
static char *kws[] = { "message", "offsets", NULL };

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO", kws,
&msg, &offsets))
Expand Down Expand Up @@ -410,6 +451,12 @@ static PyObject *Consumer_committed (Handle *self, PyObject *args,
double tmout = -1.0f;
static char *kws[] = { "partitions", "timeout", NULL };

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws,
&plist, &tmout))
return NULL;
Expand Down Expand Up @@ -445,6 +492,12 @@ static PyObject *Consumer_position (Handle *self, PyObject *args,
rd_kafka_resp_err_t err;
static char *kws[] = { "partitions", NULL };

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws,
&plist))
return NULL;
Expand Down Expand Up @@ -482,6 +535,12 @@ static PyObject *Consumer_get_watermark_offsets (Handle *self, PyObject *args,
static char *kws[] = { "partition", "timeout", "cached", NULL };
PyObject *rtup;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|db", kws,
(PyObject **)&tp, &tmout, &cached))
return NULL;
Expand Down Expand Up @@ -528,6 +587,12 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
PyObject *msgobj;
CallState cs;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
"Consumer closed");
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
return NULL;

Expand Down Expand Up @@ -594,6 +659,7 @@ static PyMethodDef Consumer_methods[] = {
"rebalance operation.\n"
"\n"
" :raises KafkaException:\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
"\n"
".. py:function:: on_assign(consumer, partitions)\n"
Expand All @@ -605,7 +671,9 @@ static PyMethodDef Consumer_methods[] = {
},
{ "unsubscribe", (PyCFunction)Consumer_unsubscribe, METH_NOARGS,
" Remove current subscription.\n"
"\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "poll", (PyCFunction)Consumer_poll,
Expand All @@ -625,6 +693,7 @@ static PyMethodDef Consumer_methods[] = {
" :param float timeout: Maximum time to block waiting for message, event or callback.\n"
" :returns: A Message object or None on timeout\n"
" :rtype: :py:class:`Message` or None\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "assign", (PyCFunction)Consumer_assign, METH_O,
Expand All @@ -634,11 +703,13 @@ static PyMethodDef Consumer_methods[] = {
":py:class:`TopicPartition` and starts consuming.\n"
"\n"
" :param list(TopicPartition) partitions: List of topic+partitions and optionally initial offsets to start consuming.\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "unassign", (PyCFunction)Consumer_unassign, METH_NOARGS,
" Removes the current partition assignment and stops consuming.\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "assignment", (PyCFunction)Consumer_assignment,
Expand All @@ -650,6 +721,7 @@ static PyMethodDef Consumer_methods[] = {
" :returns: List of assigned topic+partitions.\n"
" :rtype: list(TopicPartition)\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "store_offsets", (PyCFunction)Consumer_store_offsets, METH_VARARGS|METH_KEYWORDS,
Expand All @@ -666,6 +738,7 @@ static PyMethodDef Consumer_methods[] = {
" :param list(TopicPartition) offsets: List of topic+partitions+offsets to store.\n"
" :rtype: None\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS,
Expand All @@ -682,6 +755,7 @@ static PyMethodDef Consumer_methods[] = {
" :param bool async: Asynchronous commit, return immediately.\n"
" :rtype: None\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "committed", (PyCFunction)Consumer_committed,
Expand All @@ -696,6 +770,7 @@ static PyMethodDef Consumer_methods[] = {
" :returns: List of topic+partitions with offset and possibly error set.\n"
" :rtype: list(TopicPartition)\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "position", (PyCFunction)Consumer_position,
Expand All @@ -710,6 +785,7 @@ static PyMethodDef Consumer_methods[] = {
" :returns: List of topic+partitions with offset and possibly error set.\n"
" :rtype: list(TopicPartition)\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "get_watermark_offsets", (PyCFunction)Consumer_get_watermark_offsets,
Expand All @@ -726,6 +802,7 @@ static PyMethodDef Consumer_methods[] = {
" :returns: Tuple of (low,high) on success or None on timeout.\n"
" :rtype: tuple(int,int)\n"
" :raises: KafkaException\n"
" :raises: RuntimeError if called on a closed consumer\n"
"\n"
},
{ "close", (PyCFunction)Consumer_close, METH_NOARGS,
Expand Down
58 changes: 58 additions & 0 deletions tests/test_Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,61 @@ def test_multiple_close_throw_exception():
with pytest.raises(RuntimeError) as ex:
c.close()
assert 'Consumer already closed' == str(ex.value)


def test_any_method_after_close_throws_exception():
""" Calling any consumer method after close should thorw a RuntimeError
"""
c = Consumer({'group.id': 'test',
'enable.auto.commit': True,
'enable.auto.offset.store': False,
'socket.timeout.ms': 50,
'session.timeout.ms': 100})

c.subscribe(["test"])
c.unsubscribe()
c.close()

with pytest.raises(RuntimeError) as ex:
c.subscribe(['test'])
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.unsubscribe()
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.poll()
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.assign([TopicPartition('test', 0)])
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.unassign()
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.assignment()
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.store_offsets(offsets=[TopicPartition("test", 0, 42)])
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.commit()
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.committed([TopicPartition("test", 0)])
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
c.position([TopicPartition("test", 0)])
assert 'Consumer already closed' == str(ex.value)

with pytest.raises(RuntimeError) as ex:
lo, hi = c.get_watermark_offsets(TopicPartition("test", 0))
assert 'Consumer already closed' == str(ex.value)