Skip to content

Commit 44e6e63

Browse files
authored
Merge pull request #355 from confluentinc/readmefix1
Readmefix1
2 parents ed3bb32 + 810e516 commit 44e6e63

File tree

5 files changed

+191
-73
lines changed

5 files changed

+191
-73
lines changed

README.md

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ with Apache Kafka at its core. It's high priority for us that client features ke
2424
pace with core Apache Kafka and components of the [Confluent Platform](https://www.confluent.io/product/compare/).
2525

2626
The Python bindings provides a high-level Producer and Consumer with support
27-
for the balanced consumer groups of Apache Kafka 0.9.
27+
for the balanced consumer groups of Apache Kafka >= 0.9.
2828

2929
See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.
3030

@@ -40,11 +40,27 @@ Usage
4040
from confluent_kafka import Producer
4141

4242

43-
p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
43+
p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})
44+
45+
def delivery_report(err, msg):
46+
""" Called once for each message produced to indicate delivery result.
47+
Triggered by poll() or flush(). """
48+
if err is not None:
49+
print('Message delivery failed: {}'.format(err))
50+
else:
51+
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
4452

4553
for data in some_data_source:
46-
p.produce('mytopic', data.encode('utf-8'))
54+
# Trigger any available delivery report callbacks from previous produce() calls
55+
p.poll(0)
56+
57+
# Asynchronously produce a message, the delivery report callback
58+
# will be triggered from poll() above, or flush() below, when the message has
59+
# been successfully delivered or failed permanently.
60+
p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
4761

62+
# Wait for any outstanding messages to be delivered and delivery report
63+
# callbacks to be triggered.
4864
p.flush()
4965
```
5066

@@ -66,8 +82,10 @@ c = Consumer({
6682
c.subscribe(['mytopic'])
6783

6884
while True:
69-
msg = c.poll()
85+
msg = c.poll(1.0)
7086

87+
if msg is None:
88+
continue
7189
if msg.error():
7290
if msg.error().code() == KafkaError._PARTITION_EOF:
7391
continue
@@ -171,6 +189,24 @@ See the [examples](examples) directory for more examples, including [how to conf
171189
[Confluent Cloud](https://www.confluent.io/confluent-cloud/).
172190

173191

192+
Install
193+
=======
194+
195+
**Install self-contained binary wheels for OSX and Linux from PyPi:**
196+
197+
$ pip install confluent-kafka
198+
199+
**Install AvroProducer and AvroConsumer:**
200+
201+
$ pip install confluent-kafka[avro]
202+
203+
**Install from source from PyPi** *(requires librdkafka + dependencies to be installed separately)*:
204+
205+
$ pip install --no-binary :all: confluent-kafka
206+
207+
For source install, see *Prerequisites* below.
208+
209+
174210
Broker Compatibility
175211
====================
176212
The Python client (as well as the underlying C library librdkafka) supports
@@ -200,35 +236,21 @@ Prerequisites
200236
* Python >= 2.7 or Python 3.x
201237
* [librdkafka](https://github.com/edenhill/librdkafka) >= 0.9.5 (latest release is embedded in wheels)
202238

203-
librdkafka is embedded in the manylinux wheels, for other platforms or
239+
librdkafka is embedded in the macosx manylinux wheels, for other platforms or
204240
when a specific version of librdkafka is desired, following these guidelines:
205241

206-
* For **Debian/Ubuntu**** based systems, add this APT repo and then do `sudo apt-get install librdkafka-dev python-dev`:
242+
* For **Debian/Ubuntu** based systems, add this APT repo and then do `sudo apt-get install librdkafka-dev python-dev`:
207243
http://docs.confluent.io/current/installation.html#installation-apt
208244

209245
* For **RedHat** and **RPM**-based distros, add this YUM repo and then do `sudo yum install librdkafka-devel python-devel`:
210246
http://docs.confluent.io/current/installation.html#rpm-packages-via-yum
211247

212248
* On **OSX**, use **homebrew** and do `brew install librdkafka`
213249

214-
215-
Install
216-
=======
217-
218-
**Install from PyPi:**
219-
220-
$ pip install confluent-kafka
221-
222-
# for AvroProducer or AvroConsumer
223-
$ pip install confluent-kafka[avro]
224-
225-
226-
**Install from source / tarball:**
227-
228-
$ pip install .
229-
230-
# for AvroProducer or AvroConsumer
231-
$ pip install .[avro]
250+
**NOTE:** The pre-built Linux wheels do NOT contain SASL Kerberos support.
251+
If you need SASL Kerberos support you must install librdkafka and
252+
its dependencies using the above repositories and then build
253+
confluent-kafka from source.
232254

233255

234256
Build

confluent_kafka/src/confluent_kafka.c

Lines changed: 111 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -939,49 +939,125 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
939939
}
940940

941941
#ifdef RD_KAFKA_V_HEADERS
942+
943+
942944
/**
943-
* @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
944-
*
945-
* @returns The new Python list[(header_key, header_value),...] object.
945+
* @brief Convert Python list of tuples to rd_kafka_headers_t
946946
*/
947-
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist) {
948-
int i, len;
949-
rd_kafka_headers_t *rd_headers = NULL;
950-
rd_kafka_resp_err_t err;
951-
const char *header_key, *header_value = NULL;
952-
int header_key_len = 0, header_value_len = 0;
953-
954-
if (!PyList_Check(headers_plist)) {
955-
PyErr_SetString(PyExc_TypeError,
956-
"Headers are expected to be a "
957-
"list of (key,value) tuples");
958-
return NULL;
959-
}
947+
static rd_kafka_headers_t *py_headers_list_to_c (PyObject *hdrs) {
948+
int i, len;
949+
rd_kafka_headers_t *rd_headers = NULL;
950+
951+
len = (int)PyList_Size(hdrs);
952+
rd_headers = rd_kafka_headers_new(len);
953+
954+
for (i = 0; i < len; i++) {
955+
rd_kafka_resp_err_t err;
956+
const char *header_key, *header_value = NULL;
957+
int header_key_len = 0, header_value_len = 0;
958+
959+
if(!PyArg_ParseTuple(PyList_GET_ITEM(hdrs, i), "s#z#",
960+
&header_key, &header_key_len,
961+
&header_value, &header_value_len)){
962+
rd_kafka_headers_destroy(rd_headers);
963+
PyErr_SetString(PyExc_TypeError,
964+
"Headers are expected to be a "
965+
"tuple of (key, value)");
966+
return NULL;
967+
}
968+
969+
err = rd_kafka_header_add(rd_headers,
970+
header_key, header_key_len,
971+
header_value, header_value_len);
972+
if (err) {
973+
cfl_PyErr_Format(err,
974+
"Unable to add message header \"%s\": "
975+
"%s",
976+
header_key, rd_kafka_err2str(err));
977+
rd_kafka_headers_destroy(rd_headers);
978+
return NULL;
979+
}
980+
}
981+
return rd_headers;
982+
}
983+
984+
985+
/**
986+
* @brief Convert Python dict to rd_kafka_headers_t
987+
*/
988+
static rd_kafka_headers_t *py_headers_dict_to_c (PyObject *hdrs) {
989+
int len;
990+
Py_ssize_t pos = 0;
991+
rd_kafka_headers_t *rd_headers = NULL;
992+
PyObject *ko, *vo;
993+
994+
len = (int)PyDict_Size(hdrs);
995+
rd_headers = rd_kafka_headers_new(len);
996+
997+
while (PyDict_Next(hdrs, &pos, &ko, &vo)) {
998+
PyObject *ks, *ks8;
999+
const char *k;
1000+
const void *v = NULL;
1001+
Py_ssize_t vsize = 0;
1002+
rd_kafka_resp_err_t err;
1003+
1004+
if (!(ks = cfl_PyObject_Unistr(ko))) {
1005+
PyErr_SetString(PyExc_TypeError,
1006+
"expected header key to be unicode "
1007+
"string");
1008+
rd_kafka_headers_destroy(rd_headers);
1009+
return NULL;
1010+
}
9601011

961-
len = PyList_Size(headers_plist);
962-
rd_headers = rd_kafka_headers_new(len);
1012+
k = cfl_PyUnistr_AsUTF8(ks, &ks8);
9631013

964-
for (i = 0; i < len; i++) {
965-
if(!PyArg_ParseTuple(PyList_GET_ITEM(headers_plist, i), "s#z#", &header_key,
966-
&header_key_len, &header_value, &header_value_len)){
967-
rd_kafka_headers_destroy(rd_headers);
968-
PyErr_SetString(PyExc_TypeError,
969-
"Headers are expected to be a list of (key,value) tuples");
970-
return NULL;
1014+
if (vo != Py_None) {
1015+
if (PyString_AsStringAndSize(vo, (char **)&v,
1016+
&vsize) == -1) {
1017+
Py_DECREF(ks);
1018+
rd_kafka_headers_destroy(rd_headers);
1019+
return NULL;
1020+
}
1021+
}
1022+
1023+
if ((err = rd_kafka_header_add(rd_headers, k, -1, v, vsize))) {
1024+
cfl_PyErr_Format(err,
1025+
"Unable to add message header \"%s\": "
1026+
"%s",
1027+
k, rd_kafka_err2str(err));
1028+
Py_DECREF(ks);
1029+
rd_kafka_headers_destroy(rd_headers);
1030+
return NULL;
1031+
}
1032+
1033+
Py_DECREF(ks);
9711034
}
9721035

973-
err = rd_kafka_header_add(rd_headers, header_key, header_key_len, header_value, header_value_len);
974-
if (err) {
975-
rd_kafka_headers_destroy(rd_headers);
976-
cfl_PyErr_Format(err,
977-
"Unable to create message headers: %s",
978-
rd_kafka_err2str(err));
979-
return NULL;
1036+
return rd_headers;
1037+
}
1038+
1039+
1040+
/**
1041+
* @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
1042+
*
1043+
* @returns The new Python list[(header_key, header_value),...] object.
1044+
*/
1045+
rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs) {
1046+
1047+
if (PyList_Check(hdrs)) {
1048+
return py_headers_list_to_c(hdrs);
1049+
} else if (PyDict_Check(hdrs)) {
1050+
return py_headers_dict_to_c(hdrs);
1051+
} else {
1052+
PyErr_Format(PyExc_TypeError,
1053+
"expected headers to be "
1054+
"dict or list of (key, value) tuples, not %s",
1055+
((PyTypeObject *)PyObject_Type(hdrs))->tp_name);
1056+
return NULL;
9801057
}
981-
}
982-
return rd_headers;
9831058
}
9841059

1060+
9851061
/**
9861062
* @brief Convert rd_kafka_headers_t to Python list[(header_key, header_value),...])
9871063
*
@@ -995,7 +1071,7 @@ PyObject *c_headers_to_py (rd_kafka_headers_t *headers) {
9951071
size_t header_value_size;
9961072
PyObject *header_list;
9971073

998-
header_size = rd_kafka_header_cnt(headers);
1074+
header_size = rd_kafka_header_cnt(headers);
9991075
header_list = PyList_New(header_size);
10001076

10011077
while (!rd_kafka_header_get_all(headers, idx++,

confluent_kafka/src/confluent_kafka.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
270270
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
271271

272272
#ifdef RD_KAFKA_V_HEADERS
273-
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist);
273+
rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs);
274274
PyObject *c_headers_to_py (rd_kafka_headers_t *headers);
275275
#endif
276276
/****************************************************************************

examples/integration_test.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import sys
2727
import json
2828
import gc
29+
import struct
2930
from copy import copy
3031

3132
try:
@@ -117,7 +118,8 @@ def verify_producer():
117118
p = confluent_kafka.Producer(**conf)
118119
print('producer at %s' % p)
119120

120-
headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1')]
121+
headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1'),
122+
('foobin', struct.pack('hhl', 10, 20, 30))]
121123

122124
# Produce some messages
123125
p.produce(topic, 'Hello Python!', headers=headers)
@@ -464,6 +466,8 @@ def print_wmark(consumer, parts):
464466

465467
first_msg = None
466468

469+
example_header = None
470+
467471
while True:
468472
# Consume until EOF or error
469473

@@ -517,12 +521,15 @@ def print_wmark(consumer, parts):
517521
print('Sync committed offset: %s' % offsets)
518522

519523
msgcnt += 1
520-
if msgcnt >= max_msgcnt:
524+
if msgcnt >= max_msgcnt and example_header is not None:
521525
print('max_msgcnt %d reached' % msgcnt)
522526
break
523527

524528
assert example_header, "We should have received at least one header"
525-
assert example_header == [(u'foo1', 'bar'), (u'foo1', 'bar2'), (u'foo2', '1')]
529+
assert example_header == [(u'foo1', 'bar'),
530+
(u'foo1', 'bar2'),
531+
(u'foo2', '1'),
532+
('foobin', struct.pack('hhl', 10, 20, 30))]
526533

527534
# Get current assignment
528535
assignment = c.assignment()

tests/test_Producer.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import pytest
33

44
from confluent_kafka import Producer, KafkaError, KafkaException, libversion
5+
from struct import pack
56

67

78
def error_cb(err):
@@ -65,19 +66,31 @@ def test_produce_headers():
6566
'error_cb': error_cb,
6667
'default.topic.config': {'message.timeout.ms': 10}})
6768

68-
p.produce('mytopic', value='somedata', key='a key', headers=[('headerkey', 'headervalue')])
69-
p.produce('mytopic', value='somedata', key='a key', headers=[('dupkey', 'dupvalue'), ('dupkey', 'dupvalue')])
70-
p.produce('mytopic', value='somedata', key='a key', headers=[('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')])
71-
p.produce('mytopic', value='somedata', key='a key', headers=[('key_with_null_value', None)])
72-
p.produce('mytopic', value='somedata', key='a key', headers=[])
69+
binval = pack('hhl', 1, 2, 3)
7370

74-
with pytest.raises(TypeError) as ex:
75-
p.produce('mytopic', value='somedata', key='a key', headers={'my': 'dict'})
76-
assert 'Headers are expected to be a list of (key,value) tuples' == str(ex.value)
71+
headers_to_test = [
72+
[('headerkey', 'headervalue')],
73+
[('dupkey', 'dupvalue'), ('empty', ''), ('dupkey', 'dupvalue')],
74+
[('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')],
75+
[('key_with_null_value', None)],
76+
[('binaryval', binval)],
7777

78-
with pytest.raises(TypeError) as ex:
78+
{'headerkey': 'headervalue'},
79+
{'dupkey': 'dupvalue', 'empty': '', 'dupkey': 'dupvalue'}, # noqa: F601
80+
{'dupkey': 'dupvalue', 'dupkey': 'diffvalue'}, # noqa: F601
81+
{'key_with_null_value': None},
82+
{'binaryval': binval}
83+
]
84+
85+
for headers in headers_to_test:
86+
p.produce('mytopic', value='somedata', key='a key', headers=headers)
87+
p.produce('mytopic', value='somedata', headers=headers)
88+
89+
with pytest.raises(TypeError):
90+
p.produce('mytopic', value='somedata', key='a key', headers=('a', 'b'))
91+
92+
with pytest.raises(TypeError):
7993
p.produce('mytopic', value='somedata', key='a key', headers=[('malformed_header')])
80-
assert 'Headers are expected to be a list of (key,value) tuples' == str(ex.value)
8194

8295
p.flush()
8396

0 commit comments

Comments
 (0)