Support for building Windows wheels #451


Merged 1 commit on Sep 19, 2018
25 changes: 0 additions & 25 deletions .appveyor-disabled.yml

This file was deleted.

30 changes: 30 additions & 0 deletions .appveyor.yml
@@ -0,0 +1,30 @@
environment:
  global:
    LIBRDKAFKA_NUGET_VERSION: 0.11.6-RC2
    CIBW_SKIP: cp33-* cp34-*
    CIBW_TEST_REQUIRES: pytest requests avro
    # SDK v7.0 MSVC Express 2008's SetEnv.cmd script will fail if the
    # /E:ON and /V:ON options are not enabled in the batch script interpreter
    # See: http://stackoverflow.com/a/13751649/163740
    CMD_IN_ENV: "cmd /E:ON /V:ON /C .\\tools\\appveyor\\run_with_env.cmd"

build_script:
  - tools/windows-build.bat

artifacts:
  - path: "wheelhouse\\*.whl"
    name: Wheels

deploy:
  - provider: S3
    access_key_id:
      secure: RIuhB6QPQeCdchBMSmaY/7aSrrdih+HJu443UaKsH/I=
    secret_access_key:
      secure: YrPW943StN3C9o9enGGpfMns7wxD7+lArRgEavjeWlO2uy2jLPKkCnSQ60qe1ffB
    region: us-west-1
    bucket: librdkafka-ci-packages
    folder: confluent-kafka-python/p-confluent-kafka-python__bld-appveyor__plat-windows__bldtype-release__tag-$(APPVEYOR_REPO_TAG_NAME)__sha-$(APPVEYOR_REPO_COMMIT)__bid-$(APPVEYOR_BUILD_ID)
    artifact: /wheelhouse\/.*\.whl/
    max_error_retry: 3
    on:
      APPVEYOR_REPO_TAG: true
28 changes: 20 additions & 8 deletions confluent_kafka/src/Admin.c
@@ -193,7 +193,7 @@ static int Admin_set_replica_assignment (const char *forApi, void *c_obj,
return 0;
}

- c_replicas = alloca(sizeof(*c_replicas) *
+ c_replicas = malloc(sizeof(*c_replicas) *
replica_cnt);

for (ri = 0 ; ri < replica_cnt ; ri++) {
@@ -206,6 +206,7 @@ static int Admin_set_replica_assignment (const char *forApi, void *c_obj,
"replica_assignment must be "
"a list of int lists with an "
"outer size of %s", err_count_desc);
+ free(c_replicas);
return 0;
}

@@ -231,6 +232,8 @@ static int Admin_set_replica_assignment (const char *forApi, void *c_obj,
"Unsupported forApi %s", forApi);
}

+ free(c_replicas);

if (err) {
PyErr_SetString(
PyExc_ValueError, errstr);
@@ -255,6 +258,7 @@ Admin_config_dict_to_c (void *c_obj, PyObject *dict, const char *op_name) {

while (PyDict_Next(dict, &pos, &ko, &vo)) {
PyObject *ks, *ks8;
+ PyObject *vs = NULL, *vs8 = NULL;
const char *k;
const char *v;
rd_kafka_resp_err_t err;
@@ -268,8 +272,6 @@ Admin_config_dict_to_c (void *c_obj, PyObject *dict, const char *op_name) {

k = cfl_PyUnistr_AsUTF8(ks, &ks8);


- PyObject *vs = NULL, *vs8 = NULL;
if (!(vs = cfl_PyObject_Unistr(vo)) ||
!(v = cfl_PyUnistr_AsUTF8(vs, &vs8))) {
PyErr_Format(PyExc_ValueError,
@@ -367,7 +369,7 @@ static PyObject *Admin_create_topics (Handle *self, PyObject *args,
/*
* Parse the list of NewTopics and convert to corresponding C types.
*/
- c_objs = alloca(sizeof(*c_objs) * tcnt);
+ c_objs = malloc(sizeof(*c_objs) * tcnt);

for (i = 0 ; i < tcnt ; i++) {
NewTopic *newt = (NewTopic *)PyList_GET_ITEM(topics, i);
@@ -443,13 +445,15 @@

rd_kafka_NewTopic_destroy_array(c_objs, tcnt);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_RETURN_NONE;

err:
rd_kafka_NewTopic_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
Py_DECREF(future); /* from options_to_c() */

return NULL;
@@ -503,7 +507,7 @@ static PyObject *Admin_delete_topics (Handle *self, PyObject *args,
/*
* Parse the list of strings and convert to corresponding C types.
*/
- c_objs = alloca(sizeof(*c_objs) * tcnt);
+ c_objs = malloc(sizeof(*c_objs) * tcnt);

for (i = 0 ; i < tcnt ; i++) {
PyObject *topic = PyList_GET_ITEM(topics, i);
@@ -544,13 +548,15 @@ static PyObject *Admin_delete_topics (Handle *self, PyObject *args,

rd_kafka_DeleteTopic_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_RETURN_NONE;

err:
rd_kafka_DeleteTopic_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
Py_DECREF(future); /* from options_to_c() */

return NULL;
@@ -611,7 +617,7 @@ static PyObject *Admin_create_partitions (Handle *self, PyObject *args,
/*
* Parse the list of NewPartitions and convert to corresponding C types.
*/
- c_objs = alloca(sizeof(*c_objs) * tcnt);
+ c_objs = malloc(sizeof(*c_objs) * tcnt);

for (i = 0 ; i < tcnt ; i++) {
NewPartitions *newp = (NewPartitions *)PyList_GET_ITEM(topics,
@@ -669,13 +675,15 @@

rd_kafka_NewPartitions_destroy_array(c_objs, tcnt);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_RETURN_NONE;

err:
rd_kafka_NewPartitions_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
Py_DECREF(future); /* from options_to_c() */

return NULL;
@@ -742,7 +750,7 @@ static PyObject *Admin_describe_configs (Handle *self, PyObject *args,
* Parse the list of ConfigResources and convert to
* corresponding C types.
*/
- c_objs = alloca(sizeof(*c_objs) * cnt);
+ c_objs = malloc(sizeof(*c_objs) * cnt);

for (i = 0 ; i < cnt ; i++) {
PyObject *res = PyList_GET_ITEM(resources, i);
@@ -795,6 +803,7 @@ static PyObject *Admin_describe_configs (Handle *self, PyObject *args,

rd_kafka_ConfigResource_destroy_array(c_objs, cnt);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_DECREF(ConfigResource_type); /* from lookup() */
@@ -804,6 +813,7 @@ static PyObject *Admin_describe_configs (Handle *self, PyObject *args,
err:
rd_kafka_ConfigResource_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
Py_DECREF(ConfigResource_type); /* from lookup() */
Py_DECREF(future); /* from options_to_c() */

@@ -881,7 +891,7 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
* Parse the list of ConfigResources and convert to
* corresponding C types.
*/
- c_objs = alloca(sizeof(*c_objs) * cnt);
+ c_objs = malloc(sizeof(*c_objs) * cnt);
Contributor: What implications, if any, are there to allocating on the heap as opposed to the stack for these Admin operations (aside from needing to add free)?

Contributor Author (@edenhill, Sep 19, 2018): I initially used alloca() to avoid having to call free; alloca is also a lot faster, but this isn't the hot path so it doesn't matter. I had to change it to malloc() since the MSVC for Python 2.7 is really old and doesn't support alloca.
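As a footnote to the thread: a minimal, hypothetical sketch (not code from this PR) of the pattern the change adopts. With malloc() every exit path, error paths included, must free the buffer; with alloca() the two free() calls would simply disappear, since the allocation dies with the stack frame.

    #include <stdlib.h>

    /* Invented example: heap allocation with explicit cleanup on both
     * the success path and the error path, mirroring the Admin_* code. */
    static int process_items (size_t cnt, int fail_at) {
            int *c_objs;
            size_t i;

            c_objs = malloc(sizeof(*c_objs) * cnt);
            if (!c_objs)
                    return -1;

            for (i = 0; i < cnt; i++) {
                    if ((int)i == fail_at) {   /* stand-in for a conversion error */
                            free(c_objs);      /* error path must clean up */
                            return -1;
                    }
                    c_objs[i] = (int)i;
            }

            free(c_objs);                      /* so must the success path */
            return 0;
    }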


for (i = 0 ; i < cnt ; i++) {
PyObject *res = PyList_GET_ITEM(resources, i);
@@ -950,6 +960,7 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,

rd_kafka_ConfigResource_destroy_array(c_objs, cnt);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */

Py_DECREF(ConfigResource_type); /* from lookup() */
@@ -959,6 +970,7 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
err:
rd_kafka_ConfigResource_destroy_array(c_objs, i);
rd_kafka_AdminOptions_destroy(c_options);
+ free(c_objs);
Py_DECREF(ConfigResource_type); /* from lookup() */
Py_DECREF(future); /* from options_to_c() */

6 changes: 3 additions & 3 deletions confluent_kafka/src/Consumer.c
@@ -747,7 +747,7 @@ static PyObject *Consumer_seek (Handle *self, PyObject *args, PyObject *kwargs)

if (err) {
cfl_PyErr_Format(err,
"Failed to seek to offset %"PRId64": %s",
"Failed to seek to offset %"CFL_PRId64": %s",
tp->offset, rd_kafka_err2str(err));
return NULL;
}
@@ -917,7 +917,7 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args,
PyObject *msglist;
rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
CallState cs;
- Py_ssize_t i;
+ Py_ssize_t i, n;

if (!self->rk) {
PyErr_SetString(PyExc_RuntimeError,
@@ -939,7 +939,7 @@

rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *));

- Py_ssize_t n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
+ n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
tmout >= 0 ? (int)(tmout * 1000.0f) : -1,
rkmessages,
num_messages);
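An editorial aside on the Consumer_consume() change above: hoisting the declaration of n to the top of the function reads as the same portability exercise, since C89, which the old MSVC toolchain effectively limits this code to (an inference from the review thread in Admin.c, not stated in the diff), forbids declarations after statements in a block. A self-contained illustration:

    #include <stdio.h>

    /* C99 style, as the old Consumer_consume() was written:
     * a declaration after a statement, rejected by C89-only compilers. */
    static void c99_style (void) {
            printf("consuming...\n");
            int n = 42;
            printf("n = %d\n", n);
    }

    /* C89 style, as the PR rewrites it: declare first, assign later. */
    static void c89_style (void) {
            int n;
            printf("consuming...\n");
            n = 42;
            printf("n = %d\n", n);
    }

    int main (void) {
            c99_style();
            c89_style();
            return 0;
    }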
24 changes: 12 additions & 12 deletions confluent_kafka/src/confluent_kafka.c
@@ -756,15 +756,15 @@ static PyObject *TopicPartition_str0 (TopicPartition *self) {
PyObject *ret;
char offset_str[40];

- snprintf(offset_str, sizeof(offset_str), "%"PRId64"", self->offset);
+ snprintf(offset_str, sizeof(offset_str), "%"CFL_PRId64"", self->offset);

if (self->error != Py_None) {
errstr = cfl_PyObject_Unistr(self->error);
c_errstr = cfl_PyUnistr_AsUTF8(errstr, &errstr8);
}

ret = cfl_PyUnistr(
_FromFormat("TopicPartition{topic=%s,partition=%"PRId32
_FromFormat("TopicPartition{topic=%s,partition=%"CFL_PRId32
",offset=%s,error=%s}",
self->topic, self->partition,
offset_str,
@@ -1282,14 +1282,14 @@ static void log_cb (const rd_kafka_t *rk, int level,
CallState *cs;
static const int level_map[8] = {
/* Map syslog levels to python logging levels */
- [0] = 50, /* LOG_EMERG -> logging.CRITICAL */
- [1] = 50, /* LOG_ALERT -> logging.CRITICAL */
- [2] = 50, /* LOG_CRIT -> logging.CRITICAL */
- [3] = 40, /* LOG_ERR -> logging.ERROR */
- [4] = 30, /* LOG_WARNING -> logging.WARNING */
- [5] = 20, /* LOG_NOTICE -> logging.INFO */
- [6] = 20, /* LOG_INFO -> logging.INFO */
- [7] = 10, /* LOG_DEBUG -> logging.DEBUG */
+ 50, /* LOG_EMERG -> logging.CRITICAL */
+ 50, /* LOG_ALERT -> logging.CRITICAL */
+ 50, /* LOG_CRIT -> logging.CRITICAL */
+ 40, /* LOG_ERR -> logging.ERROR */
+ 30, /* LOG_WARNING -> logging.WARNING */
+ 20, /* LOG_NOTICE -> logging.INFO */
+ 20, /* LOG_INFO -> logging.INFO */
+ 10, /* LOG_DEBUG -> logging.DEBUG */
};

cs = CallState_get(h);
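An editorial note on the level_map rewrite above: it trades C99 designated initializers for positional ones, presumably for the same pre-C99 MSVC reason (an assumption; the diff itself gives no rationale). For a dense array like this one the two forms initialize identical contents:

    /* The designated form requires C99; the positional form is also
     * valid C89. Both produce the same eight-element array. */
    static const int designated[8] = {
            [0] = 50, [1] = 50, [2] = 50, [3] = 40,
            [4] = 30, [5] = 20, [6] = 20, [7] = 10
    };
    static const int positional[8] = {
            50, 50, 50, 40, 30, 20, 20, 10
    };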
@@ -1444,7 +1444,7 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf,
const char *name, PyObject *valobj) {

if (!strcasecmp(name, "on_delivery")) {
if (!strcmp(name, "on_delivery")) {
Contributor: Is there an strcasecmp equivalent for Windows? Otherwise tolower() or some equivalent seems appropriate prior to comparison. While I agree function arguments should be case-sensitive, they haven't been in the past, so doesn't this technically break the API?

Contributor Author: There are Windows alternatives, but since Python is case-sensitive, and we haven't made any claims that on_delivery et al. may be specified in non-lower-case, I think it is safe to remove this "feature".

Contributor: If you're good with it I'm good with it.

Contributor Author: Then I'm good with it.
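For reference, the Windows alternative alluded to above does exist: MSVC spells strcasecmp() as _stricmp(). Had the case-insensitive matching been worth keeping, a portable shim could have looked roughly like this (cfl_strcasecmp is an invented name, not part of this PR):

    /* Hypothetical portability shim; the PR instead drops
     * case-insensitivity and uses plain strcmp(). */
    #ifdef _MSC_VER
    #include <string.h>
    #define cfl_strcasecmp(a, b) _stricmp(a, b)
    #else
    #include <strings.h>   /* POSIX home of strcasecmp() */
    #define cfl_strcasecmp(a, b) strcasecmp(a, b)
    #endif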

if (!PyCallable_Check(valobj)) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
@@ -1486,7 +1486,7 @@ static int consumer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf,
const char *name, PyObject *valobj) {

if (!strcasecmp(name, "on_commit")) {
if (!strcmp(name, "on_commit")) {
if (!PyCallable_Check(valobj)) {
cfl_PyErr_Format(
RD_KAFKA_RESP_ERR__INVALID_ARG,
13 changes: 13 additions & 0 deletions confluent_kafka/src/confluent_kafka.h
@@ -18,8 +18,21 @@
#include <structmember.h>
#include <pythread.h>

+ #include <stdint.h>
#include <librdkafka/rdkafka.h>

+ #ifdef _MSC_VER
+ /* Windows */
+ #define CFL_PRId64 "I64d"
+ #define CFL_PRId32 "I32d"
+
+ #else
+ /* C99 */
+ #include <inttypes.h>
+ #define CFL_PRId64 PRId64
+ #define CFL_PRId32 PRId32
+ #endif


/**
* Minimum required librdkafka version. This is checked both during
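The new macros drop into format strings exactly like the standard <inttypes.h> ones, with MSVC's I64d/I32d length modifiers substituted where the C99 header is unavailable. A usage sketch, assuming the header above and its Python.h dependencies are on the include path:

    #include <stdio.h>
    #include <stdint.h>
    #include "confluent_kafka.h"   /* defines CFL_PRId64 / CFL_PRId32 */

    static void print_position (int32_t partition, int64_t offset) {
            char buf[64];
            snprintf(buf, sizeof(buf),
                     "partition %"CFL_PRId32" at offset %"CFL_PRId64,
                     partition, offset);
            puts(buf);
    }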
10 changes: 9 additions & 1 deletion setup.py
@@ -4,6 +4,7 @@
from setuptools import setup, find_packages
from distutils.core import Extension
import sys
+ import platform

INSTALL_REQUIRES = list()

@@ -13,8 +14,15 @@
else:
    avro = 'avro-python3'

+ # On Un*x the library is linked as -lrdkafka,
+ # while on Windows we need the full librdkafka name.
+ if platform.system() == 'Windows':
+     librdkafka_libname = 'librdkafka'
+ else:
+     librdkafka_libname = 'rdkafka'
+
module = Extension('confluent_kafka.cimpl',
- libraries=['rdkafka'],
+ libraries=[librdkafka_libname],
sources=['confluent_kafka/src/confluent_kafka.c',
'confluent_kafka/src/Producer.c',
'confluent_kafka/src/Consumer.c',
18 changes: 17 additions & 1 deletion tools/RELEASE.md
@@ -25,7 +25,23 @@ replace as necessary with your version or remove `rc..` suffix for the
final release.


- ## 1. Update OpenSSL version if necessary
+ ## 1. Update librdkafka and OpenSSL versions
+
+ ### 1.1 Update librdkafka version
+
+ Change to the latest librdkafka version in the following files:
+
+ * `.travis.yml`
+ * `.appveyor.yml` - this is the librdkafka.redist NuGet version;
+   make sure to strip the leading "v" from the version,
+   e.g. `0.11.6` rather than `v0.11.6`.
+
+ Commit this change:
+
+     $ git commit -m "librdkafka version bump to v0.11.6" .travis.yml .appveyor.yml
+
+
+ ### 1.2 Update OpenSSL version if necessary

As of v0.11.4 OpenSSL is packaged with the python client. It's important
that the OpenSSL version is kept up to date with the latest release.