@@ -496,6 +496,16 @@ static PyObject *Consumer_commit (Handle *self, PyObject *args,
496
496
}
497
497
498
498
m = (Message * )msg ;
499
+
500
+ if (m -> error != Py_None ) {
501
+ PyObject * error = Message_error (m , NULL );
502
+ PyObject * errstr = PyObject_CallMethod (error , "str" , NULL );
503
+ cfl_PyErr_Format (RD_KAFKA_RESP_ERR__INVALID_ARG ,
504
+ "Cannot commit offsets for message with error: '%s'" , PyUnicode_AsUTF8 (errstr ));
505
+ Py_DECREF (error );
506
+ Py_DECREF (errstr );
507
+ return NULL ;
508
+ }
499
509
500
510
c_offsets = rd_kafka_topic_partition_list_new (1 );
501
511
rktpar = rd_kafka_topic_partition_list_add (
@@ -627,6 +637,16 @@ static PyObject *Consumer_store_offsets (Handle *self, PyObject *args,
627
637
628
638
m = (Message * )msg ;
629
639
640
+ if (m -> error != Py_None ) {
641
+ PyObject * error = Message_error (m , NULL );
642
+ PyObject * errstr = PyObject_CallMethod (error , "str" , NULL );
643
+ cfl_PyErr_Format (RD_KAFKA_RESP_ERR__INVALID_ARG ,
644
+ "Cannot store offsets for message with error: '%s'" , PyUnicode_AsUTF8 (errstr ));
645
+ Py_DECREF (error );
646
+ Py_DECREF (errstr );
647
+ return NULL ;
648
+ }
649
+
630
650
c_offsets = rd_kafka_topic_partition_list_new (1 );
631
651
rktpar = rd_kafka_topic_partition_list_add (
632
652
c_offsets , cfl_PyUnistr_AsUTF8 (m -> topic , & uo8 ),
0 commit comments