Skip to content

Commit c55ffc1

Browse files
committed
Allow passing headers as both list(tuples) and dict()
This also fixes type checking: the headers were previously assumed to be a list, and if they were not (such as a dict) it led to memory corruption and mayhem. Also added a test for binary header values.
1 parent 98af46d commit c55ffc1

File tree

4 files changed

+143
-39
lines changed

4 files changed

+143
-39
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 112 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -939,43 +939,125 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
939939
}
940940

941941
#ifdef RD_KAFKA_V_HEADERS
942+
943+
944+
/**
945+
* @brief Convert Python list of tuples to rd_kafka_headers_t
946+
*/
947+
static rd_kafka_headers_t *py_headers_list_to_c (PyObject *hdrs) {
948+
int i, len;
949+
rd_kafka_headers_t *rd_headers = NULL;
950+
951+
len = (int)PyList_Size(hdrs);
952+
rd_headers = rd_kafka_headers_new(len);
953+
954+
for (i = 0; i < len; i++) {
955+
rd_kafka_resp_err_t err;
956+
const char *header_key, *header_value = NULL;
957+
int header_key_len = 0, header_value_len = 0;
958+
959+
if(!PyArg_ParseTuple(PyList_GET_ITEM(hdrs, i), "s#z#",
960+
&header_key, &header_key_len,
961+
&header_value, &header_value_len)){
962+
rd_kafka_headers_destroy(rd_headers);
963+
PyErr_SetString(PyExc_TypeError,
964+
"Headers are expected to be a "
965+
"tuple of (key, value)");
966+
return NULL;
967+
}
968+
969+
err = rd_kafka_header_add(rd_headers,
970+
header_key, header_key_len,
971+
header_value, header_value_len);
972+
if (err) {
973+
cfl_PyErr_Format(err,
974+
"Unable to add message header \"%s\": "
975+
"%s",
976+
header_key, rd_kafka_err2str(err));
977+
rd_kafka_headers_destroy(rd_headers);
978+
return NULL;
979+
}
980+
}
981+
return rd_headers;
982+
}
983+
984+
985+
/**
986+
* @brief Convert Python dict to rd_kafka_headers_t
987+
*/
988+
static rd_kafka_headers_t *py_headers_dict_to_c (PyObject *hdrs) {
989+
int len;
990+
Py_ssize_t pos = 0;
991+
rd_kafka_headers_t *rd_headers = NULL;
992+
PyObject *ko, *vo;
993+
994+
len = (int)PyDict_Size(hdrs);
995+
rd_headers = rd_kafka_headers_new(len);
996+
997+
while (PyDict_Next(hdrs, &pos, &ko, &vo)) {
998+
PyObject *ks, *ks8;
999+
const char *k;
1000+
const void *v = NULL;
1001+
Py_ssize_t vsize = 0;
1002+
rd_kafka_resp_err_t err;
1003+
1004+
if (!(ks = cfl_PyObject_Unistr(ko))) {
1005+
PyErr_SetString(PyExc_TypeError,
1006+
"expected header key to be unicode "
1007+
"string");
1008+
rd_kafka_headers_destroy(rd_headers);
1009+
return NULL;
1010+
}
1011+
1012+
k = cfl_PyUnistr_AsUTF8(ks, &ks8);
1013+
1014+
if (vo != Py_None) {
1015+
if (PyString_AsStringAndSize(vo, (char **)&v,
1016+
&vsize) == -1) {
1017+
Py_DECREF(ks);
1018+
rd_kafka_headers_destroy(rd_headers);
1019+
return NULL;
1020+
}
1021+
}
1022+
1023+
if ((err = rd_kafka_header_add(rd_headers, k, -1, v, vsize))) {
1024+
cfl_PyErr_Format(err,
1025+
"Unable to add message header \"%s\": "
1026+
"%s",
1027+
k, rd_kafka_err2str(err));
1028+
Py_DECREF(ks);
1029+
rd_kafka_headers_destroy(rd_headers);
1030+
return NULL;
1031+
}
1032+
1033+
Py_DECREF(ks);
1034+
}
1035+
1036+
return rd_headers;
1037+
}
1038+
1039+
9421040
/**
9431041
* @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
9441042
*
9451043
* @returns The new Python list[(header_key, header_value),...] object.
9461044
*/
947-
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist) {
948-
int i, len;
949-
rd_kafka_headers_t *rd_headers = NULL;
950-
rd_kafka_resp_err_t err;
951-
const char *header_key, *header_value = NULL;
952-
int header_key_len = 0, header_value_len = 0;
953-
954-
len = PyList_Size(headers_plist);
955-
rd_headers = rd_kafka_headers_new(len);
956-
957-
for (i = 0; i < len; i++) {
958-
959-
if(!PyArg_ParseTuple(PyList_GET_ITEM(headers_plist, i), "s#z#", &header_key,
960-
&header_key_len, &header_value, &header_value_len)){
961-
rd_kafka_headers_destroy(rd_headers);
962-
PyErr_SetString(PyExc_TypeError,
963-
"Headers are expected to be a tuple of (key, value)");
964-
return NULL;
965-
}
966-
967-
err = rd_kafka_header_add(rd_headers, header_key, header_key_len, header_value, header_value_len);
968-
if (err) {
969-
rd_kafka_headers_destroy(rd_headers);
970-
cfl_PyErr_Format(err,
971-
"Unable to create message headers: %s",
972-
rd_kafka_err2str(err));
973-
return NULL;
1045+
rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs) {
1046+
1047+
if (PyList_Check(hdrs)) {
1048+
return py_headers_list_to_c(hdrs);
1049+
} else if (PyDict_Check(hdrs)) {
1050+
return py_headers_dict_to_c(hdrs);
1051+
} else {
1052+
PyErr_Format(PyExc_TypeError,
1053+
"expected headers to be "
1054+
"dict or list of (key, value) tuples, not %s",
1055+
((PyTypeObject *)PyObject_Type(hdrs))->tp_name);
1056+
return NULL;
9741057
}
975-
}
976-
return rd_headers;
9771058
}
9781059

1060+
9791061
/**
9801062
* @brief Convert rd_kafka_headers_t to Python list[(header_key, header_value),...])
9811063
*
@@ -989,7 +1071,7 @@ PyObject *c_headers_to_py (rd_kafka_headers_t *headers) {
9891071
size_t header_value_size;
9901072
PyObject *header_list;
9911073

992-
header_size = rd_kafka_header_cnt(headers);
1074+
header_size = rd_kafka_header_cnt(headers);
9931075
header_list = PyList_New(header_size);
9941076

9951077
while (!rd_kafka_header_get_all(headers, idx++,

confluent_kafka/src/confluent_kafka.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
270270
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
271271

272272
#ifdef RD_KAFKA_V_HEADERS
273-
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist);
273+
rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs);
274274
PyObject *c_headers_to_py (rd_kafka_headers_t *headers);
275275
#endif
276276
/****************************************************************************

examples/integration_test.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import sys
2727
import json
2828
import gc
29+
import struct
2930
from copy import copy
3031

3132
try:
@@ -117,7 +118,8 @@ def verify_producer():
117118
p = confluent_kafka.Producer(**conf)
118119
print('producer at %s' % p)
119120

120-
headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1')]
121+
headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1'),
122+
('foobin', struct.pack('hhl', 10, 20, 30))]
121123

122124
# Produce some messages
123125
p.produce(topic, 'Hello Python!', headers=headers)
@@ -464,6 +466,8 @@ def print_wmark(consumer, parts):
464466

465467
first_msg = None
466468

469+
example_header = None
470+
467471
while True:
468472
# Consume until EOF or error
469473

@@ -517,12 +521,15 @@ def print_wmark(consumer, parts):
517521
print('Sync committed offset: %s' % offsets)
518522

519523
msgcnt += 1
520-
if msgcnt >= max_msgcnt:
524+
if msgcnt >= max_msgcnt and example_header is not None:
521525
print('max_msgcnt %d reached' % msgcnt)
522526
break
523527

524528
assert example_header, "We should have received at least one header"
525-
assert example_header == [(u'foo1', 'bar'), (u'foo1', 'bar2'), (u'foo2', '1')]
529+
assert example_header == [(u'foo1', 'bar'),
530+
(u'foo1', 'bar2'),
531+
(u'foo2', '1'),
532+
('foobin', struct.pack('hhl', 10, 20, 30))]
526533

527534
# Get current assignment
528535
assignment = c.assignment()

tests/test_Producer.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import pytest
33

44
from confluent_kafka import Producer, KafkaError, KafkaException, libversion
5+
from struct import pack
56

67

78
def error_cb(err):
@@ -65,11 +66,25 @@ def test_produce_headers():
6566
'error_cb': error_cb,
6667
'default.topic.config': {'message.timeout.ms': 10}})
6768

68-
p.produce('mytopic', value='somedata', key='a key', headers=[('headerkey', 'headervalue')])
69-
p.produce('mytopic', value='somedata', key='a key', headers=[('dupkey', 'dupvalue'), ('dupkey', 'dupvalue')])
70-
p.produce('mytopic', value='somedata', key='a key', headers=[('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')])
71-
p.produce('mytopic', value='somedata', key='a key', headers=[('key_with_null_value', None)])
72-
p.produce('mytopic', value='somedata', key='a key', headers=[])
69+
binval = pack('hhl', 1, 2, 3)
70+
71+
headers_to_test = [
72+
[('headerkey', 'headervalue')],
73+
[('dupkey', 'dupvalue'), ('empty', ''), ('dupkey', 'dupvalue')],
74+
[('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')],
75+
[('key_with_null_value', None)],
76+
[('binaryval', binval)],
77+
78+
{'headerkey': 'headervalue'},
79+
{'dupkey': 'dupvalue', 'empty': '', 'dupkey': 'dupvalue'}, # noqa: F601
80+
{'dupkey': 'dupvalue', 'dupkey': 'diffvalue'}, # noqa: F601
81+
{'key_with_null_value': None},
82+
{'binaryval': binval}
83+
]
84+
85+
for headers in headers_to_test:
86+
p.produce('mytopic', value='somedata', key='a key', headers=headers)
87+
p.produce('mytopic', value='somedata', headers=headers)
7388

7489
with pytest.raises(TypeError) as ex:
7590
p.produce('mytopic', value='somedata', key='a key', headers=[('malformed_header')])

0 commit comments

Comments
 (0)