
Topic Admin API, list_topics(), Python 3.7, fixes #382


Merged: 31 commits, Jun 7, 2018

Conversation

edenhill (Contributor):

For an example of what the admin API looks like in practice, see examples/adminapi.py.

.travis.yml Outdated
- os: linux
language: python
dist: trusty
python: "2.7"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.9.5
# Source package verification with Python 3.5 and librdkafka master
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=adminapi
Contributor:

I assume you didn't mean to check these changes in?

except Exception as e:
# Request-level exception, raise the same for all topics
for topic, fut in futmap.items():
fut.set_exception(e)
Contributor:

👍 If topic-level results could potentially complete at different times, I think what you're doing here is definitely the right (only?) way to report request-level errors, and since your API is written for that possibility, this seems good to me.

Something I'm wondering about elsewhere is whether or not this is the best approach for an API that only has a request-level future (and I'm thinking yes, even in that scenario this is an optimal compromise).
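The fan-out pattern under discussion can be sketched with plain concurrent.futures; the topic names and the triggering error below are made up for illustration:

```python
import concurrent.futures

# Hypothetical per-topic future map, like the one the admin wrapper builds.
futmap = {t: concurrent.futures.Future() for t in ["topic-a", "topic-b"]}

try:
    # Stand-in for the underlying admin request failing at the request level.
    raise RuntimeError("broker unreachable")
except Exception as e:
    # Request-level exception: fail every per-topic future with the same error.
    for topic, fut in futmap.items():
        fut.set_exception(e)

# A caller waiting on any individual topic now sees the request-level error.
for topic, fut in futmap.items():
    assert isinstance(fut.exception(), RuntimeError)
```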

import sys


def example_create_topics(a, topics):
Contributor:

The examples are all synchronous - it would be good to see how this works when you're not blocking on the results of the admin call (for my own Python education as much as anything).

Contributor (author):

Made a contrived delta_alter_configs example
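For the non-blocking question above, one option is to attach callbacks instead of calling result(). This sketch fakes the future map; a real one would come from an admin call such as create_topics():

```python
import concurrent.futures

results = []

def on_done(topic):
    # Bind the topic name so the callback knows which result it received.
    def callback(fut):
        results.append((topic, fut.exception()))
    return callback

# Stand-in for the dict of per-topic futures an admin call returns.
futmap = {t: concurrent.futures.Future() for t in ["t1", "t2"]}
for topic, fut in futmap.items():
    fut.add_done_callback(on_done(topic))

# When the background thread completes the request, the callbacks fire
# without the caller ever blocking on fut.result().
futmap["t1"].set_result(None)
futmap["t2"].set_exception(RuntimeError("topic already exists"))
```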

.travis.yml Outdated
- os: linux
language: python
dist: trusty
python: "2.7"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=master
# Source package verification with Python 2.7 and librdkafka v0.9.5
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=adminapi
Contributor:

I don't know enough about Travis CI to say definitively, but these changes seem like they should stay on the adminapi branch and not be merged to master.

Contributor (author):

Yes, but before the librdkafka adminapi branch is merged into master, this is what's needed.


import concurrent.futures
Contributor:

At 600 or so LOC, does it not make sense to split this into its own file within the confluent-kafka directory, i.e. admin.py?

Contributor (author):

Maybe; another question is whether we want the admin sub-namespace as in the Java API:
confluent_kafka.admin.AdminClient

Contributor:

I'm also okay with this; cramming it all into __init__.py just seems a bit messy.

Contributor:

A sub-package seems better to me here, and is also consistent with other bindings.

Contributor (author):

so confluent_kafka.admin.?

raise


def example_alter_configs(a, args):
Contributor:

Does it make sense to include an example using the ConfigResource returned from describe_configs given the limitations imposed by Broker versions < 2.0?

:ivar orig_broker_id: The broker this metadata originated from.
:ivar orig_broker_name: Broker name/address this metadata originated from.

This class is typically not user instantiated.
Contributor:

This is inconsistent with ConfigEntry which places the disclaimer above the instance variables. I personally feel placing the disclaimer first is the right move.

https://github.com/confluentinc/confluent-kafka-python/blob/adminapi/confluent_kafka/__init__.py#L43

Note: ConfigEntry also includes this message below the constructor declaration as well.

https://github.com/confluentinc/confluent-kafka-python/blob/adminapi/confluent_kafka/__init__.py#L86

:ivar host: Broker hostname.
:ivar port: Broker port.

This class is typically not user instantiated.
Contributor:

Same as above, we should be consistent about where we document this

:ivar partitions: Map of partitions indexed by partition id. Value is PartitionMetadata object.
:ivar error: Topic error, or None. Value is a KafkaError object.

This class is typically not user instantiated.
Contributor:

See comment above about disclaimer placement

in ClusterMetadata.brokers. Always check the availability
of a broker id in the brokers dict.

This class is typically not user instantiated.
Contributor:

Same as above.

return "%s=\"%s\"" % (self.name, self.value)

@classmethod
def config_source_to_str(cls, source):
Contributor:

This seems a bit clunky - can you use an IntEnum instead?

Contributor (author):

Not available in Py2.7

Contributor:

The 3.4 implementation has been backported, and I'm led to believe it is commonly used in 2.7 libraries. Can this not be leveraged?

https://pypi.org/project/enum34/
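A sketch of what that could look like. ConfigSource and its numeric values here are illustrative, not the client's actual mapping; on Python 3.4+ enum is stdlib, and on 2.7 the enum34 backport provides the same module:

```python
from enum import IntEnum

class ConfigSource(IntEnum):
    # Values are made up for illustration.
    UNKNOWN_CONFIG = 0
    DYNAMIC_TOPIC_CONFIG = 1
    DEFAULT_CONFIG = 5

# IntEnum members compare equal to the raw ints coming from the C layer,
# and .name replaces a hand-rolled config_source_to_str() lookup table.
assert ConfigSource(5) is ConfigSource.DEFAULT_CONFIG
assert ConfigSource.DEFAULT_CONFIG == 5
assert ConfigSource.DYNAMIC_TOPIC_CONFIG.name == "DYNAMIC_TOPIC_CONFIG"
```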

fs = a.describe_configs(resources)

# Wait for operation to finish.
for res, f in fs.items():
Contributor (@mhowlett, May 22, 2018):

This for loop is interesting because it's how you'd typically want to handle these results: iterate through them. In Java (and C#, traditionally) it's easier/nicer to do this if the return type is a list rather than a map, and for that reason I think a list would be better in those languages even though a map models the data structure more correctly. But anyway, that doesn't apply in Python: a map is perfect.
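A sketch of why the map return type works well here; the resource keys and config values are made up, where a real map would come from describe_configs():

```python
import concurrent.futures

# Stand-in for fs = a.describe_configs(resources): futures keyed by resource.
fs = {"topic/mytopic": concurrent.futures.Future(),
      "broker/1": concurrent.futures.Future()}
fs["topic/mytopic"].set_result({"cleanup.policy": "delete"})
fs["broker/1"].set_result({"log.retention.ms": "604800000"})

# Iterating the map yields each result alongside the resource it belongs to...
seen = {res: f.result() for res, f in fs.items()}

# ...while keyed access, which a list return type would not allow, is free:
assert fs["broker/1"].result()["log.retention.ms"] == "604800000"
```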

and any configuration parameter not specified will be reverted to
its default value.

With alter_configs(incremental=True) (requires broker version >=2.0),
Contributor:

Use the KIP or Jira number as opposed to the version number. I don't think we can say with absolute certainty that this will make the 2.0 release just yet.

without altering the configuration. Default: False
:param incremental bool: If true, only update the specified configuration
entries, not reverting unspecified configuration.
This requires broker version >=2.0. Default: False
Contributor:

Use KIP and/or Jira reference

the provided resources with the new configuration given,
reverting all other configuration for the resource back
to their default values.
Use incremental=True to change the behaviour so that only the
Contributor:

Use KIP and/or Jira reference

float request_timeout; /* parser: f */
float operation_timeout; /* parser: f */
int broker; /* parser: i */
int incremental; /* needs special bool parsing */
Contributor:

Worth noting version constraints here as well, since it's specifically mentioned in the docstring below.

Contributor (author):

This code doesn't know if incremental is supported or not but relies on librdkafka to reject the option if not.


c_options = rd_kafka_AdminOptions_new(self->rk, forApi);
if (!c_options) {
PyErr_Format(PyExc_RuntimeError,
Contributor:

I believe illegal arguments should raise PyExc_ValueError

https://docs.python.org/2/library/exceptions.html#exceptions.ValueError

Contributor (author):

There is no user-supplied parameter here, so ValueError wouldn't make sense.
This exception is only raised if the Python client version is newer than librdkafka, which shouldn't happen.


if (!PyList_Check(replicas) ||
(replica_cnt = (size_t)PyList_Size(replicas)) < 1) {
PyErr_Format(
Contributor:

Likewise, I'm fairly certain this should raise ValueError. TypeError exceptions, as I understand it, are for performing illegal operations against some type.

https://docs.python.org/2/library/exceptions.html#exceptions.TypeError

Contributor:

Though I suppose you could just as well argue that by passing an invalid type we are performing an illegal operation against the argument. Food for thought.

return NULL;

if (!PyList_Check(topics) || (tcnt = (int)PyList_Size(topics)) < 1) {
PyErr_SetString(PyExc_TypeError,
Contributor:

ValueError?

/* Increase refcount for the return value since
* it is currently a borrowed reference. */
Py_INCREF(future);
return future;
Contributor:

Is it necessary to return the future here? The calling party should already have a reference to this future, which is passed into the function to begin with. As long as the reference count is incremented, the future should still be safe once the original reference falls out of scope.

Contributor (author):

This was from an earlier incarnation where the future was set up in C. Changed to return None.

&options.validate_only))
return NULL;

if (incremental_obj &&
Contributor:

Is there a good way to protect against setting incremental before it's supported by Kafka?

Contributor (author):

librdkafka enforces it

@@ -5,6 +5,9 @@
Message,
Producer,
TopicPartition,
AdminClientImpl,
Contributor:

Given you don't really want AdminClientImpl exposed, I think you probably need to provide a different definition of __all__.

Contributor (author):

I'm submoduling AdminClient into .admin.

I think AdminClientImpl still needs to be implicitly exported since AdminClient is a subclass.

Contributor:

There's the mistake there - composition is much more appropriate here, I think.

Contributor (author):

Not sure what composition buys us in this case.
The only reason there is a Python AdminClient is that it makes it easier to wrap the single-future C AdminClientImpl API into a multiple-future Python API.

Contributor:

I'm confused about __all__ above. Instead, I think I meant to suggest calling AdminClientImpl _AdminClientImpl.

RESOURCE_BROKER: "broker"}
_res_type_by_name = {v: k for k, v in _res_name_by_type.items()}

def __init__(self, restype, name,
Contributor:

An enum seems better to me for restype too, based on my research just now (though with no actual prior experience guiding me in what's normal). Given it's an input rather than an output, it's not as important as CONFIG_SOURCE, but they should be consistent.

Contributor (author):

doin' it

f, futmap = AdminClient._make_futures(topics, None,
AdminClient._make_topics_result)

super(AdminClient, self).delete_topics(topics, f, **kwargs)
Contributor:

Overriding methods like this seems confusing; better to use composition.

Contributor (author):

I'm not an OO guy, as you are very well aware. So please educate me: what is the practical difference between composition and inheritance in this case?

Kafka Admin client: create, view, alter, delete topics and resources.
"""
from ..cimpl import (KafkaException, # noqa
AdminClientImpl,
Contributor:

Is it better to call this _AdminClientImpl?


return f, futmap

def create_topics(self, new_topics, **kwargs):
Contributor:

The main thing I was noting about this before is that the signatures of the methods (create_topics etc.) change in the subclass. The new methods conveniently hide the old ones because of **kwargs, but it seems like kind of a trick (and my Python's not strong enough to know if there are any downsides), and I'm a bit averse to tricks. Generally, the main disadvantage of inheritance is that coupling is stronger: everything exposed in the base class flows through to subclasses. Except for the hiding noted above, this would apply here. The other thing that threw me was the use of the suffix Impl in the superclass, because it implies to me more of a complete implementation, and I think it's a bit unusual in this context.

That said, I agree an inheritance relationship would be ok here. My complaint about the signature differences and the exposing of AdminClientImpl details would be mitigated by use of Python's poor excuse for protected methods - maybe change AdminClientImpl's create_topics etc. to _create_topics_impl etc. Though the public interface is worse in that they are visible outside the class (though by convention ignored). Also maybe change the Impl suffix to Base. I don't particularly like that either, but it's more conventional.

Alternatively, I still think composition is better here. It'd be somewhat better externally: there'd just be one externally visible property, _impl, of type _AdminClientImpl. And I like the name _AdminClientImpl in this context.
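To make the trade-off concrete, here is a contrived sketch of both designs; _AdminClientImpl stands in for the C-level class, and all names and behavior are illustrative:

```python
class _AdminClientImpl(object):
    """Stand-in for the C extension class with its single-future signature."""
    def create_topics(self, topics, future):
        future.append("created: %s" % ", ".join(topics))  # pretend C call

# Inheritance: the base method (and its two-argument signature) remains part
# of the subclass's surface; overriding with **kwargs merely hides it.
class AdminClientViaInheritance(_AdminClientImpl):
    def create_topics(self, topics, **kwargs):
        f = []
        super(AdminClientViaInheritance, self).create_topics(topics, f)
        return f

# Composition: only the wrapper's signature is public; the C-level API is
# reachable solely through the private _impl attribute.
class AdminClientViaComposition(object):
    def __init__(self):
        self._impl = _AdminClientImpl()

    def create_topics(self, topics):
        f = []
        self._impl.create_topics(topics, f)
        return f
```

Both behave identically on the happy path; the difference is that the inherited version still exposes the base create_topics(topics, future) signature to anyone who reaches it via the superclass, while the composed version keeps it behind _impl.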
