Skip to content

Commit a6d2e1e

Browse files
authored
Fixed a segfault when 'commit' or 'store_offsets' consumer method is called incorrectly with errored Message object (#1754)
Fixed a segfault when 'commit' or 'store_offsets' consumer method is called incorrectly with errored Message object
1 parent 729563f commit a6d2e1e

File tree

3 files changed

+72
-1
lines changed

3 files changed

+72
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ v2.4.1 is a maintenance release with the following fixes and enhancements:
77
- Removed usage of `strcpy` to enhance security of the client (#1745)
88
- Fixed invalid write in `OAUTHBEARER/OIDC` extensions copy (#1745)
99
- Fixed an issue related to import error of `TopicCollection` and `TopicPartitionInfo` classes when importing through other module like mypy.
10+
- Fixed a segfault when `commit` or `store_offsets` consumer method is called incorrectly with errored Message object
1011

1112
confluent-kafka-python is based on librdkafka v2.4.1, see the
1213
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.4.1)

src/confluent_kafka/src/Consumer.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,16 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
496496
}
497497

498498
m = (Message *)msg;
499+
500+
if (m->error != Py_None) {
501+
PyObject *error = Message_error(m, NULL);
502+
PyObject *errstr = PyObject_CallMethod(error, "str", NULL);
503+
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
504+
"Cannot commit offsets for message with error: '%s'" , PyUnicode_AsUTF8(errstr));
505+
Py_DECREF(error);
506+
Py_DECREF(errstr);
507+
return NULL;
508+
}
499509

500510
c_offsets = rd_kafka_topic_partition_list_new(1);
501511
rktpar = rd_kafka_topic_partition_list_add(
@@ -627,6 +637,16 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
627637

628638
m = (Message *)msg;
629639

640+
if (m->error != Py_None) {
641+
PyObject *error = Message_error(m, NULL);
642+
PyObject *errstr = PyObject_CallMethod(error, "str", NULL);
643+
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
644+
"Cannot store offsets for message with error: '%s'" , PyUnicode_AsUTF8(errstr));
645+
Py_DECREF(error);
646+
Py_DECREF(errstr);
647+
return NULL;
648+
}
649+
630650
c_offsets = rd_kafka_topic_partition_list_new(1);
631651
rktpar = rd_kafka_topic_partition_list_add(
632652
c_offsets, cfl_PyUnistr_AsUTF8(m->topic, &uo8),

tests/integration/consumer/test_consumer_error.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#
1818

1919
import pytest
20-
from confluent_kafka import TopicPartition, OFFSET_END, KafkaError
20+
from confluent_kafka import TopicPartition, OFFSET_END, KafkaError, KafkaException
2121

2222
from confluent_kafka.error import ConsumeError
2323
from confluent_kafka.serialization import StringSerializer
@@ -44,3 +44,53 @@ def test_consume_error(kafka_cluster):
4444
consumer.poll()
4545
assert exc_info.value.args[0].code() == KafkaError._PARTITION_EOF, \
4646
"Expected _PARTITION_EOF, not {}".format(exc_info)
47+
48+
49+
def test_consume_error_commit(kafka_cluster):
50+
"""
51+
Tests to ensure that we handle messages with errors when commiting.
52+
"""
53+
topic = kafka_cluster.create_topic("test_commit_transaction")
54+
consumer_conf = {'group.id': 'pytest',
55+
'session.timeout.ms': 100}
56+
57+
producer = kafka_cluster.producer()
58+
producer.produce(topic=topic, value="a")
59+
producer.flush()
60+
61+
consumer = kafka_cluster.cimpl_consumer(consumer_conf)
62+
consumer.subscribe([topic])
63+
try:
64+
# Since the session timeout value is low, JoinGroupRequest will fail
65+
# and we get error in a message while polling.
66+
m = consumer.poll(1)
67+
consumer.commit(m)
68+
except KafkaException as e:
69+
assert e.args[0].code() == KafkaError._INVALID_ARG, \
70+
"Expected INVALID_ARG, not {}".format(e)
71+
72+
73+
def test_consume_error_store_offsets(kafka_cluster):
74+
"""
75+
Tests to ensure that we handle messages with errors when storing offsets.
76+
"""
77+
topic = kafka_cluster.create_topic("test_commit_transaction")
78+
consumer_conf = {'group.id': 'pytest',
79+
'session.timeout.ms': 100,
80+
'enable.auto.offset.store': True,
81+
'enable.auto.commit': False}
82+
83+
producer = kafka_cluster.producer()
84+
producer.produce(topic=topic, value="a")
85+
producer.flush()
86+
87+
consumer = kafka_cluster.cimpl_consumer(consumer_conf)
88+
consumer.subscribe([topic])
89+
try:
90+
# Since the session timeout value is low, JoinGroupRequest will fail
91+
# and we get error in a message while polling.
92+
m = consumer.poll(1)
93+
consumer.store_offsets(m)
94+
except KafkaException as e:
95+
assert e.args[0].code() == KafkaError._INVALID_ARG, \
96+
"Expected INVALID_ARG, not {}".format(e)

0 commit comments

Comments
 (0)