
Commit f217ecb

Added test cases
1 parent 67d1980 commit f217ecb

1 file changed

tests/integration/consumer/test_consumer_error.py

Lines changed: 47 additions & 1 deletion
@@ -17,7 +17,7 @@
 #
 
 import pytest
-from confluent_kafka import TopicPartition, OFFSET_END, KafkaError
+from confluent_kafka import TopicPartition, OFFSET_END, KafkaError, KafkaException
 
 from confluent_kafka.error import ConsumeError
 from confluent_kafka.serialization import StringSerializer
@@ -44,3 +44,49 @@ def test_consume_error(kafka_cluster):
         consumer.poll()
     assert exc_info.value.args[0].code() == KafkaError._PARTITION_EOF, \
         "Expected _PARTITION_EOF, not {}".format(exc_info)
+
+
+def test_consume_error_commit(kafka_cluster):
+    """
+    Tests to ensure that we handle messages with errors when committing.
+    """
+    topic = kafka_cluster.create_topic("test_commit_transaction")
+    consumer_conf = {'group.id': 'pytest',
+                     'session.timeout.ms': 1500}
+
+    producer = kafka_cluster.producer()
+    producer.produce(topic=topic, value="a")
+    producer.flush()
+
+    consumer = kafka_cluster.cimpl_consumer(consumer_conf)
+    consumer.subscribe([topic])
+    try:
+        m = consumer.poll(1)
+        consumer.commit(m)
+    except KafkaException as e:
+        assert e.args[0].code() == KafkaError._INVALID_ARG, \
+            "Expected INVALID_ARG, not {}".format(e)
+
+
+def test_consume_error_store_offsets(kafka_cluster):
+    """
+    Tests to ensure that we handle messages with errors when storing offsets.
+    """
+    topic = kafka_cluster.create_topic("test_commit_transaction")
+    consumer_conf = {'group.id': 'pytest',
+                     'session.timeout.ms': 1500,
+                     'enable.auto.offset.store': True,
+                     'enable.auto.commit': False}
+
+    producer = kafka_cluster.producer()
+    producer.produce(topic=topic, value="a")
+    producer.flush()
+
+    consumer = kafka_cluster.cimpl_consumer(consumer_conf)
+    consumer.subscribe([topic])
+    try:
+        m = consumer.poll(1)
+        consumer.store_offsets(m)
+    except KafkaException as e:
+        assert e.args[0].code() == KafkaError._INVALID_ARG, \
+            "Expected INVALID_ARG, not {}".format(e)
