Skip to content

Fixed a segfault when 'commit' or 'store_offsets' consumer method is called incorrectly with errored Message object #1754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ v2.4.1 is a maintenance release with the following fixes and enhancements:
- Removed usage of `strcpy` to enhance security of the client (#1745)
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object

confluent-kafka-python is based on librdkafka v2.4.1, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)
Expand Down
20 changes: 20 additions & 0 deletions src/confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
}

m = (Message *)msg;

if (m->error != Py_None) {
PyObject *error = Message_error(m, NULL);
PyObject *errstr = PyObject_CallMethod(error, "str", NULL);
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"Cannot commit offsets for message with error: '%s'" , PyUnicode_AsUTF8(errstr));
Py_DECREF(error);
Py_DECREF(errstr);
return NULL;
}

c_offsets = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(
Expand Down Expand Up @@ -627,6 +637,16 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,

m = (Message *)msg;

if (m->error != Py_None) {
PyObject *error = Message_error(m, NULL);
PyObject *errstr = PyObject_CallMethod(error, "str", NULL);
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"Cannot store offsets for message with error: '%s'" , PyUnicode_AsUTF8(errstr));
Py_DECREF(error);
Py_DECREF(errstr);
return NULL;
}

c_offsets = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(
c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),
Expand Down
52 changes: 51 additions & 1 deletion tests/integration/consumer/test_consumer_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#

import pytest
from confluent_kafka import TopicPartition, OFFSET_END, KafkaError
from confluent_kafka import TopicPartition, OFFSET_END, KafkaError, KafkaException

from confluent_kafka.error import ConsumeError
from confluent_kafka.serialization import StringSerializer
Expand All @@ -44,3 +44,53 @@ def test_consume_error(kafka_cluster):
consumer.poll()
assert exc_info.value.args[0].code() == KafkaError._PARTITION_EOF, \
"Expected _PARTITION_EOF, not {}".format(exc_info)


def test_consume_error_commit(kafka_cluster):
"""
Tests to ensure that we handle messages with errors when commiting.
"""
topic = kafka_cluster.create_topic("test_commit_transaction")
consumer_conf = {'group.id': 'pytest',
'session.timeout.ms': 100}

producer = kafka_cluster.producer()
producer.produce(topic=topic, value="a")
producer.flush()

consumer = kafka_cluster.cimpl_consumer(consumer_conf)
consumer.subscribe([topic])
try:
# Since the session timeout value is low, JoinGroupRequest will fail
# and we get error in a message while polling.
m = consumer.poll(1)
consumer.commit(m)
except KafkaException as e:
assert e.args[0].code() == KafkaError._INVALID_ARG, \
"Expected INVALID_ARG, not {}".format(e)


def test_consume_error_store_offsets(kafka_cluster):
"""
Tests to ensure that we handle messages with errors when storing offsets.
"""
topic = kafka_cluster.create_topic("test_commit_transaction")
consumer_conf = {'group.id': 'pytest',
'session.timeout.ms': 100,
'enable.auto.offset.store': True,
'enable.auto.commit': False}

producer = kafka_cluster.producer()
producer.produce(topic=topic, value="a")
producer.flush()

consumer = kafka_cluster.cimpl_consumer(consumer_conf)
consumer.subscribe([topic])
try:
# Since the session timeout value is low, JoinGroupRequest will fail
# and we get error in a message while polling.
m = consumer.poll(1)
consumer.store_offsets(m)
except KafkaException as e:
assert e.args[0].code() == KafkaError._INVALID_ARG, \
"Expected INVALID_ARG, not {}".format(e)