17
17
#
18
18
19
19
import pytest
20
- from confluent_kafka import TopicPartition , OFFSET_END , KafkaError
20
+ from confluent_kafka import TopicPartition , OFFSET_END , KafkaError , KafkaException
21
21
22
22
from confluent_kafka .error import ConsumeError
23
23
from confluent_kafka .serialization import StringSerializer
@@ -44,3 +44,49 @@ def test_consume_error(kafka_cluster):
44
44
consumer .poll ()
45
45
assert exc_info .value .args [0 ].code () == KafkaError ._PARTITION_EOF , \
46
46
"Expected _PARTITION_EOF, not {}" .format (exc_info )
47
+
48
+
49
+ def test_consume_error_commit (kafka_cluster ):
50
+ """
51
+ Tests to ensure that we handle messages with errors when commiting.
52
+ """
53
+ topic = kafka_cluster .create_topic ("test_commit_transaction" )
54
+ consumer_conf = {'group.id' : 'pytest' ,
55
+ 'session.timeout.ms' : 1500 }
56
+
57
+ producer = kafka_cluster .producer ()
58
+ producer .produce (topic = topic , value = "a" )
59
+ producer .flush ()
60
+
61
+ consumer = kafka_cluster .cimpl_consumer (consumer_conf )
62
+ consumer .subscribe ([topic ])
63
+ try :
64
+ m = consumer .poll (1 )
65
+ consumer .commit (m )
66
+ except KafkaException as e :
67
+ assert e .args [0 ].code () == KafkaError ._INVALID_ARG , \
68
+ "Expected INVALID_ARG, not {}" .format (e )
69
+
70
+
71
+ def test_consume_error_store_offsets (kafka_cluster ):
72
+ """
73
+ Tests to ensure that we handle messages with errors when storing offsets.
74
+ """
75
+ topic = kafka_cluster .create_topic ("test_commit_transaction" )
76
+ consumer_conf = {'group.id' : 'pytest' ,
77
+ 'session.timeout.ms' : 1500 ,
78
+ 'enable.auto.offset.store' : True ,
79
+ 'enable.auto.commit' : False }
80
+
81
+ producer = kafka_cluster .producer ()
82
+ producer .produce (topic = topic , value = "a" )
83
+ producer .flush ()
84
+
85
+ consumer = kafka_cluster .cimpl_consumer (consumer_conf )
86
+ consumer .subscribe ([topic ])
87
+ try :
88
+ m = consumer .poll (1 )
89
+ consumer .store_offsets (m )
90
+ except KafkaException as e :
91
+ assert e .args [0 ].code () == KafkaError ._INVALID_ARG , \
92
+ "Expected INVALID_ARG, not {}" .format (e )
0 commit comments