-
Notifications
You must be signed in to change notification settings - Fork 917
Generic Serdes API with Avro #787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,5 +22,7 @@ dl-* | |
.pytest_cache | ||
staging | ||
tests/docker/conf/tls/* | ||
.idea | ||
.python-version | ||
.DS_Store | ||
.idea | ||
tmp-KafkaCluster |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,29 @@ | ||
__all__ = ['cimpl', 'admin', 'avro', 'kafkatest'] | ||
from .cimpl import (Consumer, # noqa | ||
#!/usr/bin/env python | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does this file need to be executable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Defining the interpreter is good practice, it is not the same thing as being executable (file mode). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reference? why? seems of such marginal benefit it's better no to have it. don't understand why it would be best practice. I just looked at the code of the 3 of the most popular libraries for python, and none of them do this. |
||
# -*- coding: utf-8 -*- | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. omg this is a quirky language There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since all of this file is ascii-7 there is no need for coding here, but it doesn't hurt. |
||
# | ||
# Copyright 2020 Confluent Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
from .deserializing_consumer import DeserializingConsumer | ||
from .serializing_producer import SerializingProducer | ||
|
||
from .cimpl import (Producer, | ||
Consumer, | ||
KafkaError, | ||
KafkaException, | ||
Message, | ||
Producer, | ||
TopicPartition, | ||
libversion, | ||
version, | ||
|
@@ -15,10 +35,18 @@ | |
OFFSET_STORED, | ||
OFFSET_INVALID) | ||
|
||
__all__ = ['admin', 'AvroSerializer', 'Consumer', | ||
'KafkaError', 'KafkaException', | ||
'kafkatest', 'libversion', 'Message', | ||
'OFFSET_BEGINNING', 'OFFSET_END', 'OFFSET_INVALID', 'OFFSET_STORED', | ||
'Producer', 'DeserializingConsumer', | ||
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted, as they are not directly related to this PR they will be addressed separately. I have taken note to revisit this amongst a few other things we discovered along the way. It's worth noting that enums are slightly complicated so long as we continue to support 2.7. We do expose them in the admin api with the usage of enum34 however that has some compatibility issues. One off enums are also a bit annoying because modern interpreters have first class support for them. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah |
||
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition'] | ||
|
||
__version__ = version()[0] | ||
|
||
|
||
class ThrottleEvent (object): | ||
class ThrottleEvent(object): | ||
""" | ||
ThrottleEvent contains details about a throttled request. | ||
Set up a throttle callback by setting the ``throttle_cb`` configuration | ||
|
@@ -32,16 +60,17 @@ class ThrottleEvent (object): | |
:ivar int broker_id: The broker id | ||
:ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request | ||
""" | ||
|
||
def __init__(self, broker_name, | ||
broker_id, | ||
throttle_time): | ||
|
||
self.broker_name = broker_name | ||
self.broker_id = broker_id | ||
self.throttle_time = throttle_time | ||
|
||
def __str__(self): | ||
return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, int(self.throttle_time * 1000)) | ||
return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, | ||
int(self.throttle_time * 1000)) | ||
|
||
|
||
def _resolve_plugins(plugins): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,20 @@ | ||
#!/usr/bin/env python | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. again, don't understand why this is executable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be fair it doesn't make this file executable. It does however inform the reader, and their text editor, of exactly what this file is though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. afaict no one else important does this, and it's bordering on pointless. it didn't help me in any way having this here. |
||
# | ||
# Copyright 2016-2020 Confluent Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
""" | ||
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
fastavro | ||
requests | ||
avro==1.9.2;python_version<='3.0' | ||
avro-python3==1.9.2.1;python_version>='3.0' |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Copyright 2020 Confluent Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
from confluent_kafka.cimpl import (KafkaError, | ||
Consumer as _ConsumerImpl) | ||
from .error import ConsumeError | ||
from .serialization import (SerializationError, | ||
SerializationContext, | ||
MessageField) | ||
|
||
|
||
class DeserializingConsumer(_ConsumerImpl): | ||
""" | ||
A client that consumes records from a Kafka cluster. With deserialization | ||
capabilities. | ||
|
||
Note: | ||
|
||
The DeserializingConsumer is an experimental API and subject to change. | ||
|
||
.. versionadded:: 1.4.0 | ||
|
||
The ``key.deserializer`` and ``value.deserializer`` classes instruct the | ||
DeserializingConsumer on how to convert the message payload bytes to objects. | ||
|
||
Note: | ||
|
||
All configured callbacks are served from the application queue upon | ||
calling :py:func:`DeserializingConsumer.poll` | ||
|
||
DeserializingConsumer configuration properties(* indicates required field) | ||
|
||
+--------------------+-----------------+-----------------------------------------------------+ | ||
| Property Name | Type | Description | | ||
+====================+=================+=====================================================+ | ||
| bootstrap.servers* | str | Comma-separated list of brokers. | | ||
+--------------------+-----------------+-----------------------------------------------------+ | ||
| | | Client group id string. | | ||
| group.id* | str | All clients sharing the same group.id belong to the | | ||
| | | same group. | | ||
+--------------------+-----------------+-----------------------------------------------------+ | ||
| | | Callable(SerializationContext, bytes) -> obj | | ||
| key.deserializer | callable | | | ||
| | | Deserializer used for message keys. | | ||
+--------------------+-----------------+-----------------------------------------------------+ | ||
| | | Callable(SerializationContext, bytes) -> obj | | ||
| value.deserializer | callable | | | ||
| | | Deserializer used for message values. | | ||
+--------------------+-----------------+-----------------------------------------------------+ | ||
| | | Callable(KafkaError) | | ||
| | | | | ||
| error_cb | callable | Callback for generic/global error events. These | | ||
| | | errors are typically to be considered informational | | ||
| | | since the client will automatically try to recover. | | ||
+--------------------+-----------------+-----------------------------------------------------+ | ||
| log_cb | logging.Handler | Logging handler to forward logs | | ||
+--------------------+-----------------+-----------------------------------------------------+ | ||
| | | Callable(str) | | ||
| | | | | ||
| | | Callback for statistics. This callback is | | ||
| stats_cb | callable | added to the application queue every | | ||
| | | ``statistics.interval.ms`` (configured separately). | | ||
| | | The function argument is a JSON formatted str | | ||
| | | containing statistics data. | | ||
+--------------------+-----------------+-----------------------------------------------------+ | ||
| | | Callable(ThrottleEvent) | | ||
| throttle_cb | callable | | | ||
| | | Callback for throttled request reporting. | | ||
+--------------------+-----------------+-----------------------------------------------------+ | ||
|
||
.. _See Client CONFIGURATION.md for a complete list of configuration properties: | ||
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md | ||
|
||
Args: | ||
conf (dict): DeserializingConsumer configuration. | ||
|
||
Raises: | ||
ValueError: if configuration validation fails | ||
|
||
.. _Statistics: | ||
https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md | ||
|
||
""" | ||
|
||
def __init__(self, conf): | ||
conf_copy = conf.copy() | ||
self._key_deserializer = conf_copy.pop('key.deserializer', None) | ||
self._value_deserializer = conf_copy.pop('value.deserializer', None) | ||
|
||
super(DeserializingConsumer, self).__init__(conf_copy) | ||
|
||
def poll(self, timeout=-1): | ||
""" | ||
Consume messages and calls callbacks. | ||
|
||
Args: | ||
timeout (float): Maximum time to block waiting for message(Seconds). | ||
|
||
Returns: | ||
:py:class:`Message` or None on timeout | ||
|
||
Raises: | ||
ConsumeError if an error was encountered while polling. | ||
|
||
""" | ||
msg = super(DeserializingConsumer, self).poll(timeout) | ||
|
||
if msg is None: | ||
return None | ||
|
||
if msg.error() is not None: | ||
raise ConsumeError(msg.error(), message=msg) | ||
|
||
ctx = SerializationContext(msg.topic(), MessageField.VALUE) | ||
value = msg.value() | ||
if self._value_deserializer is not None: | ||
try: | ||
value = self._value_deserializer(ctx, value) | ||
except SerializationError as se: | ||
raise ConsumeError(KafkaError._VALUE_DESERIALIZATION, | ||
reason=se.message, | ||
message=msg) | ||
|
||
key = msg.key() | ||
ctx.field = MessageField.KEY | ||
if self._key_deserializer is not None: | ||
try: | ||
key = self._key_deserializer(ctx, key) | ||
except SerializationError as se: | ||
raise ConsumeError(KafkaError._KEY_DESERIALIZATION, | ||
reason=se.message, | ||
message=msg) | ||
|
||
msg.set_key(key) | ||
msg.set_value(value) | ||
return msg | ||
|
||
def consume(self, num_messages=1, timeout=-1): | ||
""" | ||
:py:func:`Consumer.consume` not implemented, | ||
:py:func:`DeserializingConsumer.poll` instead | ||
""" | ||
raise NotImplementedError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this file has a lot of unnecessary cosmetic changes I strongly recommend you create a tag and generate wheels and upload to testing today, Friday, so we are not bit by surprises on release next week.