return record name for AVRO union #785

Closed
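Context for the change: this PR reworks the Avro layer around the new `confluent_kafka.serialization` module, and, per the title, union deserialization should surface which record branch was written rather than only the decoded dict. A sketch of the intended behavior using fastavro directly (illustrative schema and payload; assumes a fastavro version that supports `return_record_name`):

```python
import io

import fastavro

# Two record branches with identical fields: only the record name can
# tell a reader which union branch was written.
schema = fastavro.parse_schema({
    "type": "record", "name": "Wrapper", "fields": [
        {"name": "payload", "type": [
            {"type": "record", "name": "A",
             "fields": [{"name": "x", "type": "int"}]},
            {"type": "record", "name": "B",
             "fields": [{"name": "x", "type": "int"}]}]}]})

buf = io.BytesIO()
# The (branch_name, datum) tuple form disambiguates the union on write.
fastavro.schemaless_writer(buf, schema, {"payload": ("B", {"x": 1})})
buf.seek(0)

# Without return_record_name=True this decodes to {'payload': {'x': 1}}
# and the branch is lost; with it, the record name comes back too.
print(fastavro.schemaless_reader(buf, schema, return_record_name=True))
# {'payload': ('B', {'x': 1})}
```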
3 changes: 3 additions & 0 deletions .gitignore
@@ -22,3 +22,6 @@ dl-*
 .pytest_cache
 staging
 tests/docker/conf/tls/*
+.DS_Store
+.idea
+tmp-KafkaCluster
4 changes: 2 additions & 2 deletions .travis.yml
@@ -57,13 +57,13 @@ install:
 - tools/install-interceptors.sh
 - pip install -U pip && pip install virtualenv
 - if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
-- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 ; fi
+- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -r confluent_kafka/requirements.txt -r tests/requirements.txt -r confluent_kafka/avro/requirements.txt; fi
 - if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
 - if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel --force-reinstall; fi
 - if [[ ! -z $EXTRA_PKGS ]]; then pip install $(echo $EXTRA_PKGS) ; fi
 
 script:
-- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro] ; fi
+- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[dev] ; fi
 - if [[ -z $CIBW_BEFORE_BUILD ]]; then flake8 ; fi
 # Make plugins available for tests
 - ldd staging/libs/* || otool -L staging/libs/* || true
3 changes: 1 addition & 2 deletions README.md
@@ -148,9 +148,8 @@ avroProducer.flush()
 **AvroConsumer**
 
 ```python
-from confluent_kafka import KafkaError
 from confluent_kafka.avro import AvroConsumer
-from confluent_kafka.avro.serializer import SerializerError
+from confluent_kafka.serialization import SerializerError
 
 
 c = AvroConsumer({
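The diff view cuts the README snippet off at the constructor. For reference, the consumer in that example is configured along these lines — a sketch with placeholder broker, group, and registry values, not the verbatim README text:

```python
from confluent_kafka.avro import AvroConsumer

c = AvroConsumer({
    'bootstrap.servers': 'mybroker',                  # placeholder
    'group.id': 'groupid',                            # placeholder
    'schema.registry.url': 'http://127.0.0.1:8081'})  # placeholder
```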
10 changes: 6 additions & 4 deletions confluent_kafka/__init__.py
@@ -1,9 +1,9 @@
-__all__ = ['cimpl', 'admin', 'avro', 'kafkatest']
-from .cimpl import (Consumer,  # noqa
-                    KafkaError,
+from .producer import Producer
+from .consumer import Consumer
+
+from .cimpl import (KafkaError,  # noqa
                     KafkaException,
                     Message,
-                    Producer,
                     TopicPartition,
                     libversion,
                     version,
@@ -15,6 +15,8 @@
                     OFFSET_STORED,
                     OFFSET_INVALID)
 
+__all__ = ['cimpl', 'admin', 'avro', 'kafkatest', 'Producer', 'Consumer']
+
 __version__ = version()[0]
 
 
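The upshot of this hunk: `Producer` and `Consumer` are now re-exported from the new pure-Python `confluent_kafka.producer` and `confluent_kafka.consumer` modules instead of coming straight from the C extension, while the public import path is unchanged. A quick sanity check, assuming this branch is installed:

```python
# User-facing imports keep working exactly as before the refactor.
from confluent_kafka import Consumer, KafkaError, Producer

# The classes should now resolve to the wrapper modules, not cimpl.
print(Producer.__module__)  # expected: confluent_kafka.producer
print(Consumer.__module__)  # expected: confluent_kafka.consumer
```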
193 changes: 35 additions & 158 deletions confluent_kafka/avro/__init__.py
@@ -1,158 +1,35 @@
"""
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas

"""

from confluent_kafka import Producer, Consumer
from confluent_kafka.avro.error import ClientError
from confluent_kafka.avro.load import load, loads # noqa
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import (SerializerError, # noqa
KeySerializerError,
ValueSerializerError)
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer


class AvroProducer(Producer):
"""
Kafka Producer client which does avro schema encoding to messages.
Handles schema registration, Message serialization.

Constructor takes below parameters.

:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
and the standard Kafka client configuration (``bootstrap.servers`` et.al).
:param str default_key_schema: Optional default avro schema for key
:param str default_value_schema: Optional default avro schema for value
"""

def __init__(self, config, default_key_schema=None,
default_value_schema=None, schema_registry=None):

sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}

if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
sr_conf['sasl.username'] = config.get('sasl.username', '')
sr_conf['sasl.password'] = config.get('sasl.password', '')
sr_conf['auto.register.schemas'] = config.get('auto.register.schemas', True)

ap_conf = {key: value
for key, value in config.items() if not key.startswith("schema.registry")}

if schema_registry is None:
schema_registry = CachedSchemaRegistryClient(sr_conf)
elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroProducer, self).__init__(ap_conf)
self._serializer = MessageSerializer(schema_registry)
self._key_schema = default_key_schema
self._value_schema = default_value_schema

def produce(self, **kwargs):
"""
Asynchronously sends message to Kafka by encoding with specified or default avro schema.

:param str topic: topic name
:param object value: An object to serialize
:param str value_schema: Avro schema for value
:param object key: An object to serialize
:param str key_schema: Avro schema for key

Plus any other parameters accepted by confluent_kafka.Producer.produce

:raises SerializerError: On serialization failure
:raises BufferError: If producer queue is full.
:raises KafkaException: For other produce failures.
"""
# get schemas from kwargs if defined
key_schema = kwargs.pop('key_schema', self._key_schema)
value_schema = kwargs.pop('value_schema', self._value_schema)
topic = kwargs.pop('topic', None)
if not topic:
raise ClientError("Topic name not specified.")
value = kwargs.pop('value', None)
key = kwargs.pop('key', None)

if value is not None:
if value_schema:
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
else:
raise ValueSerializerError("Avro schema required for values")

if key is not None:
if key_schema:
key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
else:
raise KeySerializerError("Avro schema required for key")

super(AvroProducer, self).produce(topic, value, key, **kwargs)


class AvroConsumer(Consumer):
"""
Kafka Consumer client which does avro schema decoding of messages.
Handles message deserialization.

Constructor takes below parameters

:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
and the standard Kafka client configuration (``bootstrap.servers`` et.al)
:param schema reader_key_schema: a reader schema for the message key
:param schema reader_value_schema: a reader schema for the message value
:raises ValueError: For invalid configurations
"""

def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):

sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}

if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
sr_conf['sasl.username'] = config.get('sasl.username', '')
sr_conf['sasl.password'] = config.get('sasl.password', '')

ap_conf = {key: value
for key, value in config.items() if not key.startswith("schema.registry")}

if schema_registry is None:
schema_registry = CachedSchemaRegistryClient(sr_conf)
elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroConsumer, self).__init__(ap_conf)
self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)

def poll(self, timeout=None):
"""
This is an overriden method from confluent_kafka.Consumer class. This handles message
deserialization using avro schema

:param float timeout: Poll timeout in seconds (default: indefinite)
:returns: message object with deserialized key and value as dict objects
:rtype: Message
"""
if timeout is None:
timeout = -1
message = super(AvroConsumer, self).poll(timeout)
if message is None:
return None

if not message.error():
try:
if message.value() is not None:
decoded_value = self._serializer.decode_message(message.value(), is_key=False)
message.set_value(decoded_value)
if message.key() is not None:
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
message.set_key(decoded_key)
except SerializerError as e:
raise SerializerError("Message deserialization failed for message at {} [{}] offset {}: {}".format(
message.topic(),
message.partition(),
message.offset(),
e))
return message
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import warnings
+
+from .avro_consumer import AvroConsumer
+from .avro_producer import AvroProducer
+from .cached_schema_registry_client import CachedSchemaRegistryClient
+from .error import ClientError
+from .load import load, loads
+
+__all__ = ['AvroConsumer', 'AvroProducer',
+           'load', 'loads',
+           'ClientError', 'CachedSchemaRegistryClient']
+
+# TODO: Add reference to avro example
+warnings.warn(
+    "Package confluent_kafka.avro has been deprecated. "
+    "This package will be removed in a future version",
+    category=DeprecationWarning, stacklevel=2)
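The removed `produce()` docstring above describes the legacy contract that `AvroProducer` (now relocated to `avro_producer.py`) continues to honor. In use it looks roughly like this — topic, schema, and endpoint values are made up for illustration:

```python
from confluent_kafka.avro import AvroProducer, loads

# Hypothetical value schema; keys work the same way via key_schema.
value_schema = loads("""
{"type": "record", "name": "User",
 "fields": [{"name": "name", "type": "string"}]}
""")

p = AvroProducer({'bootstrap.servers': 'localhost:9092',            # placeholder
                  'schema.registry.url': 'http://localhost:8081'},  # placeholder
                 default_value_schema=value_schema)
p.produce(topic='users', value={'name': 'alice'})  # encoded with the default schema
p.flush()
```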
87 changes: 87 additions & 0 deletions confluent_kafka/avro/avro_consumer.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from .cached_schema_registry_client import CachedSchemaRegistryClient
+from .serializer import SerializerError
+from confluent_kafka import Consumer
+from confluent_kafka.serialization import AvroSerializer
+
+
+class AvroConsumer(Consumer):
+    """
+    Kafka Consumer client which does avro schema decoding of messages.
+    Handles message deserialization.
+
+    Constructor takes below parameters
+
+    :param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
+                        and the standard Kafka client configuration (``bootstrap.servers`` et.al)
+    :param Schema reader_key_schema: a reader schema for the message key
+    :param Schema reader_value_schema: a reader schema for the message value
+    :raises ValueError: For invalid configurations
+    """
+    __slots__ = ['_key_serializer', '_value_serializer']
+
+    def __init__(self, config, schema_registry=None,
+                 reader_key_schema=None, reader_value_schema=None):
+
+        sr_conf = {key.replace("schema.registry.", ""): value
+                   for key, value in config.items() if key.startswith("schema.registry")}
+
+        if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
+            sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
+            sr_conf['sasl.username'] = config.get('sasl.username', '')
+            sr_conf['sasl.password'] = config.get('sasl.password', '')
+
+        ap_conf = {key: value
+                   for key, value in config.items() if not key.startswith("schema.registry")}
+
+        if schema_registry is None:
+            schema_registry = CachedSchemaRegistryClient(sr_conf)
+        elif sr_conf.get("url", None) is not None:
+            raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
+
+        self._key_serializer = AvroSerializer(schema_registry, reader_schema=reader_key_schema)
+        self._value_serializer = AvroSerializer(schema_registry, reader_schema=reader_value_schema)
+
+        super(AvroConsumer, self).__init__(ap_conf)
+
+    def poll(self, timeout=-1.0):
+        """
+        This is an overridden method from confluent_kafka.Consumer class. This handles message
+        deserialization using Avro schema
+
+        :param float timeout: Poll timeout in seconds (default: indefinite)
+        :returns: message object with deserialized key and value as dict objects
+        :rtype: Message
+        """
+
+        message = super(AvroConsumer, self).poll(timeout)
+        if message is None:
+            return None
+
+        if not message.error():
+            try:
+                message.set_value(self._value_serializer.deserialize(message.value(), None))
+                message.set_key(self._key_serializer.deserialize(message.key(), None))
+            except SerializerError as e:
+                raise SerializerError("Message deserialization failed for message at {} [{}] "
+                                      "offset {}: {}".format(message.topic(),
+                                                             message.partition(),
+                                                             message.offset(), e))
+        return message
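Because the overridden `poll()` decodes key and value in place, applications receive plain dicts. A typical consume loop against this class — broker, group, topic, and registry values are placeholders:

```python
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

c = AvroConsumer({'bootstrap.servers': 'localhost:9092',
                  'group.id': 'example-group',
                  'schema.registry.url': 'http://localhost:8081'})
c.subscribe(['users'])

try:
    while True:
        msg = c.poll(1.0)  # key/value are deserialized before msg is returned
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print(msg.key(), msg.value())
except SerializerError as e:
    print("Message deserialization failed: {}".format(e))
finally:
    c.close()
```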