Readmefix1 #355

Merged: 5 commits (Apr 18, 2018)
70 changes: 46 additions & 24 deletions README.md
@@ -24,7 +24,7 @@ with Apache Kafka at its core. It's high priority for us that client features ke
pace with core Apache Kafka and components of the [Confluent Platform](https://www.confluent.io/product/compare/).

The Python bindings provides a high-level Producer and Consumer with support
for the balanced consumer groups of Apache Kafka 0.9.
for the balanced consumer groups of Apache Kafka >= 0.9.

See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.

@@ -40,11 +40,27 @@ Usage
from confluent_kafka import Producer


p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for data in some_data_source:
    p.produce('mytopic', data.encode('utf-8'))
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
```

@@ -66,8 +82,10 @@ c = Consumer({
c.subscribe(['mytopic'])

while True:
    msg = c.poll()
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
@@ -171,6 +189,24 @@ See the [examples](examples) directory for more examples, including [how to conf
[Confluent Cloud](https://www.confluent.io/confluent-cloud/).


Install
=======

**Install self-contained binary wheels for OSX and Linux from PyPi:**

$ pip install confluent-kafka

**Install AvroProducer and AvroConsumer:**

$ pip install confluent-kafka[avro]

**Install from source from PyPi** *(requires librdkafka + dependencies to be installed separately)*:

$ pip install --no-binary :all: confluent-kafka

For source install, see *Prerequisites* below.
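
As a quick illustration of what the `avro` extra provides (a sketch, not part of this change set; the broker address, schema registry URL, topic name, and schema below are placeholders):

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Placeholder record schema; real schemas are typically loaded from .avsc files.
value_schema = avro.loads('''
{
   "namespace": "example.avro",
   "type": "record",
   "name": "User",
   "fields": [{"name": "name", "type": "string"}]
}
''')

avro_producer = AvroProducer({
    'bootstrap.servers': 'mybroker1',
    'schema.registry.url': 'http://schema-registry:8081'
}, default_value_schema=value_schema)

# Values are plain Python objects serialized against the registered schema.
avro_producer.produce(topic='users', value={'name': 'alice'})
avro_producer.flush()
```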


Broker Compatibility
====================
The Python client (as well as the underlying C library librdkafka) supports
@@ -200,35 +236,21 @@ Prerequisites
* Python >= 2.7 or Python 3.x
* [librdkafka](https://github.com/edenhill/librdkafka) >= 0.9.5 (latest release is embedded in wheels)

librdkafka is embedded in the manylinux wheels, for other platforms or
librdkafka is embedded in the macosx manylinux wheels, for other platforms or
when a specific version of librdkafka is desired, following these guidelines:

* For **Debian/Ubuntu**** based systems, add this APT repo and then do `sudo apt-get install librdkafka-dev python-dev`:
* For **Debian/Ubuntu** based systems, add this APT repo and then do `sudo apt-get install librdkafka-dev python-dev`:
http://docs.confluent.io/current/installation.html#installation-apt

* For **RedHat** and **RPM**-based distros, add this YUM repo and then do `sudo yum install librdkafka-devel python-devel`:
http://docs.confluent.io/current/installation.html#rpm-packages-via-yum

* On **OSX**, use **homebrew** and do `brew install librdkafka`


Install
=======

**Install from PyPi:**

$ pip install confluent-kafka

# for AvroProducer or AvroConsumer
$ pip install confluent-kafka[avro]


**Install from source / tarball:**

$ pip install .

# for AvroProducer or AvroConsumer
$ pip install .[avro]
**NOTE:** The pre-built Linux wheels do NOT contain SASL Kerberos support.
If you need SASL Kerberos support you must install librdkafka and
its dependencies using the above repositories and then build
confluent-kafka from source.


Build
146 changes: 111 additions & 35 deletions confluent_kafka/src/confluent_kafka.c
@@ -939,49 +939,125 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
}

#ifdef RD_KAFKA_V_HEADERS


/**
* @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
*
* @returns The new Python list[(header_key, header_value),...] object.
* @brief Convert Python list of tuples to rd_kafka_headers_t
*/
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist) {
int i, len;
rd_kafka_headers_t *rd_headers = NULL;
rd_kafka_resp_err_t err;
const char *header_key, *header_value = NULL;
int header_key_len = 0, header_value_len = 0;

if (!PyList_Check(headers_plist)) {
PyErr_SetString(PyExc_TypeError,
"Headers are expected to be a "
"list of (key,value) tuples");
return NULL;
}
static rd_kafka_headers_t *py_headers_list_to_c (PyObject *hdrs) {
int i, len;
rd_kafka_headers_t *rd_headers = NULL;

len = (int)PyList_Size(hdrs);
rd_headers = rd_kafka_headers_new(len);

for (i = 0; i < len; i++) {
rd_kafka_resp_err_t err;
const char *header_key, *header_value = NULL;
int header_key_len = 0, header_value_len = 0;

if(!PyArg_ParseTuple(PyList_GET_ITEM(hdrs, i), "s#z#",
&header_key, &header_key_len,
&header_value, &header_value_len)){
rd_kafka_headers_destroy(rd_headers);
PyErr_SetString(PyExc_TypeError,
"Headers are expected to be a "
"tuple of (key, value)");
return NULL;
}

err = rd_kafka_header_add(rd_headers,
header_key, header_key_len,
header_value, header_value_len);
if (err) {
cfl_PyErr_Format(err,
"Unable to add message header \"%s\": "
"%s",
header_key, rd_kafka_err2str(err));
rd_kafka_headers_destroy(rd_headers);
return NULL;
}
}
return rd_headers;
}


/**
* @brief Convert Python dict to rd_kafka_headers_t
*/
static rd_kafka_headers_t *py_headers_dict_to_c (PyObject *hdrs) {
int len;
Py_ssize_t pos = 0;
rd_kafka_headers_t *rd_headers = NULL;
PyObject *ko, *vo;

len = (int)PyDict_Size(hdrs);
rd_headers = rd_kafka_headers_new(len);

while (PyDict_Next(hdrs, &pos, &ko, &vo)) {
PyObject *ks, *ks8;
const char *k;
const void *v = NULL;
Py_ssize_t vsize = 0;
rd_kafka_resp_err_t err;

if (!(ks = cfl_PyObject_Unistr(ko))) {
PyErr_SetString(PyExc_TypeError,
"expected header key to be unicode "
"string");
rd_kafka_headers_destroy(rd_headers);
return NULL;
}

len = PyList_Size(headers_plist);
rd_headers = rd_kafka_headers_new(len);
k = cfl_PyUnistr_AsUTF8(ks, &ks8);

for (i = 0; i < len; i++) {
if(!PyArg_ParseTuple(PyList_GET_ITEM(headers_plist, i), "s#z#", &header_key,
&header_key_len, &header_value, &header_value_len)){
rd_kafka_headers_destroy(rd_headers);
PyErr_SetString(PyExc_TypeError,
"Headers are expected to be a list of (key,value) tuples");
return NULL;
if (vo != Py_None) {
if (PyString_AsStringAndSize(vo, (char **)&v,
&vsize) == -1) {
Py_DECREF(ks);
rd_kafka_headers_destroy(rd_headers);
return NULL;
}
}

if ((err = rd_kafka_header_add(rd_headers, k, -1, v, vsize))) {
cfl_PyErr_Format(err,
"Unable to add message header \"%s\": "
"%s",
k, rd_kafka_err2str(err));
Py_DECREF(ks);
rd_kafka_headers_destroy(rd_headers);
return NULL;
}

Py_DECREF(ks);
}

err = rd_kafka_header_add(rd_headers, header_key, header_key_len, header_value, header_value_len);
if (err) {
rd_kafka_headers_destroy(rd_headers);
cfl_PyErr_Format(err,
"Unable to create message headers: %s",
rd_kafka_err2str(err));
return NULL;
return rd_headers;
}


/**
* @brief Convert Python list[(header_key, header_value),...]) to C rd_kafka_topic_partition_list_t.
*
* @returns The new Python list[(header_key, header_value),...] object.
*/
rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs) {

if (PyList_Check(hdrs)) {
return py_headers_list_to_c(hdrs);
} else if (PyDict_Check(hdrs)) {
return py_headers_dict_to_c(hdrs);
} else {
PyErr_Format(PyExc_TypeError,
"expected headers to be "
"dict or list of (key, value) tuples, not %s",
((PyTypeObject *)PyObject_Type(hdrs))->tp_name);
return NULL;
}
}
return rd_headers;
}
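
For reviewers, a usage-level sketch (not part of this diff) of what these converters accept and return: `produce()` takes headers either as a list of (key, value) tuples (handled by `py_headers_list_to_c`) or, with this change, as a dict (handled by `py_headers_dict_to_c`), and consumed messages expose them again as a list of tuples via `Message.headers()` (via `c_headers_to_py`). This assumes librdkafka built with header support (`RD_KAFKA_V_HEADERS`); the broker address, topic, and header names below are placeholders.

```python
from confluent_kafka import Producer, Consumer

p = Producer({'bootstrap.servers': 'mybroker1'})

# List form: preserves ordering and allows duplicate keys; values may be str, bytes or None.
p.produce('mytopic', value='payload',
          headers=[('trace-id', 'abc123'), ('trace-id', 'abc124'), ('retry', None)])

# Dict form (new in this change); duplicate keys are not expressible here.
p.produce('mytopic', value='payload', headers={'trace-id': 'abc123', 'retry': None})
p.flush()

c = Consumer({'bootstrap.servers': 'mybroker1', 'group.id': 'header-example'})
c.subscribe(['mytopic'])
msg = c.poll(10.0)
if msg is not None and not msg.error():
    # Headers come back as a list of (key, value) tuples, or None if the message has none.
    print(msg.headers())
c.close()
```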


/**
* @brief Convert rd_kafka_headers_t to Python list[(header_key, header_value),...])
*
@@ -995,7 +1071,7 @@ PyObject *c_headers_to_py (rd_kafka_headers_t *headers) {
size_t header_value_size;
PyObject *header_list;

header_size = rd_kafka_header_cnt(headers);
header_size = rd_kafka_header_cnt(headers);
header_list = PyList_New(header_size);

while (!rd_kafka_header_get_all(headers, idx++,
2 changes: 1 addition & 1 deletion confluent_kafka/src/confluent_kafka.h
@@ -270,7 +270,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);

#ifdef RD_KAFKA_V_HEADERS
rd_kafka_headers_t *py_headers_to_c (PyObject *headers_plist);
rd_kafka_headers_t *py_headers_to_c (PyObject *hdrs);
PyObject *c_headers_to_py (rd_kafka_headers_t *headers);
#endif
/****************************************************************************
13 changes: 10 additions & 3 deletions examples/integration_test.py
@@ -26,6 +26,7 @@
import sys
import json
import gc
import struct
from copy import copy

try:
@@ -117,7 +118,8 @@ def verify_producer():
p = confluent_kafka.Producer(**conf)
print('producer at %s' % p)

headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1')]
headers = [('foo1', 'bar'), ('foo1', 'bar2'), ('foo2', b'1'),
('foobin', struct.pack('hhl', 10, 20, 30))]

# Produce some messages
p.produce(topic, 'Hello Python!', headers=headers)
@@ -464,6 +466,8 @@ def print_wmark(consumer, parts):

first_msg = None

example_header = None

while True:
# Consume until EOF or error

@@ -517,12 +521,15 @@ def print_wmark(consumer, parts):
print('Sync committed offset: %s' % offsets)

msgcnt += 1
if msgcnt >= max_msgcnt:
if msgcnt >= max_msgcnt and example_header is not None:
print('max_msgcnt %d reached' % msgcnt)
break

assert example_header, "We should have received at least one header"
assert example_header == [(u'foo1', 'bar'), (u'foo1', 'bar2'), (u'foo2', '1')]
assert example_header == [(u'foo1', 'bar'),
(u'foo1', 'bar2'),
(u'foo2', '1'),
('foobin', struct.pack('hhl', 10, 20, 30))]

# Get current assignment
assignment = c.assignment()
33 changes: 23 additions & 10 deletions tests/test_Producer.py
@@ -2,6 +2,7 @@
import pytest

from confluent_kafka import Producer, KafkaError, KafkaException, libversion
from struct import pack


def error_cb(err):
@@ -65,19 +66,31 @@ def test_produce_headers():
'error_cb': error_cb,
'default.topic.config': {'message.timeout.ms': 10}})

p.produce('mytopic', value='somedata', key='a key', headers=[('headerkey', 'headervalue')])
p.produce('mytopic', value='somedata', key='a key', headers=[('dupkey', 'dupvalue'), ('dupkey', 'dupvalue')])
p.produce('mytopic', value='somedata', key='a key', headers=[('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')])
p.produce('mytopic', value='somedata', key='a key', headers=[('key_with_null_value', None)])
p.produce('mytopic', value='somedata', key='a key', headers=[])
binval = pack('hhl', 1, 2, 3)

with pytest.raises(TypeError) as ex:
p.produce('mytopic', value='somedata', key='a key', headers={'my': 'dict'})
assert 'Headers are expected to be a list of (key,value) tuples' == str(ex.value)
headers_to_test = [
[('headerkey', 'headervalue')],
[('dupkey', 'dupvalue'), ('empty', ''), ('dupkey', 'dupvalue')],
[('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')],
[('key_with_null_value', None)],
[('binaryval', binval)],

with pytest.raises(TypeError) as ex:
{'headerkey': 'headervalue'},
{'dupkey': 'dupvalue', 'empty': '', 'dupkey': 'dupvalue'}, # noqa: F601
{'dupkey': 'dupvalue', 'dupkey': 'diffvalue'}, # noqa: F601
{'key_with_null_value': None},
{'binaryval': binval}
]

for headers in headers_to_test:
p.produce('mytopic', value='somedata', key='a key', headers=headers)
p.produce('mytopic', value='somedata', headers=headers)

with pytest.raises(TypeError):
p.produce('mytopic', value='somedata', key='a key', headers=('a', 'b'))

with pytest.raises(TypeError):
p.produce('mytopic', value='somedata', key='a key', headers=[('malformed_header')])
assert 'Headers are expected to be a list of (key,value) tuples' == str(ex.value)

p.flush()
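
One aside worth flagging in review (an illustration, not part of the test file): the dict cases above carry `# noqa` markers because a Python dict literal with a repeated key keeps only the last value, so the dict form of headers cannot express duplicate header keys the way the list-of-tuples form can. A minimal sketch:

```python
# Duplicate keys survive in the list form but collapse in a dict literal
# before produce() ever sees them.
list_headers = [('dupkey', 'dupvalue'), ('dupkey', 'diffvalue')]
dict_headers = {'dupkey': 'dupvalue', 'dupkey': 'diffvalue'}  # last value wins

assert len(list_headers) == 2
assert dict_headers == {'dupkey': 'diffvalue'}
```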
