Skip to content

Ensure plugin path handled prior to plugin configs #404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1529,8 +1529,10 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
* When both args and kwargs are present the kwargs take
* precedence in case of duplicate keys.
* All keys map to configuration properties.
*
* Copy configuration dict to avoid manipulating application config.
*/
if (args) {
if (args && PyTuple_Size(args)) {
if (!PyTuple_Check(args) ||
PyTuple_Size(args) > 1) {
PyErr_SetString(PyExc_TypeError,
Expand All @@ -1542,6 +1544,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
"expected configuration dict");
return NULL;
}
confdict = PyDict_Copy(confdict);
}

if (!confdict) {
Expand All @@ -1551,7 +1554,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
return NULL;
}

confdict = kwargs;
confdict = PyDict_Copy(kwargs);

} else if (kwargs) {
/* Update confdict with kwargs */
Expand All @@ -1568,6 +1571,50 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
/* Enable valid offsets in delivery reports */
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);

/*
* Plugins must be configured prior to handling any of their configuration properties.
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here.
* This ensures plugin configuration properties are handled in the correct order.
*/
if ((vo = PyDict_GetItemString(confdict, "plugin.library.paths"))) {
const char *v;
char errstr[256];
PyObject *vs = NULL, *vs8 = NULL;

if (!(vs = cfl_PyObject_Unistr(vo))) {
PyErr_SetString(PyExc_TypeError,
"expected configuration property name "
"as type unicode string");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

return NULL;
}

v = cfl_PyUnistr_AsUTF8(vs, &vs8);

if (rd_kafka_conf_set(conf, "plugin.library.paths", v, errstr, sizeof(errstr))
!= RD_KAFKA_CONF_OK) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s", errstr);

rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Py_XDECREF(vs8);
Py_XDECREF(vs);

return NULL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also check the other returns below

}

Py_XDECREF(vs8);
Py_DECREF(vs);

PyDict_DelItemString(confdict, "plugin.library.paths");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooh, come to think of it, we shouldn't mutate the passed dict.
So either we make a copy, or just skip this DelItemString, which will make plugin.library.paths be applied twice — but I believe that librdkafka will avoid re-loading a previously loaded plugin, so it should work.

}

/* Convert config dict to config key-value pairs. */
while (PyDict_Next(confdict, &pos, &ko, &vo)) {
PyObject *ks, *ks8;
Expand All @@ -1583,6 +1630,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
"as type unicode string");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

return NULL;
}

Expand All @@ -1592,6 +1641,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_DECREF(ks);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);
return NULL;
}
Py_XDECREF(ks8);
Expand All @@ -1605,8 +1655,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Py_XDECREF(ks8);
Py_DECREF(ks);

return NULL;
}
if (h->error_cb) {
Expand All @@ -1627,8 +1680,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Py_XDECREF(ks8);
Py_DECREF(ks);

return NULL;
}
if (h->throttle_cb) {
Expand All @@ -1649,8 +1705,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Py_XDECREF(ks8);
Py_DECREF(ks);

return NULL;
}

Expand Down Expand Up @@ -1691,6 +1750,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_DECREF(ks);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

return NULL;

} else if (r == 1) {
Expand All @@ -1712,8 +1773,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
"unicode string");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Py_XDECREF(ks8);
Py_DECREF(ks);

return NULL;
}
v = cfl_PyUnistr_AsUTF8(vs, &vs8);
Expand All @@ -1725,10 +1789,13 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
"%s", errstr);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Py_XDECREF(vs8);
Py_XDECREF(vs);
Py_XDECREF(ks8);
Py_DECREF(ks);

return NULL;
}

Expand All @@ -1738,6 +1805,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
Py_DECREF(ks);
}

Py_DECREF(confdict);

if (h->error_cb)
rd_kafka_conf_set_error_cb(conf, error_cb);

Expand Down
21 changes: 21 additions & 0 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import confluent_kafka
import json
import pytest
import os


def test_version():
Expand Down Expand Up @@ -123,3 +124,23 @@ def test_throttle_event_types():
assert isinstance(throttle_event.broker_id, int) and throttle_event.broker_id == 0
assert isinstance(throttle_event.throttle_time, float) and throttle_event.throttle_time == 10.0
assert str(throttle_event) == "broker/0 throttled for 10000 ms"


@pytest.mark.skipif(not any(os.path.exists("monitoring-interceptor" + ext)
                            for ext in (".so", ".dylib", ".dll")),
                    reason="requires confluent-librdkafka-plugins be installed and copied to the current directory")
@pytest.mark.parametrize("init_func", [
    confluent_kafka.Consumer,
    confluent_kafka.Producer,
    confluent_kafka.admin.AdminClient,
])
def test_unordered_dict(init_func):
    """
    Interceptor configuration properties can only be handled after the
    plugin has been loaded, never before.  Construct each client with the
    plugin path placed mid-dict to verify ordering is handled internally.
    """
    init_func({'confluent.monitoring.interceptor.publishMs': 1000,
               'confluent.monitoring.interceptor.sessionDurationMs': 1000,
               'plugin.library.paths': 'monitoring-interceptor',
               'confluent.monitoring.interceptor.topic': 'confluent-kafka-testing',
               'confluent.monitoring.interceptor.icdebug': False
               })