Topic Admin API, list_topics(), Python 3.7, fixes #382
Conversation
* Add 'get_metadata' to Producer and Consumer.
* Expose metadata in a way similar to the Java API.
* Added tests for list_topics(). This also removes the rich broker objects at each reference point (such as leader, isrs, etc), since it is possible that the broker does not exist. Instead we ask the user to check that the broker id exists in ClusterMetadata.brokers before referencing.
* .. and a bool refactor.
* …TF8) .. and a bunch of header test fixes.
.travis.yml
Outdated
- os: linux
  language: python
  dist: trusty
  python: "2.7"
  env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.9.5
# Source package verification with Python 3.5 and librdkafka master
  env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=adminapi
assume you didn't mean to check these changes in?
confluent_kafka/__init__.py
Outdated
        except Exception as e:
            # Request-level exception, raise the same for all topics
            for topic, fut in futmap.items():
                fut.set_exception(e)
👍 if topic-level results could potentially complete at different times, i think what you're doing here is definitely the right (only?) way to report request-level errors, and since your api is written for that possibility, this seems good to me.
something i'm wondering about elsewhere is whether or not this is the best approach for an API that only has a request-level future (and thinking yes, even in that scenario this is an optimal compromise).
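A usage sketch of the per-topic future map under discussion, based on the API in this PR (see examples/adminapi.py; the confluent_kafka.admin layout and NewTopic arguments are assumptions from later in this thread):

from confluent_kafka.admin import AdminClient, NewTopic

a = AdminClient({'bootstrap.servers': 'localhost:9092'})
# create_topics() returns a dict of topic -> future; a request-level
# failure sets the same exception on every topic's future.
fs = a.create_topics([NewTopic(t, num_partitions=3, replication_factor=1)
                      for t in ("t1", "t2")])
for topic, f in fs.items():
    try:
        f.result()  # None on success
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))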
examples/adminapi.py
Outdated
import sys


def example_create_topics(a, topics):
the examples are all synchronous - it would be good to see how this works where you're not blocking on the results of the admin call (for my own Python education as much as anything).
Reply:
Made a contrived delta_alter_configs example
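For reference, a non-blocking sketch of the same kind of call: the returned futures are standard concurrent.futures.Future objects (this PR imports concurrent.futures), so add_done_callback() can be used instead of blocking on result(). 'a' and NewTopic are as in the sketch above; note the callback runs on whatever thread completes the future.

def on_done(topic):
    def cb(f):
        err = f.exception()
        if err is not None:
            print("Failed to create {}: {}".format(topic, err))
        else:
            print("Topic {} created".format(topic))
    return cb

fs = a.create_topics([NewTopic("t1", num_partitions=1, replication_factor=1)])
for topic, f in fs.items():
    f.add_done_callback(on_done(topic))
# Continue with other work; callbacks fire as results arrive.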
.travis.yml
Outdated
- os: linux
  language: python
  dist: trusty
  python: "2.7"
  env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=master
# Source package verification with Python 2.7 and librdkafka v0.9.5
  env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=adminapi
I don't know enough about Travis CI to say definitively but these changes seem like they should stay on the adminapi branch and not be merged to master.
Reply:
Yes, but before the librdkafka adminapi branch is merged into master this is what's needed.
confluent_kafka/__init__.py
Outdated
import concurrent.futures
At 600 or so LOC does it not make sense to split this into its own file within the confluent-kafka directory? i.e. admin.py
Reply:
Maybe. Another question is whether we want the admin sub-namespace as in the Java API: confluent_kafka.admin.AdminClient.
Reply:
I'm also okay with this, cramming it all into __init__.py just seems a bit messy
Reply:
a sub package seems better here to me, and also consistent with other bindings.
Reply:
so confluent_kafka.admin?
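For comparison, what the sub-package option being discussed would look like from user code (this is the layout the thread converges on):

from confluent_kafka.admin import AdminClient   # sub-package option
# vs. a flat namespace:
# from confluent_kafka import AdminClient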
        raise


def example_alter_configs(a, args):
Does it make sense to include an example using the ConfigResource returned from describe_configs given the limitations imposed by Broker versions < 2.0?
confluent_kafka/__init__.py
Outdated
    :ivar orig_broker_id: The broker this metadata originated from.
    :ivar orig_broker_name: Broker name/address this metadata originated from.

    This class is typically not user instantiated.
This is inconsistent with ConfigEntry which places the disclaimer above the instance variables. I personally feel placing the disclaimer first is the right move.
https://github.com/confluentinc/confluent-kafka-python/blob/adminapi/confluent_kafka/__init__.py#L43
Note: ConfigEntry also includes this message below the constructor declaration.
https://github.com/confluentinc/confluent-kafka-python/blob/adminapi/confluent_kafka/__init__.py#L86
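For illustration, the suggested ordering with the disclaimer first (docstring sketch only; ivars abbreviated):

class BrokerMetadata(object):
    """
    This class is typically not user instantiated.

    :ivar id: Broker id.
    :ivar host: Broker hostname.
    :ivar port: Broker port.
    """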
confluent_kafka/__init__.py
Outdated
    :ivar host: Broker hostname.
    :ivar port: Broker port.

    This class is typically not user instantiated.
Same as above, we should be consistent about where we document this
confluent_kafka/__init__.py
Outdated
    :ivar partitions: Map of partitions indexed by partition id. Value is PartitionMetadata object.
    :ivar error: Topic error, or None. Value is a KafkaError object.

    This class is typically not user instantiated.
See comment above about disclaimer placement
confluent_kafka/__init__.py
Outdated
    in ClusterMetadata.brokers. Always check the availability
    of a broker id in the brokers dict.

    This class is typically not user instantiated.
same as above
confluent_kafka/__init__.py
Outdated
return "%s=\"%s\"" % (self.name, self.value) | ||
|
||
@classmethod | ||
def config_source_to_str(cls, source): |
this seems a bit clunky - can you use an IntEnum instead?
Reply:
Not available in Py2.7
Reply:
the 3.4 implementation has been backported and i'm led to believe commonly used in 2.7 libraries. can this not be leveraged?
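A sketch of the IntEnum suggestion: 'enum' is in the stdlib on Python 3.4+ and available to 2.7 via the 'enum34' backport. The member values below are illustrative placeholders meant to mirror librdkafka's config-source codes, not confirmed constants.

from enum import IntEnum

class ConfigSource(IntEnum):
    UNKNOWN_CONFIG = 0
    DYNAMIC_TOPIC_CONFIG = 1
    DYNAMIC_BROKER_CONFIG = 2
    DYNAMIC_DEFAULT_BROKER_CONFIG = 3
    STATIC_BROKER_CONFIG = 4
    DEFAULT_CONFIG = 5

print(ConfigSource(4).name)  # "STATIC_BROKER_CONFIG"; replaces config_source_to_str()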
fs = a.describe_configs(resources)

# Wait for operation to finish.
for res, f in fs.items():
this for loop is interesting because it's how you'd typically want to handle these results - iterate through them. in Java (and C# traditionally) it's easier/nicer to do this if the return type is a list rather than a map, and for that reason I think a list would be better here in those languages even though a map models the data structure more correctly. but anyway, that doesn't apply in python: a map is perfect.
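Filled out, that loop might look like this (a sketch assuming this PR's API, where each future resolves to a dict of config name -> ConfigEntry):

for res, f in fs.items():
    try:
        configs = f.result()
        for name, entry in configs.items():
            print("{}: {} = {}".format(res, name, entry.value))
    except Exception as e:
        print("Failed to describe {}: {}".format(res, e))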
confluent_kafka/__init__.py
Outdated
    and any configuration parameter not specified will be reverted to
    its default value.

    With alter_configs(incremental=True) (requires broker version >=2.0),
Use the KIP or Jira number as opposed to the version number. I don't think we can say with absolute certainty that this will make the 2.0 release just yet.
confluent_kafka/__init__.py
Outdated
    without altering the configuration. Default: False
:param incremental bool: If true, only update the specified configuration
    entries, not reverting unspecified configuration.
    This requires broker version >=2.0. Default: False
Use KIP and/or Jira reference
confluent_kafka/__init__.py
Outdated
    the provided resources with the new configuration given,
    reverting all other configuration for the resource back
    to their default values.
    Use incremental=True to change the behaviour so that only the
Use KIP and/or Jira reference
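A sketch of the read-modify-write pattern these full-replace semantics force on callers (what the delta_alter_configs example mentioned earlier does); ConfigEntry.is_default and ConfigResource.set_config() are assumed from this PR's API:

fs = a.describe_configs([res])
old = fs[res].result()
for name, entry in old.items():
    if not entry.is_default:
        res.set_config(name, entry.value)  # carry over existing overrides
res.set_config("cleanup.policy", "compact")  # the intended change
a.alter_configs([res])  # full-replace: any config not set here reverts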
        float request_timeout;   /* parser: f */
        float operation_timeout; /* parser: f */
        int broker;              /* parser: i */
        int incremental;         /* needs special bool parsing */
worth noting version constraints here as well since it's specifically mentioned in the docstring below
Reply:
This code doesn't know if incremental is supported or not but relies on librdkafka to reject the option if not.
confluent_kafka/src/Admin.c
Outdated
        c_options = rd_kafka_AdminOptions_new(self->rk, forApi);
        if (!c_options) {
                PyErr_Format(PyExc_RuntimeError,
I believe illegal arguments should raise PyExc_ValueError
https://docs.python.org/2/library/exceptions.html#exceptions.ValueError
Reply:
There is no user-supplied parameter here so ValueError wouldn't make sense.
This exception is only raised if the Python client version is newer than librdkafka, which shouldn't happen.
        if (!PyList_Check(replicas) ||
            (replica_cnt = (size_t)PyList_Size(replicas)) < 1) {
                PyErr_Format(
Likewise I'm fairly certain this should raise ValueError. TypeError exceptions as I understand it are for performing illegal operations against some type.
https://docs.python.org/2/library/exceptions.html#exceptions.TypeError
Reply:
You could just as well argue that by passing an invalid type we are performing illegal operations against the argument, though. Food for thought, I suppose.
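The convention being debated, spelled out as a hypothetical Python-level equivalent of the C check: TypeError for an outright wrong type, ValueError for the right type carrying an unacceptable value.

def check_replicas(replicas):
    if not isinstance(replicas, list):
        # Wrong type entirely -> TypeError
        raise TypeError("replicas must be a list of broker ids")
    if len(replicas) < 1:
        # Right type, unacceptable value -> ValueError
        raise ValueError("replicas must contain at least one broker id")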
confluent_kafka/src/Admin.c
Outdated
        return NULL;

        if (!PyList_Check(topics) || (tcnt = (int)PyList_Size(topics)) < 1) {
                PyErr_SetString(PyExc_TypeError,
ValueError?
confluent_kafka/src/Admin.c
Outdated
        /* Increase refcount for the return value since
         * it is currently a borrowed reference. */
        Py_INCREF(future);
        return future;
Is it necessary to return the future here? The calling party should already have a reference to this future which is passed into the function to begin with. As long as the reference is incremented the future should still be safe once the original reference falls out of scope.
Reply:
This was from an earlier incarnation where the future was set up in C.
Changed to return None
&options.validate_only)) | ||
return NULL; | ||
|
||
if (incremental_obj && |
Is there a good way to protect against setting incremental before it's supported by Kafka?
Reply:
librdkafka enforces it
confluent_kafka/__init__.py
Outdated
@@ -5,6 +5,9 @@
     Message,
     Producer,
     TopicPartition,
     AdminClientImpl,
given you don't really want AdminClientImpl exposed, I think you probably need to provide a different definition of __all__.
Reply:
I'm submoduling AdminClient into .admin.
I think AdminClientImpl still needs to be implicitly exported since AdminClient is a subclass.
Reply:
there's the mistake there - composition is much more appropriate here I think.
Reply:
Not sure what composition buys us in this case.
The only reason there is a Python AdminClient is because it makes it easier to wrap the single-future C AdminClientImpl API into a multi-future Python API.
Reply:
I'm confused about __all__ above. Instead I think I meant to suggest calling AdminClientImpl _AdminClientImpl.
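One way to read the earlier __all__ suggestion (the name list here is hypothetical): export only the public names, so the impl class, whatever it ends up being called, is not picked up by a star import.

__all__ = ['Consumer', 'KafkaError', 'KafkaException',
           'Message', 'Producer', 'TopicPartition']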
confluent_kafka/__init__.py
Outdated
                         RESOURCE_BROKER: "broker"}
    _res_type_by_name = {v: k for k, v in _res_name_by_type.items()}

    def __init__(self, restype, name,
an enum seems better to me for restype too, based on my research just now but no actual prior experience guiding me in what's normal. given it's an input rather than output thing though, not as important as CONFIG_SOURCE (but they should be consistent).
Reply:
doin' it
confluent_kafka/__init__.py
Outdated
        f, futmap = AdminClient._make_futures(topics, None,
                                              AdminClient._make_topics_result)

        super(AdminClient, self).delete_topics(topics, f, **kwargs)
overriding methods like this seems confusing, better to use composition.
Reply:
I'm not an OO guy, as you are very well aware. So please educate me: what is the practical difference between composition and inheritance in this case?
confluent_kafka/admin/__init__.py
Outdated
Kafka Admin client: create, view, alter, delete topics and resources.
"""
from ..cimpl import (KafkaException,  # noqa
                     AdminClientImpl,
is it better to call this _AdminClientImpl?
        return f, futmap

    def create_topics(self, new_topics, **kwargs):
The main thing I was noting about this before is that the signatures of the methods create_topics etc. change in the subclass. The new methods conveniently hide the old ones because of **kwargs, but it seems like kind of a trick (and my Python's not strong enough to know if there are any downsides) and I'm a bit averse to tricks. Generally, the main disadvantage of inheritance is that coupling is stronger - everything exposed in the base classes flows through to subclasses. Except for the hiding noted above, this would apply here. The other thing that threw me was use of the suffix Impl in the super class, because it implies to me more of a complete implementation, and I think it's a bit unusual in this context.
That said, I agree an inheritance relationship would be ok here. My complaint about signature differences + exposing of AdminClientImpl details would be mitigated by use of Python's poor excuse for protected methods - maybe change AdminClientImpl's create_topics etc. to _create_topics_impl etc. Though, the public interface is worse in that they are visible outside the class (though by convention ignored). Also maybe change the Impl suffix to Base. I don't particularly like that either, but it's more conventional.
Alternatively, I still think composition is better here. It'd be somewhat better externally. There'd just be one externally visible property _impl of type _AdminClientImpl. And I like the name _AdminClientImpl in this context.
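A sketch of that composition shape (names and helper functions like _make_futures are hypothetical stand-ins for this PR's internals):

class AdminClient(object):
    def __init__(self, conf):
        self._impl = _AdminClientImpl(conf)  # the C-level client

    def create_topics(self, new_topics, **kwargs):
        # One request-level future fans out to per-topic futures.
        f, futmap = _make_futures([t.topic for t in new_topics],
                                  None, _make_topics_result)
        self._impl.create_topics(new_topics, f, **kwargs)
        return futmap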
we're no longer backwards compatible with older librdkafka
For an example of what the admin API looks like in practice, see examples/adminapi.py.