Generic Serdes API with Avro #787


Merged 3 commits on Mar 30, 2020
4 changes: 3 additions & 1 deletion .gitignore
@@ -22,5 +22,7 @@ dl-*
.pytest_cache
staging
tests/docker/conf/tls/*
.idea
.python-version
.DS_Store
.idea
tmp-KafkaCluster
63 changes: 32 additions & 31 deletions .travis.yml
@@ -18,28 +18,14 @@ matrix:
# Source package verification with Python 2.7
- os: osx
python: "2.7"
env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib"
before_install:
- pip install -U pip && pip install virtualenv
- pyenv install -f 2.7.15
- pip install virtualenv
- virtualenv -p ~/.pyenv/versions/2.7.15/bin/python ./env
- source env/bin/activate
env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" INTERPRETER_VERSION="2.7.17"
# Source package verification with Python 3.6
- os: osx
python: "3.6"
env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" MK_DOCS="y" EXTRA_PKGS="sphinx sphinx_rtd_theme avro-python3"
before_install:
- pip install -U pip && pip install virtualenv
- pyenv install -f 3.6.5
- pip install virtualenv
- virtualenv -p ~/.pyenv/versions/3.6.5/bin/python ./env
- source env/bin/activate
env: DYLD_LIBRARY_PATH="$PWD/tmp-build/lib" MK_DOCS="y" INTERPRETER_VERSION="3.6.5"
# cibuildwheel for osx
- os: osx
env: CIBW_BEFORE_BUILD="tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp" CFLAGS="-Itmp/include" LDFLAGS="-Ltmp/lib"
before_install:
- pip install virtualenv
env: CIBW_BEFORE_BUILD="tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp" CFLAGS="-Itmp/include" LDFLAGS="-Ltmp/lib" INTERPRETER_VERSION="2.7.17"
# cibuildwheel for manylinux
- os: linux
dist: trusty
@@ -49,30 +35,45 @@ matrix:
- PYTHON_CONFIGURE_OPTS="--enable-unicode=ucs4 --with-wide-unicode"
- CIBW_MANYLINUX_X86_64_IMAGE="manylinux1"
- CIBW_MANYLINUX_I686_IMAGE="manylinux1"
- INTERPRETER_VERSION="2.7.17"
language: python
python: "2.7"
services: docker

# See https://cibuildwheel.readthedocs.io/en/latest/options/ for CIBW* vars

# Install test dependencies unconditionally
# Travis OSX envs require some setup; see tools/prepare-osx.sh
# Install cibuildwheel if this is a tagged PR
before_install:
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then tools/prepare-osx.sh ${INTERPRETER_VERSION} /tmp/venv && source /tmp/venv/bin/activate; fi
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel --force-reinstall; fi

# Install interceptors
# Install librdkafka if not CIBW_BEFORE_BUILD
# Install confluent_kafka[avro] if not CIBW_BEFORE_BUILD
install:
- tools/install-interceptors.sh
- pip install -U pip && pip install virtualenv
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 trivup; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel --force-reinstall; fi
- if [[ ! -z $EXTRA_PKGS ]]; then pip install $(echo $EXTRA_PKGS) ; fi
- pip install -r tests/requirements.txt
- flake8
- if [[ $MK_DOCS == y ]]; then pip install -r docs/requirements.txt; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build && pip install --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro]; fi

# Build wheels
script:
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro] ; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then flake8 ; fi
# Make plugins available for tests
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then cibuildwheel --output-dir wheelhouse1 && tools/fixup-wheels.sh wheelhouse1 wheelhouse; fi

# Make plugins available for tests
# Execute tests if not CIBW_BEFORE_BUILD [osx, linux]
# Execute integration tests if CIBW_BEFORE_BUILD
# Build docs if MK_DOCS
after_script:
- ldd staging/libs/* || otool -L staging/libs/* || true
- if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "osx" ]]; then DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:staging/libs py.test -v --timeout=60 --ignore=tmp-build --import-mode append ; fi
- if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "linux" ]]; then LD_LIBRARY_PATH=$LD_LIBRARY_PATH:staging/libs py.test -v --timeout=60 --ignore=tmp-build --import-mode append ; fi
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then cibuildwheel --output-dir wheelhouse1 && tools/fixup-wheels.sh wheelhouse1 wheelhouse ; fi
- if [[ -n $TRAVIS_TAG && $TRAVIS_OS_NAME == linux && -n $CIBW_BEFORE_BUILD ]]; then tools/test-manylinux.sh ; fi
- if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "osx" ]]; then DYLD_LIBRARY_PATH=$DYLD_LIBRARY_PATH:staging/libs py.test --timeout=60 --ignore=tmp-build --import-mode append; fi
- if [[ -z $CIBW_BEFORE_BUILD && $TRAVIS_OS_NAME == "linux" ]]; then LD_LIBRARY_PATH=$LD_LIBRARY_PATH:staging/libs py.test --timeout=60 --ignore=tmp-build --import-mode append; fi
- if [[ -n $TRAVIS_TAG && $TRAVIS_OS_NAME == linux && -n $CIBW_BEFORE_BUILD ]];then tools/test-manylinux.sh; fi
- if [[ -n $TRAVIS_TAG && $TRAVIS_OS_NAME == osx && -n $CIBW_BEFORE_BUILD ]]; then tools/test-osx.sh; fi
- if [[ $MK_DOCS == y ]]; then make docs ; fi
- if [[ $MK_DOCS == y ]]; then make docs; fi
Contributor: Since this file has a lot of unnecessary cosmetic changes, I strongly recommend you create a tag, generate wheels, and upload to testing today, Friday, so we are not bitten by surprises on release next week.

deploy:
provider: s3
41 changes: 35 additions & 6 deletions confluent_kafka/__init__.py
@@ -1,9 +1,29 @@
__all__ = ['cimpl', 'admin', 'avro', 'kafkatest']
from .cimpl import (Consumer, # noqa
#!/usr/bin/env python
Contributor: why does this file need to be executable?

Contributor: Defining the interpreter is good practice; it is not the same thing as being executable (file mode).

Contributor: reference? why? It seems of such marginal benefit that it's better not to have it; I don't understand why it would be best practice. I just looked at the code of 3 of the most popular Python libraries, and none of them do this.

# -*- coding: utf-8 -*-
Contributor: omg this is a quirky language

Contributor: Since all of this file is 7-bit ASCII there is no need for a coding declaration here, but it doesn't hurt.
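As an aside, a minimal illustrative snippet, not from this PR, of the one case where the declaration does matter under Python 2:

```python
# -*- coding: utf-8 -*-
# Hypothetical snippet. Python 2 assumes ASCII source, so without the
# declaration above the non-ASCII literal below is a SyntaxError at
# import time. Python 3 defaults to UTF-8, making it redundant there.
greeting = "hëllo"
print(greeting)
```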

#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from .deserializing_consumer import DeserializingConsumer
from .serializing_producer import SerializingProducer

from .cimpl import (Producer,
Consumer,
KafkaError,
KafkaException,
Message,
TopicPartition,
libversion,
version,
@@ -15,10 +35,18 @@
OFFSET_STORED,
OFFSET_INVALID)

__all__ = ['admin', 'AvroSerializer', 'Consumer',
'KafkaError', 'KafkaException',
'kafkatest', 'libversion', 'Message',
'OFFSET_BEGINNING', 'OFFSET_END', 'OFFSET_INVALID', 'OFFSET_STORED',
'Producer', 'DeserializingConsumer',
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
Contributor: TIMESTAMP_CREATE_TIME etc. are enum-like; it seems they would be better in an enum than as constant values (also we want consistency on how enums are done in this lib).

OFFSET_BEGINNING et al. make more sense as constants as you have them. In .NET I actually have an Offset class which encapsulates these constants and the value, and this works very nicely, but I'm pretty happy with just using constants as you are here in Python.

Contributor (Author): Noted; as they are not directly related to this PR they will be addressed separately. I have taken note to revisit this amongst a few other things we discovered along the way. It's worth noting that enums are slightly complicated so long as we continue to support 2.7. We do expose them in the admin API with the usage of enum34; however, that has some compatibility issues. One-off enums are also a bit annoying because modern interpreters have first-class support for them.

Contributor: yeah
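For illustration, a hypothetical sketch of what an enum-backed timestamp type could look like; this is not part of this PR or the library's API, and it assumes the usual librdkafka timestamp values (0, 1, 2) plus enum34 on Python 2.7:

```python
# Hypothetical sketch only. Requires "pip install enum34" on Python 2.7;
# enum is built in from Python 3.4.
from enum import IntEnum


class TimestampType(IntEnum):
    NOT_AVAILABLE = 0
    CREATE_TIME = 1
    LOG_APPEND_TIME = 2


# IntEnum members compare equal to plain ints, so code that checks
# msg.timestamp()[0] against the module-level constants keeps working.
assert TimestampType.CREATE_TIME == 1
```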

'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition']

__version__ = version()[0]


class ThrottleEvent (object):
class ThrottleEvent(object):
"""
ThrottleEvent contains details about a throttled request.
Set up a throttle callback by setting the ``throttle_cb`` configuration
@@ -32,16 +60,17 @@ class ThrottleEvent (object):
:ivar int broker_id: The broker id
:ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
"""

def __init__(self, broker_name,
broker_id,
throttle_time):

self.broker_name = broker_name
self.broker_id = broker_id
self.throttle_time = throttle_time

def __str__(self):
return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id, int(self.throttle_time * 1000))
return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
int(self.throttle_time * 1000))


def _resolve_plugins(plugins):
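As a usage note for the ``throttle_cb`` path above, a minimal sketch of wiring a throttle callback; the broker address is a placeholder:

```python
from confluent_kafka import Producer


def on_throttle(ev):
    # ev is a ThrottleEvent; throttle_time is reported in seconds.
    print("broker {} ({}) throttled the request for {:.0f} ms".format(
        ev.broker_name, ev.broker_id, ev.throttle_time * 1000))


# throttle_cb is served from the application queue, so the client
# must be polled for the callback to fire.
producer = Producer({'bootstrap.servers': 'localhost:9092',  # placeholder
                     'throttle_cb': on_throttle})
producer.poll(0)
```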
17 changes: 17 additions & 0 deletions confluent_kafka/avro/__init__.py
@@ -1,3 +1,20 @@
#!/usr/bin/env python
Contributor: again, don't understand why this is executable

Contributor (Author): To be fair, it doesn't make this file executable. It does, however, inform the reader, and their text editor, of exactly what this file is.

Contributor: afaict no one else important does this, and it's bordering on pointless. It didn't help me in any way having this here.

#
# Copyright 2016-2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas

4 changes: 4 additions & 0 deletions confluent_kafka/avro/requirements.txt
@@ -0,0 +1,4 @@
fastavro
requests
avro==1.9.2;python_version<='3.0'
avro-python3==1.9.2.1;python_version>='3.0'
158 changes: 158 additions & 0 deletions confluent_kafka/deserializing_consumer.py
@@ -0,0 +1,158 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from confluent_kafka.cimpl import (KafkaError,
Consumer as _ConsumerImpl)
from .error import ConsumeError
from .serialization import (SerializationError,
SerializationContext,
MessageField)


class DeserializingConsumer(_ConsumerImpl):
"""
A client that consumes records from a Kafka cluster, with deserialization
capabilities.

Note:

The DeserializingConsumer is an experimental API and subject to change.

.. versionadded:: 1.4.0

The ``key.deserializer`` and ``value.deserializer`` classes instruct the
DeserializingConsumer on how to convert the message payload bytes to objects.

Note:

All configured callbacks are served from the application queue upon
calling :py:func:`DeserializingConsumer.poll`

DeserializingConsumer configuration properties (* indicates required field)

+--------------------+-----------------+-----------------------------------------------------+
| Property Name | Type | Description |
+====================+=================+=====================================================+
| bootstrap.servers* | str | Comma-separated list of brokers. |
+--------------------+-----------------+-----------------------------------------------------+
| | | Client group id string. |
| group.id* | str | All clients sharing the same group.id belong to the |
| | | same group. |
+--------------------+-----------------+-----------------------------------------------------+
| | | Callable(SerializationContext, bytes) -> obj |
| key.deserializer | callable | |
| | | Deserializer used for message keys. |
+--------------------+-----------------+-----------------------------------------------------+
| | | Callable(SerializationContext, bytes) -> obj |
| value.deserializer | callable | |
| | | Deserializer used for message values. |
+--------------------+-----------------+-----------------------------------------------------+
| | | Callable(KafkaError) |
| | | |
| error_cb | callable | Callback for generic/global error events. These |
| | | errors are typically to be considered informational |
| | | since the client will automatically try to recover. |
+--------------------+-----------------+-----------------------------------------------------+
| log_cb | logging.Handler | Logging handler to forward logs |
+--------------------+-----------------+-----------------------------------------------------+
| | | Callable(str) |
| | | |
| | | Callback for statistics. This callback is |
| stats_cb | callable | added to the application queue every |
| | | ``statistics.interval.ms`` (configured separately). |
| | | The function argument is a JSON formatted str |
| | | containing statistics data. |
+--------------------+-----------------+-----------------------------------------------------+
| | | Callable(ThrottleEvent) |
| throttle_cb | callable | |
| | | Callback for throttled request reporting. |
+--------------------+-----------------+-----------------------------------------------------+

.. _See Client CONFIGURATION.md for a complete list of configuration properties:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Args:
conf (dict): DeserializingConsumer configuration.

Raises:
ValueError: if configuration validation fails

.. _Statistics:
https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md

"""

def __init__(self, conf):
conf_copy = conf.copy()
self._key_deserializer = conf_copy.pop('key.deserializer', None)
self._value_deserializer = conf_copy.pop('value.deserializer', None)

super(DeserializingConsumer, self).__init__(conf_copy)

def poll(self, timeout=-1):
"""
Consume messages and call callbacks.

Args:
timeout (float): Maximum time to block waiting for a message (in seconds).

Returns:
:py:class:`Message` or None on timeout

Raises:
ConsumeError if an error was encountered while polling.

"""
msg = super(DeserializingConsumer, self).poll(timeout)

if msg is None:
return None

if msg.error() is not None:
raise ConsumeError(msg.error(), message=msg)

ctx = SerializationContext(msg.topic(), MessageField.VALUE)
value = msg.value()
if self._value_deserializer is not None:
try:
value = self._value_deserializer(ctx, value)
except SerializationError as se:
raise ConsumeError(KafkaError._VALUE_DESERIALIZATION,
reason=se.message,
message=msg)

key = msg.key()
ctx.field = MessageField.KEY
if self._key_deserializer is not None:
try:
key = self._key_deserializer(ctx, key)
except SerializationError as se:
raise ConsumeError(KafkaError._KEY_DESERIALIZATION,
reason=se.message,
message=msg)

msg.set_key(key)
msg.set_value(value)
return msg

def consume(self, num_messages=1, timeout=-1):
"""
:py:func:`Consumer.consume` not implemented; use
:py:func:`DeserializingConsumer.poll` instead
"""
raise NotImplementedError
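
To round out the new API, a minimal usage sketch of ``DeserializingConsumer``; the broker, group, and topic names are placeholders, and the trivial UTF-8 deserializer stands in for a real one such as an Avro deserializer:

```python
from confluent_kafka import DeserializingConsumer
from confluent_kafka.error import ConsumeError


def utf8_deserializer(ctx, data):
    # Matches the Callable(SerializationContext, bytes) -> obj contract above.
    return data.decode('utf-8') if data is not None else None


consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',   # placeholder address
    'group.id': 'example-group',             # placeholder group
    'key.deserializer': utf8_deserializer,
    'value.deserializer': utf8_deserializer})
consumer.subscribe(['example-topic'])        # placeholder topic

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:                      # poll timed out
            continue
        # key() and value() now return deserialized objects, not bytes.
        print(msg.key(), msg.value())
except ConsumeError as ce:
    # poll() raises ConsumeError on consume or deserialization failure.
    print("Consume failed: {}".format(ce))
finally:
    consumer.close()
```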