-
Notifications
You must be signed in to change notification settings - Fork 915
Ensure plugin path handled prior to plugin configs #404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great stuff, some comments!
|
||
v = cfl_PyUnistr_AsUTF8(vs, &vs8); | ||
|
||
if(rd_kafka_conf_set(conf, "plugin.library.paths", v, NULL, 0) != RD_KAFKA_CONF_OK) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (
keep under 80 columns wide
tests/test_Consumer.py
Outdated
|
||
|
||
@pytest.skipif(len([True for x in (".so", ".dylib", ".dll") if os.path.exists("monitoring-interceptor" + x)]) > 0, | ||
reason="requires confluent-librdkafka-plugins to be installed") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and copied to the current directory
tests/test_Consumer.py
Outdated
""" | ||
Consumer({'confluent.monitoring.interceptor.publishMs': 1000, | ||
'confluent.monitoring.interceptor.sessionDurationMs': 1000, | ||
'plugin.library.paths': 'monitoring-interceptor.dylib', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skip the file extension, handled by the librd of kafka
tests/test_Producer.py
Outdated
@@ -191,3 +192,17 @@ def test_set_invalid_partitioner_murmur(): | |||
with pytest.raises(KafkaException) as e: | |||
Producer({'partitioner': 'murmur'}) | |||
assert 'Invalid value for configuration property "partitioner": murmur' in str(e) | |||
|
|||
|
|||
@pytest.skipif(len([True for x in (".so", ".dylib", ".dll") if os.path.exists("monitoring-interceptor" + x)]) > 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of duplicating this test, move it to a common file (e.g., test_misc) and parameterize the client types: producer, consumer, admin
@@ -1568,6 +1568,40 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
/* Enable valid offsets in delivery reports */ | |||
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0); | |||
|
|||
/* Preload Plugins since dicts are unordered */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preloading a library/plugin is something else, I think you can instead flesh out the comment a bit, something like:
Plugins must be configured prior to any configuration properties they support, since dicts are unordered
we explicitly check for, set, and delete the plugin paths here to make sure they are applied prior to
their configuration.
@@ -1568,7 +1568,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
/* Enable valid offsets in delivery reports */ | |||
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0); | |||
|
|||
/* Preload Plugins since dicts are unordered */ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indent seems off, /* should probably be one space to the right
/* Preload Plugins since dicts are unordered */ | ||
/* | ||
* Plugins must be configured prior to handling any of their configuration properties | ||
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
end sentence with period.
/* | ||
* Plugins must be configured prior to handling any of their configuration properties | ||
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here | ||
* This ensures all configuration properties are handled in the right order |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
end sentence with period.
if (rd_kafka_conf_set(conf, "plugin.library.paths", v, NULL, 0) | ||
!= RD_KAFKA_CONF_OK) { | ||
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, | ||
"%s", NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"%s", NULL will raise an exception with the message "(null)".
Instead pass errstr to conf_set, and then use that here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep sorry that was left over from when I did the initial POC
tests/test_misc.py
Outdated
""" | ||
Interceptor configs can only be handled after the plugin has been loaded not before. | ||
""" | ||
for fn in [confluent_kafka.Producer, confluent_kafka.Consumer, confluent_kafka.admin.AdminClient]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking you could use parameterization: https://docs.pytest.org/en/latest/parametrize.html
@@ -1568,6 +1568,47 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
/* Enable valid offsets in delivery reports */ | |||
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0); | |||
|
|||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indent should be 8 whitespaces, not 7 my man
if (rd_kafka_conf_set(conf, "plugin.library.paths", v, errstr, sizeof(errstr)) | ||
!= RD_KAFKA_CONF_OK) { | ||
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, | ||
"%s", NULL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass errstr instead of NULL here
Py_XDECREF(vs8); | ||
Py_DECREF(vs); | ||
|
||
PyDict_DelItemString(confdict, "plugin.library.paths"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ooh, come to think of it, we shouldnt mutate the passed dict.
So either we make a copy, or, just skip this DelItemString, which will make plugin.library.paths be applied twice but I believe that librdkafka will avoid re-loading a previously loaded plugin so it should work.
tests/test_misc.py
Outdated
(confluent_kafka.Producer), | ||
(confluent_kafka.admin.AdminClient), | ||
]) | ||
def test_unordered_dict(init_func): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@edenhill good call on mutating the application's dict. I have made the requested changes |
@@ -1530,7 +1530,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
* precedence in case of duplicate keys. | |||
* All keys map to configuration properties. | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extend this comment to mention why we make a copy of the dict
/* | ||
* Plugins must be configured prior to handling any of their configuration properties. | ||
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here. | ||
* This ensures all configuration properties are handled in the right order. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/all/plugin/
"as type unicode string"); | ||
rd_kafka_topic_conf_destroy(tconf); | ||
rd_kafka_conf_destroy(conf); | ||
return NULL; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
confdict is leaked here, do Py_DECREF
Py_XDECREF(vs8); | ||
Py_XDECREF(vs); | ||
|
||
return NULL; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dito
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also check the other returns below
@edenhill , slightly less leaky. There are some whitespace changes but I believe they add to readability. If its a problem I can remove them . |
@@ -1529,6 +1529,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
* When both args and kwargs are present the kwargs take | |||
* precedence in case of duplicate keys. | |||
* All keys map to configuration properties. | |||
* | |||
* Copy configuration dict to avoid manipulating application config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit (but that's what you want..): How about a trailing period?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
touché
@@ -1530,7 +1530,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
* precedence in case of duplicate keys. | |||
* All keys map to configuration properties. | |||
* | |||
* Copy configuration dict to avoid manipulating application config | |||
* Copy configuration dict to avoid manipulating application config. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great fix!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
No description provided.