Skip to content

Ensure plugin path handled prior to plugin configs #404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 21, 2018
Merged

Ensure plugin path handled prior to plugin configs #404

merged 2 commits into from
Jun 21, 2018

Conversation

rnpridgeon
Copy link
Contributor

No description provided.

@rnpridgeon rnpridgeon requested a review from edenhill June 18, 2018 15:32
Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff, some comments!


v = cfl_PyUnistr_AsUTF8(vs, &vs8);

if(rd_kafka_conf_set(conf, "plugin.library.paths", v, NULL, 0) != RD_KAFKA_CONF_OK) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (

keep under 80 columns wide



@pytest.skipif(len([True for x in (".so", ".dylib", ".dll") if os.path.exists("monitoring-interceptor" + x)]) > 0,
reason="requires confluent-librdkafka-plugins to be installed")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and copied to the current directory

"""
Consumer({'confluent.monitoring.interceptor.publishMs': 1000,
'confluent.monitoring.interceptor.sessionDurationMs': 1000,
'plugin.library.paths': 'monitoring-interceptor.dylib',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip the file extension, handled by the librd of kafka

@@ -191,3 +192,17 @@ def test_set_invalid_partitioner_murmur():
with pytest.raises(KafkaException) as e:
Producer({'partitioner': 'murmur'})
assert 'Invalid value for configuration property "partitioner": murmur' in str(e)


@pytest.skipif(len([True for x in (".so", ".dylib", ".dll") if os.path.exists("monitoring-interceptor" + x)]) > 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of duplicating this test, move it to a common file (e.g., test_misc) and parameterize the client types: producer, consumer, admin

@@ -1568,6 +1568,40 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
/* Enable valid offsets in delivery reports */
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);

/* Preload Plugins since dicts are unordered */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preloading a library/plugin is something else, I think you can instead flesh out the comment a bit, something like:
Plugins must be configured prior to any configuration properties they support, since dicts are unordered
we explicitly check for, set, and delete the plugin paths here to make sure they are applied prior to
their configuration.

@@ -1568,7 +1568,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
/* Enable valid offsets in delivery reports */
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);

/* Preload Plugins since dicts are unordered */
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indent seems off, /* should probably be one space to the right

/* Preload Plugins since dicts are unordered */
/*
* Plugins must be configured prior to handling any of their configuration properties
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end sentence with period.

/*
* Plugins must be configured prior to handling any of their configuration properties
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here
* This ensures all configuration properties are handled in the right order
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end sentence with period.

if (rd_kafka_conf_set(conf, "plugin.library.paths", v, NULL, 0)
!= RD_KAFKA_CONF_OK) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s", NULL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"%s", NULL will raise an exception with the message "(null)".
Instead pass errstr to conf_set, and then use that here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep sorry that was left over from when I did the initial POC

"""
Interceptor configs can only be handled after the plugin has been loaded not before.
"""
for fn in [confluent_kafka.Producer, confluent_kafka.Consumer, confluent_kafka.admin.AdminClient]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking you could use parameterization: https://docs.pytest.org/en/latest/parametrize.html

@@ -1568,6 +1568,47 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
/* Enable valid offsets in delivery reports */
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);

/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent should be 8 whitespaces, not 7 my man

if (rd_kafka_conf_set(conf, "plugin.library.paths", v, errstr, sizeof(errstr))
!= RD_KAFKA_CONF_OK) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s", NULL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass errstr instead of NULL here

Py_XDECREF(vs8);
Py_DECREF(vs);

PyDict_DelItemString(confdict, "plugin.library.paths");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooh, come to think of it, we shouldnt mutate the passed dict.
So either we make a copy, or, just skip this DelItemString, which will make plugin.library.paths be applied twice but I believe that librdkafka will avoid re-loading a previously loaded plugin so it should work.

(confluent_kafka.Producer),
(confluent_kafka.admin.AdminClient),
])
def test_unordered_dict(init_func):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@rnpridgeon
Copy link
Contributor Author

@edenhill good call on mutating the application's dict. I have made the requested changes

@@ -1530,7 +1530,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
* precedence in case of duplicate keys.
* All keys map to configuration properties.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extend this comment to mention why we make a copy of the dict

/*
* Plugins must be configured prior to handling any of their configuration properties.
* Dicts are unordered so we explicitly check for, set, and delete the plugin paths here.
* This ensures all configuration properties are handled in the right order.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/all/plugin/

"as type unicode string");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
return NULL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confdict is leaked here, do Py_DECREF

Py_XDECREF(vs8);
Py_XDECREF(vs);

return NULL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dito

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also check the other returns below

@rnpridgeon rnpridgeon changed the title Ensure Plugins are loaded prior to Plugin configs Ensure Plugin path handled prior to Plugin configs Jun 20, 2018
@rnpridgeon
Copy link
Contributor Author

@edenhill , slightly less leaky. There are some whitespace changes but I believe they add to readability. If its a problem I can remove them .

@@ -1529,6 +1529,8 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
* When both args and kwargs are present the kwargs take
* precedence in case of duplicate keys.
* All keys map to configuration properties.
*
* Copy configuration dict to avoid manipulating application config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (but that's what you want..): How about a trailing period?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

touché

@@ -1530,7 +1530,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
* precedence in case of duplicate keys.
* All keys map to configuration properties.
*
* Copy configuration dict to avoid manipulating application config
* Copy configuration dict to avoid manipulating application config.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great fix!

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@rnpridgeon rnpridgeon changed the title Ensure Plugin path handled prior to Plugin configs Ensure plugin path handled prior to plugin configs Jun 21, 2018
@rnpridgeon rnpridgeon merged commit 1937de2 into confluentinc:master Jun 21, 2018
@rnpridgeon rnpridgeon deleted the ordered_dict branch July 3, 2018 23:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants