Avro consumer return schema #453

Closed
3 changes: 3 additions & 0 deletions .gitignore
@@ -20,3 +20,6 @@ wheelhouse
 dl-*
 *.whl
 .pytest_cache
+.idea/
+.venv/

41 changes: 40 additions & 1 deletion confluent_kafka/avro/serializer/message_serializer.py
@@ -59,6 +59,42 @@ def __exit__(self, *args):
         return False


+class HasSchemaMixin:
+    """
+    A mixin that adds a schema attribute to a decoded Avro record
+    """
+    def schema(self):
+        """
+        :return: Avro schema used to decode this entity
+        :rtype: avro.schema.Schema
+        """
+        return self._schema
+
+
+def _wrap(value, schema):
+    """
+    Wraps a value into a subclass with HasSchemaMixin
+    :param value: a decoded value
+    :param schema: corresponding Avro schema used to decode value
+    :return: An instance of a dynamically created class with schema fullname
+    """
+    if hasattr(schema, 'fullname'):
+        name = schema.fullname
+    elif hasattr(schema, 'namespace'):
+        name = "{namespace}.{name}".format(namespace=schema.namespace,
+                                           name=schema.name)
+    elif hasattr(schema, 'name'):
+        name = schema.name
+    else:
+        name = schema.type
+
+    new_class = type(str(name), (value.__class__, HasSchemaMixin), {})
+
+    wrapped = new_class(value)
+    wrapped._schema = schema
Contributor

In the unlikely but possible event that an existing user's class already has a ._schema field, this will silently overwrite it, which makes this a breaking change.

Author

I agree, but I do not have a better solution. One option (though also a breaking change) would be to return a tuple (value, schema) instead. Does that make sense?
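
For reference, the tuple-style alternative could look roughly like the sketch below. It is only an illustration: `DecodedMessage` and `decode_with_schema` are hypothetical names that do not exist in confluent_kafka, and a plain dict stands in for the Avro schema object. Because the decoded value is returned untouched, an existing `_schema` attribute on a user's class is never overwritten.

```python
from collections import namedtuple

# Hypothetical container for illustration; not part of confluent_kafka.
DecodedMessage = namedtuple("DecodedMessage", ["value", "schema"])


def decode_with_schema(decoder_func, payload, schema):
    """Decode the payload and return it next to its schema, leaving the value untouched."""
    return DecodedMessage(value=decoder_func(payload), schema=schema)


class UserRecord(dict):
    """Stand-in for a user class that already owns a `_schema` attribute."""
    _schema = "user-defined"


# A plain dict stands in for an avro.schema.Schema instance.
fake_schema = {"type": "record", "name": "basic"}

# The lambda plays the role of the decoder function; it ignores the payload.
result = decode_with_schema(lambda payload: UserRecord(name="abc"), b"ignored", fake_schema)

# The result unpacks like a plain (value, schema) tuple.
value, schema = result
```

The cost is the one already noted: callers that expect the bare decoded value would have to unpack the result, so this is a breaking change as well.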

+    return wrapped
+
+
 class MessageSerializer(object):
     """
     A helper class that can serialize and deserialize messages
@@ -213,4 +249,7 @@ def decode_message(self, message):
             if magic != MAGIC_BYTE:
                 raise SerializerError("message does not start with magic byte")
             decoder_func = self._get_decoder_func(schema_id, payload)
-            return decoder_func(payload)
+            return _wrap(
Contributor

@soxofaan, Oct 26, 2018

@saabeilin Wouldn't it be better to move the _wrap call inside the decoder closure built by _get_decoder_func?
Then you don't need to call self.registry_client.get_by_id(schema_id) for every record, because the schema is already available when the decoder function is constructed inside _get_decoder_func.
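
A rough, self-contained sketch of that idea follows. It assumes the decoder function is built once per schema id and then reused. `CountingRegistry` and `make_decoder` are hypothetical stand-ins rather than library code, `_wrap` is a simplified copy of the helper from this PR, and a plain dict stands in for the Avro schema.

```python
class HasSchemaMixin(object):
    def schema(self):
        return self._schema


def _wrap(value, schema):
    # Simplified copy of the helper in this PR; the schema is a plain dict here.
    new_class = type(str(schema["name"]), (value.__class__, HasSchemaMixin), {})
    wrapped = new_class(value)
    wrapped._schema = schema
    return wrapped


class CountingRegistry(object):
    """Hypothetical stand-in for the schema registry client; counts lookups."""

    def __init__(self, schemas):
        self._schemas = schemas
        self.lookups = 0

    def get_by_id(self, schema_id):
        self.lookups += 1
        return self._schemas[schema_id]


def make_decoder(registry, schema_id, raw_decode):
    """Build a decoder closure that wraps every decoded value with its schema."""
    schema = registry.get_by_id(schema_id)  # looked up once, at construction time

    def decoder(payload):
        # The closure captures `schema`, so no registry call happens per record.
        return _wrap(raw_decode(payload), schema)

    return decoder


registry = CountingRegistry({1: {"type": "record", "name": "basic"}})
decoder = make_decoder(registry, 1, lambda payload: {"size": len(payload)})
records = [decoder(b"abc"), decoder(b"de")]
```

After decoding two records the stand-in registry has been queried only once, which is the saving described above.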

+                decoder_func(payload),
+                self.registry_client.get_by_id(schema_id)
+            )
18 changes: 15 additions & 3 deletions tests/avro/test_message_serializer.py
@@ -21,13 +21,14 @@
 #

 import struct
-
 import unittest

+from confluent_kafka import avro
+from confluent_kafka.avro.serializer.message_serializer import (
+    MessageSerializer, _wrap, HasSchemaMixin
+)
 from tests.avro import data_gen
-from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
 from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient
-from confluent_kafka import avro


 class TestMessageSerializer(unittest.TestCase):
@@ -82,3 +83,14 @@ def test_decode_none(self):

     def hash_func(self):
         return hash(str(self))
+
+    def test_schema_mixin_wrapper(self):
+        schema = avro.loads(data_gen.BASIC_SCHEMA)
+        for base_class in (int, float, dict, list):
+            val = base_class()
+            wrapped = _wrap(val, schema)
+            assert val == wrapped
+            assert isinstance(wrapped, base_class)
+            assert isinstance(wrapped, HasSchemaMixin)
+            assert wrapped.schema() is schema
+            assert wrapped.__class__.__name__ == 'python.test.basic.basic'
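
A note on coverage, offered as a hedged observation rather than a confirmed defect: the loop above exercises int, float, dict and list. Avro also has null and boolean types, and Python does not allow subclassing `bool` or `NoneType`, so the dynamic `type(...)` call in a `_wrap`-style helper raises TypeError for such values. The sketch below reproduces this with a simplified, self-contained copy of the helper; the dict schema and `wrap_or_error` are stand-ins for illustration, not library code.

```python
class HasSchemaMixin(object):
    def schema(self):
        return self._schema


def _wrap(value, schema):
    # Simplified copy of the helper in this PR; the schema is a plain dict here.
    new_class = type(str(schema["name"]), (value.__class__, HasSchemaMixin), {})
    wrapped = new_class(value)
    wrapped._schema = schema
    return wrapped


def wrap_or_error(value, schema):
    """Return the wrapped value, or the TypeError raised while wrapping it."""
    try:
        return _wrap(value, schema)
    except TypeError as exc:
        return exc


schema = {"type": "record", "name": "basic"}

# Keys are the repr of each sample value, e.g. "True" or "'text'".
samples = (1, 1.5, "text", [1], {"a": 1}, True, None)
outcomes = {repr(v): wrap_or_error(v, schema) for v in samples}
```

Whether decode_message can actually return a bare boolean or null depends on the schemas in use, so a test with such a schema would settle it.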
7 changes: 6 additions & 1 deletion tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist = flake8,py27,py34,py35,py36
+envlist = flake8,py27,py34,py35,py36,py37

 [testenv]
 setenv =
@@ -43,6 +43,11 @@ deps =
     {[base]deps}
     avro-python3

+[testenv:py37]
+deps =
+    {[base]deps}
+    avro-python3
+
 [testenv:flake8]
 deps = flake8
 commands = flake8