Commit 09e55a6

Java compatible serdes

1 parent 143c37b

22 files changed: +1415, -52 lines

.appveyor.yml

Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@ environment:
   global:
     LIBRDKAFKA_NUGET_VERSION: 1.3.0
     CIBW_SKIP: cp33-* cp34-*
-    CIBW_TEST_REQUIRES: pytest requests avro
+    CIBW_TEST_REQUIRES: pytest requests avro trivup
     # SDK v7.0 MSVC Express 2008's SetEnv.cmd script will fail if the
     # /E:ON and /V:ON options are not enabled in the batch script interpreter
     # See: http://stackoverflow.com/a/13751649/163740

.travis.yml

Lines changed: 2 additions & 2 deletions

@@ -53,13 +53,13 @@ install:
   - tools/install-interceptors.sh
   - pip install -U pip && pip install virtualenv
   - if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
-  - if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 ; fi
+  - if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -r confluent_kafka/requirements.txt -r tests/requirements.txt -r confluent_kafka/avro/requirements.txt; fi
   - if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
   - if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel --force-reinstall; fi
   - if [[ ! -z $EXTRA_PKGS ]]; then pip install $(echo $EXTRA_PKGS) ; fi

 script:
-  - if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro] ; fi
+  - if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[dev] ; fi
   - if [[ -z $CIBW_BEFORE_BUILD ]]; then flake8 ; fi
   # Make plugins available for tests
   - ldd staging/libs/* || otool -L staging/libs/* || true

confluent_kafka/avro/__init__.py

Lines changed: 1 addition & 2 deletions

@@ -5,7 +5,6 @@

 from confluent_kafka import Producer, Consumer
 from confluent_kafka.avro.error import ClientError
-from confluent_kafka.avro.load import load, loads # noqa
 from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
 from confluent_kafka.avro.serializer import (SerializerError, # noqa
                                              KeySerializerError,
@@ -50,7 +49,7 @@ def __init__(self, config, default_key_schema=None,
         self._key_schema = default_key_schema
         self._value_schema = default_value_schema

-    def produce(self, **kwargs):
+    def produce(self, topic, value, **kwargs):
         """
         Asynchronously sends message to Kafka by encoding with specified or default avro schema.
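For reference, a minimal usage sketch of the new explicit produce() signature. The broker address, Schema Registry URL, topic name, and user.avsc file are illustrative placeholders, not part of the commit:

```
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.load import load

value_schema = load('user.avsc')  # hypothetical schema file

producer = AvroProducer({'bootstrap.servers': 'localhost:9092',
                         'schema.registry.url': 'http://localhost:8081'},
                        default_value_schema=value_schema)

# topic and value are now explicit parameters rather than bare kwargs
producer.produce(topic='users', value={'name': 'alice'})
producer.flush()
```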

confluent_kafka/avro/load.py

Lines changed: 18 additions & 30 deletions

@@ -15,40 +15,28 @@
 # limitations under the License.
 #

-import sys
+from fastavro.schema import parse_schema, load_schema

-from confluent_kafka.avro.error import ClientError

-def loads(schema_str):
-    """ Parse a schema given a schema string """
-    try:
-        if sys.version_info[0] < 3:
-            return schema.parse(schema_str)
-        else:
-            return schema.Parse(schema_str)
-    except schema.SchemaParseException as e:
-        raise ClientError("Schema parse failed: %s" % (str(e)))
+def loads(schema):
+    """
+    Returns an Avro Schema from a string.
+
+    :param str schema: Schema string to be parsed
+    :returns: Parsed Avro Schema
+    :rtype: dict
+    """
+    return parse_schema(schema)


-def load(fp):
-    """ Parse a schema from a file path """
-    with open(fp) as f:
-        return loads(f.read())
-
-
-# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable. Hence defining them explicitly as
-# a quick fix
-def _hash_func(self):
-    return hash(str(self))
-
-
-try:
-    from avro import schema
-
-    schema.RecordSchema.__hash__ = _hash_func
-    schema.PrimitiveSchema.__hash__ = _hash_func
-    schema.UnionSchema.__hash__ = _hash_func
-
-except ImportError:
-    schema = None
+def load(avsc):
+    """
+    Returns an Avro Schema from a file path.
+
+    :param str avsc: Path to Avro Schema file
+    :returns: Parsed Schema
+    :rtype: dict
+    """
+    return load_schema(avsc)
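A quick sketch of the fastavro-backed helper. One caveat worth noting: fastavro's parse_schema() expects the schema as a dict rather than a raw JSON string, so a JSON document should be decoded first. The record schema below is illustrative:

```
import json

from confluent_kafka.avro.load import loads

schema_json = '''
{"type": "record", "name": "User",
 "fields": [{"name": "name", "type": "string"}]}
'''

# parse_schema() takes a dict, not a JSON string
user_schema = loads(json.loads(schema_json))
print(user_schema['name'])  # "User"
```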

confluent_kafka/avro/requirements.txt

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+fastavro
+requests
+avro;python_version<"3.0"
+avro-python3;python_version>"3.0"
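The trailing conditions are PEP 508 environment markers, which pip evaluates against the running interpreter so only the matching avro package is installed. A small illustrative check, assuming the third-party packaging library is available (it is not a dependency of this commit):

```
from packaging.markers import Marker

print(Marker('python_version<"3.0"').evaluate())  # True only on Python 2
print(Marker('python_version>"3.0"').evaluate())  # True on Python 3.1+
```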

confluent_kafka/requirements.txt

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+futures;python_version<"3.2"
+enum34;python_version<"3.4"

confluent_kafka/serialization/__init__.py

Lines changed: 65 additions & 0 deletions

@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from confluent_kafka.serialization._serializers import double_deserializer, float_deserializer, \
+    long_deserializer, int_deserializer, short_deserializer, string_deserializer, \
+    double_serializer, float_serializer, long_serializer, int_serializer, short_serializer, string_serializer
+
+__all__ = ['double_deserializer', 'float_deserializer',
+           'long_deserializer', 'int_deserializer', 'short_deserializer', 'string_deserializer',
+           'double_serializer', 'float_serializer',
+           'long_serializer', 'int_serializer', 'short_serializer', 'string_serializer']
+
+
+"""
+Collection of built-in serializers designed to be compatible with org/apache/kafka/common/serialization/Serializer.
+
+**Note** Strings are a bit tricky in Python 2 and require special handling to get right, as they are already stored
+internally as bytes. How those bytes are stored is contingent upon the source file encoding (PEP-263). This puts the
+library at a bit of a disadvantage: it is unable to coerce the interpreter to represent strings in the encoding of its
+choosing, UTF-8 in this case. Keeping this limitation in mind, we recommend that all Python 2 targeted implementations
+set their source encoding to UTF-8.
+
+This can be achieved by adding the following header to the application's source files.
+
+```
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+....
+```
+
+Javadocs for reference.
+
+Serializers:
+
+DoubleSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/DoubleSerializer.html
+FloatSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/FloatSerializer.html
+LongSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/LongSerializer.html
+IntegerSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/IntegerSerializer.html
+ShortSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/ShortSerializer.html
+StringSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/StringSerializer.html
+
+Deserializers:
+
+DoubleDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/DoubleDeserializer.html
+FloatDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/FloatDeserializer.html
+LongDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/LongDeserializer.html
+IntegerDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/IntegerDeserializer.html
+ShortDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/ShortDeserializer.html
+StringDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html
+"""
confluent_kafka/serialization/_serializers.py

Lines changed: 192 additions & 0 deletions

@@ -0,0 +1,192 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import struct
+
+from sys import version_info as interpreter_version
+
+
+def _py3_str_serializer(value):
+    """
+    Converts a Python string to UTF-8 encoded bytes.
+
+    :param str value: string to be encoded
+    :returns: UTF-8 encoded bytes
+    :rtype: bytes
+    """
+    return value.encode('utf-8')
+
+
+def _py3_str_deserializer(data):
+    """
+    Converts UTF-8 encoded bytes to a string.
+
+    :param bytes data: UTF-8 encoded string
+    :returns: decoded string
+    :rtype: str
+    """
+    return str(data, 'utf-8')
+
+
+def _py2_str_serializer(value):
+    """
+    Python 2 strings are synonymous with bytes, so the value is returned as-is.
+
+    Coercing an encoding on non-ascii characters here would raise an exception.
+    Although character encoding can be controlled by setting the source encoding,
+    this has no effect on the behavior of bytes.encode().
+
+    See PEP-263 on setting the encoding.
+
+    :param str value: string to be encoded
+    :returns: encoded string
+    :rtype: bytes
+    """
+    return value
+
+
+def _py2_str_deserializer(data):
+    """
+    Decodes a UTF-8 encoded stream.
+
+    See PEP-263 on setting the encoding.
+
+    :param bytes data: UTF-8 encoded string
+    :returns: decoded string
+    :rtype: str
+    """
+    return data.decode('utf-8')
+
+
+if interpreter_version >= (3, 4):
+    string_serializer = _py3_str_serializer
+    string_deserializer = _py3_str_deserializer
+else:
+    string_serializer = _py2_str_serializer
+    string_deserializer = _py2_str_deserializer
+
+
+def double_serializer(value):
+    """
+    Converts a Python float to C type double bytes.
+
+    :param float value: float to be encoded
+    :returns: C type double bytes
+    :rtype: bytes
+    """
+    return struct.pack('>d', value)
+
+
+def double_deserializer(data):
+    """
+    Converts C type double bytes to a Python float.
+
+    :param bytes data: C type double bytes
+    :returns: decoded float
+    :rtype: float
+    """
+    return struct.unpack('>d', data)[0]
+
+
+def long_serializer(value):
+    """
+    Converts a Python integer to C type long long bytes.
+
+    :param int value: integer to be encoded
+    :returns: C type long long bytes
+    :rtype: bytes
+    """
+    return struct.pack('>q', value)
+
+
+def long_deserializer(data):
+    """
+    Converts C type long long bytes to a Python integer.
+
+    :param bytes data: C type long long bytes
+    :returns: decoded integer
+    :rtype: int
+    """
+    return struct.unpack('>q', data)[0]
+
+
+def float_serializer(value):
+    """
+    Converts a Python float to C type float bytes.
+
+    :param float value: float to be encoded
+    :returns: C type float bytes
+    :rtype: bytes
+    """
+    return struct.pack('>f', value)
+
+
+def float_deserializer(data):
+    """
+    Converts C type float bytes to a Python float.
+
+    :param bytes data: C type float bytes
+    :returns: decoded float
+    :rtype: float
+    """
+    return struct.unpack('>f', data)[0]
+
+
+def int_serializer(value):
+    """
+    Converts a Python integer to C type int bytes.
+
+    :param int value: integer to be encoded
+    :returns: C type int bytes
+    :rtype: bytes
+    """
+    return struct.pack('>i', value)
+
+
+def int_deserializer(data):
+    """
+    Converts C type int bytes to a Python integer.
+
+    :param bytes data: C type int bytes
+    :returns: decoded integer
+    :rtype: int
+    """
+    return struct.unpack('>i', data)[0]
+
+
+def short_serializer(value):
+    """
+    Converts a Python integer to C type short bytes.
+
+    :param int value: integer to be encoded
+    :returns: C type short bytes
+    :rtype: bytes
+    """
+    return struct.pack('>h', value)
+
+
+def short_deserializer(data):
+    """
+    Converts C type short bytes to a Python integer.
+
+    :param bytes data: C type short bytes
+    :returns: decoded integer
+    :rtype: int
+    """
+    return struct.unpack('>h', data)[0]
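As a quick round-trip illustration (the constant is arbitrary): big-endian struct packing yields the same bytes Java's IntegerSerializer writes on the wire, which is what makes these serdes Java compatible:

```
from confluent_kafka.serialization import int_serializer, int_deserializer

payload = int_serializer(1000000)
# 4-byte big-endian encoding, identical to Java's IntegerSerializer output
assert payload == b'\x00\x0fB@'
assert int_deserializer(payload) == 1000000
```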

setup.py

Lines changed: 6 additions & 17 deletions

@@ -5,25 +5,14 @@
 from distutils.core import Extension
 import platform

-INSTALL_REQUIRES = [
-    'futures;python_version<"3.2"',
-    'enum34;python_version<"3.4"',
-    'requests;python_version<"3.2"'
-]
+with open('confluent_kafka/requirements.txt') as f:
+    INSTALL_REQUIRES = f.read().splitlines()

-AVRO_REQUIRES = [
-    'fastavro',
-    'requests',
-    'avro;python_version<"3.0"',
-    'avro-python3;python_version>"3.0"'
-]
+with open('confluent_kafka/avro/requirements.txt') as f:
+    AVRO_REQUIRES = f.read().splitlines()

-TEST_REQUIRES = [
-    'pytest==4.6.4;python_version<"3.0"',
-    'pytest;python_version>="3.0"',
-    'pytest-timeout',
-    'flake8'
-]
+with open('tests/requirements.txt') as f:
+    TEST_REQUIRES = f.read().splitlines()

 # On Un*x the library is linked as -lrdkafka,
 # while on windows we need the full librdkafka name.
