Skip to content

Commit fef1440

Browse files
committed
Generic Serialization API, Avro Refactor, Kafka builtin serializer
1 parent fefd89f commit fef1440

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

74 files changed

+3783
-877
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,7 @@ dl-*
2222
.pytest_cache
2323
staging
2424
tests/docker/conf/tls/*
25-
.idea
2625
.python-version
26+
.DS_Store
27+
.idea
28+
tmp-KafkaCluster

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,13 @@ install:
5757
- tools/install-interceptors.sh
5858
- pip install -U pip && pip install virtualenv
5959
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
60-
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 trivup; fi
60+
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -r confluent_kafka/requirements.txt -r tests/requirements.txt -r confluent_kafka/avro/requirements.txt; fi
6161
- if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
6262
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel --force-reinstall; fi
6363
- if [[ ! -z $EXTRA_PKGS ]]; then pip install $(echo $EXTRA_PKGS) ; fi
6464

6565
script:
66-
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro] ; fi
66+
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[dev] ; fi
6767
- if [[ -z $CIBW_BEFORE_BUILD ]]; then flake8 ; fi
6868
# Make plugins available for tests
6969
- ldd staging/libs/* || otool -L staging/libs/* || true

confluent_kafka/__init__.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
__all__ = ['cimpl', 'admin', 'avro', 'kafkatest']
2-
from .cimpl import (Consumer, # noqa
3-
KafkaError,
1+
from .producer import Producer
2+
from .consumer import Consumer
3+
from .serialization import AvroSerializer, DoubleSerializer, FloatSerializer, \
4+
IntegerSerializer, LongSerializer, ShortSerializer, StringSerializer
5+
from .schema_registry import SchemaRegistryClient, ClientError, Schema
6+
7+
8+
from .cimpl import (KafkaError, # noqa
49
KafkaException,
510
Message,
6-
Producer,
711
TopicPartition,
812
libversion,
913
version,
@@ -15,6 +19,12 @@
1519
OFFSET_STORED,
1620
OFFSET_INVALID)
1721

22+
__all__ = ['cimpl', 'admin', 'avro', 'kafkatest', 'Producer', 'Consumer',
23+
'SchemaRegistryClient', 'ClientError', 'Schema',
24+
'AvroSerializer', 'DoubleSerializer', 'FloatSerializer',
25+
'IntegerSerializer', 'LongSerializer', 'ShortSerializer',
26+
'StringSerializer']
27+
1828
__version__ = version()[0]
1929

2030

confluent_kafka/avro/__init__.py

Lines changed: 155 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,94 +1,76 @@
1-
"""
2-
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
3-
4-
"""
5-
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
import warnings
20+
21+
from .cached_schema_registry_client import CachedSchemaRegistryClient
22+
from .error import ClientError
23+
from .load import load, loads
624
from confluent_kafka import Producer, Consumer
7-
from confluent_kafka.avro.error import ClientError
8-
from confluent_kafka.avro.load import load, loads # noqa
9-
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
10-
from confluent_kafka.avro.serializer import (SerializerError, # noqa
11-
KeySerializerError,
12-
ValueSerializerError)
13-
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
25+
from confluent_kafka.serialization import AvroSerializer, \
26+
MessageField, SerializationContext, \
27+
SerializerError, ValueSerializerError, KeySerializerError
1428

29+
from ..serialization.avro import ContextStringIO
1530

16-
class AvroProducer(Producer):
17-
"""
18-
Kafka Producer client which does avro schema encoding to messages.
19-
Handles schema registration, Message serialization.
31+
__all__ = ['AvroConsumer', 'AvroProducer',
32+
'load', 'loads',
33+
'ClientError', 'CachedSchemaRegistryClient']
2034

21-
Constructor takes below parameters.
2235

23-
:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
24-
and the standard Kafka client configuration (``bootstrap.servers`` et.al).
25-
:param str default_key_schema: Optional default avro schema for key
26-
:param str default_value_schema: Optional default avro schema for value
27-
"""
36+
warnings.warn(
37+
"Package confluent_kafka.avro and all of it's modules have been deprecated."
38+
" This package, and it's modules will be removed in a future release."
39+
" An example reference of the new serialization API can be found at examples/avro-cli.py.",
40+
category=DeprecationWarning, stacklevel=2)
2841

29-
def __init__(self, config, default_key_schema=None,
30-
default_value_schema=None, schema_registry=None):
31-
32-
sr_conf = {key.replace("schema.registry.", ""): value
33-
for key, value in config.items() if key.startswith("schema.registry")}
3442

35-
if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
36-
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
37-
sr_conf['sasl.username'] = config.get('sasl.username', '')
38-
sr_conf['sasl.password'] = config.get('sasl.password', '')
39-
sr_conf['auto.register.schemas'] = config.get('auto.register.schemas', True)
40-
41-
ap_conf = {key: value
42-
for key, value in config.items() if not key.startswith("schema.registry")}
43-
44-
if schema_registry is None:
45-
schema_registry = CachedSchemaRegistryClient(sr_conf)
46-
elif sr_conf.get("url", None) is not None:
47-
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
48-
49-
super(AvroProducer, self).__init__(ap_conf)
50-
self._serializer = MessageSerializer(schema_registry)
51-
self._key_schema = default_key_schema
52-
self._value_schema = default_value_schema
43+
class _AvroSerializerShim(AvroSerializer):
44+
"""
45+
AvroSerializerWrapper allowing for schema objects to appended to the serialization context
46+
"""
5347

54-
def produce(self, **kwargs):
48+
def serialize(self, schema, datum, ctx):
5549
"""
56-
Asynchronously sends message to Kafka by encoding with specified or default avro schema.
50+
Encode datum to Avro binary format
5751
58-
:param str topic: topic name
59-
:param object value: An object to serialize
60-
:param str value_schema: Avro schema for value
61-
:param object key: An object to serialize
62-
:param str key_schema: Avro schema for key
52+
:param Schema schema: optional schema override
53+
:param object datum: Avro object to encode
54+
:param SerializationContext ctx:
6355
64-
Plus any other parameters accepted by confluent_kafka.Producer.produce
56+
:raises: SerializerError
6557
66-
:raises SerializerError: On serialization failure
67-
:raises BufferError: If producer queue is full.
68-
:raises KafkaException: For other produce failures.
58+
:returns: encoded Record|Primitive
59+
:rtype: bytes
6960
"""
70-
# get schemas from kwargs if defined
71-
key_schema = kwargs.pop('key_schema', self._key_schema)
72-
value_schema = kwargs.pop('value_schema', self._value_schema)
73-
topic = kwargs.pop('topic', None)
74-
if not topic:
75-
raise ClientError("Topic name not specified.")
76-
value = kwargs.pop('value', None)
77-
key = kwargs.pop('key', None)
61+
if datum is None:
62+
return None
7863

79-
if value is not None:
80-
if value_schema:
81-
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
82-
else:
83-
raise ValueSerializerError("Avro schema required for values")
64+
if not schema:
65+
schema = self.schema
8466

85-
if key is not None:
86-
if key_schema:
87-
key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
88-
else:
89-
raise KeySerializerError("Avro schema required for key")
67+
if self.converter:
68+
datum = self.converter.to_dict(datum)
9069

91-
super(AvroProducer, self).produce(topic, value, key, **kwargs)
70+
with ContextStringIO() as fo:
71+
self._encode(fo, schema, datum, ctx)
72+
73+
return fo.getvalue()
9274

9375

9476
class AvroConsumer(Consumer):
@@ -100,12 +82,14 @@ class AvroConsumer(Consumer):
10082
10183
:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
10284
and the standard Kafka client configuration (``bootstrap.servers`` et.al)
103-
:param schema reader_key_schema: a reader schema for the message key
104-
:param schema reader_value_schema: a reader schema for the message value
85+
:param Schema reader_key_schema: a reader schema for the message key
86+
:param Schema reader_value_schema: a reader schema for the message value
10587
:raises ValueError: For invalid configurations
10688
"""
89+
__slots__ = ['_key_serializer', '_value_serializer']
10790

108-
def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
91+
def __init__(self, config, schema_registry=None,
92+
reader_key_schema=None, reader_value_schema=None):
10993

11094
sr_conf = {key.replace("schema.registry.", ""): value
11195
for key, value in config.items() if key.startswith("schema.registry")}
@@ -123,36 +107,112 @@ def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_
123107
elif sr_conf.get("url", None) is not None:
124108
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
125109

110+
self._key_serializer = AvroSerializer(schema_registry, reader_schema=reader_key_schema)
111+
self._value_serializer = AvroSerializer(schema_registry, reader_schema=reader_value_schema)
112+
126113
super(AvroConsumer, self).__init__(ap_conf)
127-
self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)
128114

129-
def poll(self, timeout=None):
115+
def poll(self, timeout=-1.0):
130116
"""
131-
This is an overriden method from confluent_kafka.Consumer class. This handles message
132-
deserialization using avro schema
117+
This is an overridden method from confluent_kafka.Consumer class. This handles message
118+
deserialization using Avro schema
133119
134120
:param float timeout: Poll timeout in seconds (default: indefinite)
135121
:returns: message object with deserialized key and value as dict objects
136122
:rtype: Message
137123
"""
138-
if timeout is None:
139-
timeout = -1
124+
140125
message = super(AvroConsumer, self).poll(timeout)
141126
if message is None:
142127
return None
143128

144129
if not message.error():
145130
try:
146-
if message.value() is not None:
147-
decoded_value = self._serializer.decode_message(message.value(), is_key=False)
148-
message.set_value(decoded_value)
149-
if message.key() is not None:
150-
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
151-
message.set_key(decoded_key)
131+
message.set_value(self._value_serializer.deserialize(message.value(), None))
132+
message.set_key(self._key_serializer.deserialize(message.key(), None))
152133
except SerializerError as e:
153-
raise SerializerError("Message deserialization failed for message at {} [{}] offset {}: {}".format(
154-
message.topic(),
155-
message.partition(),
156-
message.offset(),
157-
e))
134+
raise SerializerError("Message deserialization failed for message at {} [{}] "
135+
"offset {}: {}".format(message.topic(),
136+
message.partition(),
137+
message.offset(), e))
158138
return message
139+
140+
141+
class AvroProducer(Producer):
142+
"""
143+
Kafka Producer client which does avro schema encoding to messages.
144+
Handles schema registration, Message serialization.
145+
146+
Constructor takes below parameters.
147+
148+
:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
149+
and the standard Kafka client configuration (``bootstrap.servers`` et.al).
150+
:param Schema default_key_schema: Optional default avro schema for key
151+
:param Schema default_value_schema: Optional default avro schema for value
152+
:param CachedSchemaRegistryClient schema_registry: Optional CachedSchemaRegistryClient instance
153+
"""
154+
__slots__ = ['_key_serializer', '_value_serializer']
155+
156+
def __init__(self, config,
157+
default_key_schema=None, default_value_schema=None,
158+
schema_registry=None):
159+
160+
sr_conf = {key.replace("schema.registry.", ""): value
161+
for key, value in config.items() if key.startswith("schema.registry")}
162+
163+
if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
164+
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
165+
sr_conf['sasl.username'] = config.get('sasl.username', '')
166+
sr_conf['sasl.password'] = config.get('sasl.password', '')
167+
168+
ap_conf = {key: value
169+
for key, value in config.items() if not key.startswith("schema.registry")}
170+
171+
if schema_registry is None:
172+
schema_registry = CachedSchemaRegistryClient(sr_conf)
173+
elif sr_conf.get("url", None) is not None:
174+
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
175+
176+
self._key_serializer = _AvroSerializerShim(schema_registry,
177+
schema=default_key_schema)
178+
self._value_serializer = _AvroSerializerShim(schema_registry,
179+
schema=default_value_schema)
180+
181+
super(AvroProducer, self).__init__(ap_conf)
182+
183+
def produce(self, topic, **kwargs):
184+
"""
185+
Asynchronously sends message to Kafka by encoding with specified or default avro schema.
186+
187+
:param str topic: topic name
188+
:param object value: An object to serialize
189+
:param Schema value_schema: Avro schema for value
190+
:param object key: An object to serialize
191+
:param Schema key_schema: Avro schema for key
192+
193+
Plus any other parameters accepted by confluent_kafka.Producer.produce
194+
195+
:raises SerializerError: On serialization failure
196+
:raises BufferError: If producer queue is full.
197+
:raises KafkaException: For other produce failures.
198+
"""
199+
200+
key = kwargs.pop('key', None)
201+
value = kwargs.pop('value', None)
202+
203+
key_schema = kwargs.pop('key_schema', None)
204+
value_schema = kwargs.pop('value_schema', None)
205+
206+
ctx = SerializationContext(topic, MessageField.KEY)
207+
try:
208+
key = self._key_serializer.serialize(key_schema, key, ctx)
209+
except SerializerError as e:
210+
raise KeySerializerError(e.message)
211+
212+
ctx.field = MessageField.VALUE
213+
try:
214+
value = self._value_serializer.serialize(value_schema, value, ctx)
215+
except SerializerError as e:
216+
raise ValueSerializerError(e.message)
217+
218+
super(AvroProducer, self).produce(topic, value, key, **kwargs)

0 commit comments

Comments
 (0)