Java compatible serdes #768

Closed · wants to merge 1 commit
2 changes: 1 addition & 1 deletion .appveyor.yml
@@ -2,7 +2,7 @@ environment:
   global:
     LIBRDKAFKA_NUGET_VERSION: 1.3.0
     CIBW_SKIP: cp33-* cp34-*
-    CIBW_TEST_REQUIRES: pytest requests avro
+    CIBW_TEST_REQUIRES: pytest requests avro trivup
     # SDK v7.0 MSVC Express 2008's SetEnv.cmd script will fail if the
     # /E:ON and /V:ON options are not enabled in the batch script intepreter
     # See: http://stackoverflow.com/a/13751649/163740
4 changes: 2 additions & 2 deletions .travis.yml
@@ -53,13 +53,13 @@ install:
  - tools/install-interceptors.sh
  - pip install -U pip && pip install virtualenv
  - if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
- - if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 ; fi
+ - if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -r confluent_kafka/requirements.txt -r tests/requirements.txt -r confluent_kafka/avro/requirements.txt; fi
  - if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
  - if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel --force-reinstall; fi
  - if [[ ! -z $EXTRA_PKGS ]]; then pip install $(echo $EXTRA_PKGS) ; fi

 script:
- - if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro] ; fi
+ - if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[dev] ; fi
  - if [[ -z $CIBW_BEFORE_BUILD ]]; then flake8 ; fi
  # Make plugins available for tests
  - ldd staging/libs/* || otool -L staging/libs/* || true
3 changes: 1 addition & 2 deletions confluent_kafka/avro/__init__.py
@@ -5,7 +5,6 @@

 from confluent_kafka import Producer, Consumer
 from confluent_kafka.avro.error import ClientError
-from confluent_kafka.avro.load import load, loads  # noqa
 from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
 from confluent_kafka.avro.serializer import (SerializerError,  # noqa
                                              KeySerializerError,
@@ -50,7 +49,7 @@ def __init__(self, config, default_key_schema=None,
         self._key_schema = default_key_schema
         self._value_schema = default_value_schema

-    def produce(self, **kwargs):
+    def produce(self, topic, value, **kwargs):
         """
         Asynchronously sends message to Kafka by encoding with specified or default avro schema.

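For context, a minimal sketch of how the new `produce(topic, value, **kwargs)` signature would be called. The schema file, topic name, broker and Schema Registry addresses are placeholders, not part of this PR:

```
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# 'user.avsc' is a hypothetical record schema file.
value_schema = avro.load('user.avsc')

producer = AvroProducer({'bootstrap.servers': 'localhost:9092',           # assumed broker
                         'schema.registry.url': 'http://localhost:8081'},  # assumed registry
                        default_value_schema=value_schema)

# topic and value are now explicit positional parameters instead of kwargs-only.
producer.produce('example-topic', {'name': 'Jane'})
producer.flush()
```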
48 changes: 18 additions & 30 deletions confluent_kafka/avro/load.py
@@ -15,40 +15,28 @@
 # limitations under the License.
 #

-import sys
+from fastavro.schema import parse_schema, load_schema

 from confluent_kafka.avro.error import ClientError


-def loads(schema_str):
-    """ Parse a schema given a schema string """
-    try:
-        if sys.version_info[0] < 3:
-            return schema.parse(schema_str)
-        else:
-            return schema.Parse(schema_str)
-    except schema.SchemaParseException as e:
-        raise ClientError("Schema parse failed: %s" % (str(e)))
+def loads(schema):
+    """
+    Returns an Avro Schema from a string
+
+    :param str schema: Schema string to be parsed
+    :returns: Parsed Avro Schema
+    :rtype: Dict
+    """
+    return parse_schema(schema)


-def load(fp):
-    """ Parse a schema from a file path """
-    with open(fp) as f:
-        return loads(f.read())
-
-
-# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable. Hence defining them explicitly as
-# a quick fix
-def _hash_func(self):
-    return hash(str(self))
-
-
-try:
-    from avro import schema
-
-    schema.RecordSchema.__hash__ = _hash_func
-    schema.PrimitiveSchema.__hash__ = _hash_func
-    schema.UnionSchema.__hash__ = _hash_func
-
-except ImportError:
-    schema = None
+def load(avsc):
+    """
+    Returns an Avro Schema from a file path.
+
+    :param str avsc: Path to Avro Schema file
+    :returns: Parsed Schema
+    :rtype: dict
+    """
+    return load_schema(fp)
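A short sketch of what the fastavro-backed helpers delegate to. The file name is a placeholder; note that fastavro's `parse_schema` expects an already-decoded schema definition (a dict) rather than a JSON string:

```
import json

from fastavro.schema import load_schema, parse_schema

# load_schema reads and validates an .avsc file ('user.avsc' is hypothetical).
value_schema = load_schema('user.avsc')

# parse_schema validates a schema definition that has already been decoded from JSON.
key_schema = parse_schema(json.loads('{"type": "string"}'))
```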
4 changes: 4 additions & 0 deletions confluent_kafka/avro/requirements.txt
@@ -0,0 +1,4 @@
fastavro
requests
avro;python_version<"3.0"
avro-python3;python_version>"3.0"
2 changes: 2 additions & 0 deletions confluent_kafka/requirements.txt
@@ -0,0 +1,2 @@
futures;python_version<"3.2"
enum34;python_version<"3.4"
65 changes: 65 additions & 0 deletions confluent_kafka/serialization/__init__.py
@@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from confluent_kafka.serialization._serializers import double_deserializer, float_deserializer, \
    long_deserializer, int_deserializer, short_deserializer, string_deserializer, \
    double_serializer, float_serializer, long_serializer, int_serializer, short_serializer, string_serializer

__all__ = ['double_deserializer', 'float_deserializer',
           'long_deserializer', 'int_deserializer', 'short_deserializer', 'string_deserializer',
           'double_serializer', 'float_serializer',
           'long_serializer', 'int_serializer', 'short_serializer', 'string_serializer']


"""
Collection of built-in serializers designed to be compatible with org/apache/kafka/common/serialization/Serializer.

**Note** Strings are tricky in Python 2 and require special handling because they are already stored internally as
bytes. How those bytes are encoded depends on the source file encoding (PEP-263). This puts the library at a
disadvantage: it cannot coerce the interpreter to represent strings in the encoding of its choosing, UTF-8 in this
case. With this limitation in mind, we recommend that all applications targeting Python 2 set their source encoding
to `UTF-8`.

This can be achieved by adding the following header to the application's source files.

```
#!/usr/bin/env python
# -*- coding: utf-8 -*-
....
```

JavaDocs for reference.
Serializers:

DoubleSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/DoubleSerializer.html
FloatSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/FloatSerializer.html
LongSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/LongSerializer.html
IntegerSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/IntegerSerializer.html
ShortSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/ShortSerializer.html
StringSerializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/StringSerializer.html

Deserializers:

DoubleDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/DoubleDeserializer.html
FloatDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/FloatDeserializer.html
LongDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/LongDeserializer.html
IntegerDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/IntegerDeserializer.html
ShortDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/ShortDeserializer.html
StringDeserializer: https://kafka.apache.org/20/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html
"""
192 changes: 192 additions & 0 deletions confluent_kafka/serialization/_serializers.py
@@ -0,0 +1,192 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import struct

from sys import version_info as interpreter_version


def _py3_str_serializer(value):
    """
    Converts a Python string to UTF-8 encoded bytes.

    :param str value: string to be encoded
    :returns: UTF-8 encoded byte array
    :rtype: bytes
    """
    return value.encode('utf-8')


def _py3_str_deserializer(data):
    """
    Converts UTF-8 encoded bytes to a string.

    :param bytes data: UTF-8 encoded string
    :returns: decoded string
    :rtype: str
    """
    return str(data, 'utf-8')


def _py2_str_serializer(value):
    """
    Python 2 strings are synonymous with bytes.

    Attempting to re-encode a byte string that contains non-ASCII characters raises an exception.
    Although the source file encoding can be declared explicitly, this has no effect on the
    behavior of bytes.encode().

    See PEP-263 on setting the encoding.

    :param str value: string to be encoded
    :returns: encoded string
    :rtype: bytes
    """
    return value


def _py2_str_deserializer(data):
    """
    Decodes a UTF-8 encoded stream.

    See PEP-263 on setting the encoding.

    :param bytes data: UTF-8 encoded string
    :returns: decoded string
    :rtype: str
    """
    return data.decode('utf-8')


if interpreter_version >= (3, 4):
    string_serializer = _py3_str_serializer
    string_deserializer = _py3_str_deserializer
else:
    string_serializer = _py2_str_serializer
    string_deserializer = _py2_str_deserializer


def double_serializer(value):
    """
    Convert a Python float to C type double bytes.

    :param float value: float to be encoded
    :returns: C type double bytes
    :rtype: bytes
    """
    return struct.pack('>d', value)


def double_deserializer(data):
    """
    Convert C type double bytes to a Python float.

    :param bytes data: C type double bytes
    :returns: decoded float
    :rtype: float
    """
    return struct.unpack('>d', data)[0]


def long_serializer(value):
    """
    Convert a Python integer to C type long long bytes.

    :param int value: integer to be encoded
    :returns: C type long long bytes
    :rtype: bytes
    """
    return struct.pack('>q', value)


def long_deserializer(data):
    """
    Convert C type long long bytes to a Python integer.

    :param bytes data: C type long long bytes
    :returns: decoded integer
    :rtype: int
    """
    return struct.unpack('>q', data)[0]


def float_serializer(value):
    """
    Convert a Python float to C type float bytes.

    :param float value: float to be encoded
    :returns: C type float bytes
    :rtype: bytes
    """
    return struct.pack('>f', value)


def float_deserializer(data):
    """
    Convert C type float bytes to a Python float.

    :param bytes data: C type float bytes
    :returns: decoded float
    :rtype: float
    """
    return struct.unpack('>f', data)[0]


def int_serializer(value):
    """
    Convert a Python integer to C type int bytes.

    :param int value: integer to be encoded
    :returns: C type int bytes
    :rtype: bytes
    """
    return struct.pack('>i', value)


def int_deserializer(data):
    """
    Convert C type int bytes to a Python integer.

    :param bytes data: C type int bytes
    :returns: decoded integer
    :rtype: int
    """
    return struct.unpack('>i', data)[0]


def short_serializer(value):
    """
    Convert a Python integer to C type short bytes.

    :param int value: integer to be encoded
    :returns: C type short bytes
    :rtype: bytes
    """
    return struct.pack(">h", value)


def short_deserializer(data):
    """
    Convert C type short bytes to a Python integer.

    :param bytes data: C type short bytes
    :returns: decoded integer
    :rtype: int
    """
    return struct.unpack(">h", data)[0]
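Because the serializers are plain callables, they can be wired into the existing `Producer` without any new configuration; a sketch, with the topic name and broker address as assumptions:

```
from confluent_kafka import Producer
from confluent_kafka.serialization import long_serializer, string_serializer

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # assumed broker address

# Encode the key and value up front, then hand the raw bytes to produce().
producer.produce('example-topic',
                 key=string_serializer('user-123'),
                 value=long_serializer(1234567890))
producer.flush()
```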
23 changes: 6 additions & 17 deletions setup.py
@@ -5,25 +5,14 @@
 from distutils.core import Extension
 import platform

-INSTALL_REQUIRES = [
-    'futures;python_version<"3.2"',
-    'enum34;python_version<"3.4"',
-    'requests;python_version<"3.2"'
-]
+with open('confluent_kafka/requirements.txt') as f:
+    INSTALL_REQUIRES = f.read().split()

-AVRO_REQUIRES = [
-    'fastavro',
-    'requests',
-    'avro;python_version<"3.0"',
-    'avro-python3;python_version>"3.0"'
-]
+with open('confluent_kafka/avro/requirements.txt') as f:
+    AVRO_REQUIRES = f.read().splitlines()

-TEST_REQUIRES = [
-    'pytest==4.6.4;python_version<"3.0"',
-    'pytest;python_version>="3.0"',
-    'pytest-timeout',
-    'flake8'
-]
+with open('tests/requirements.txt') as f:
+    TEST_REQUIRES = f.read().splitlines()

 # On Un*x the library is linked as -lrdkafka,
 # while on windows we need the full librdkafka name.