-
Notifications
You must be signed in to change notification settings - Fork 917
Avro consumer return schema #453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d4e6118
6b6dca8
c0cb293
c874c1c
2481ead
c339e88
68377a0
d135353
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,3 +20,6 @@ wheelhouse | |
dl-* | ||
*.whl | ||
.pytest_cache | ||
.idea/ | ||
.venv/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,42 @@ def __exit__(self, *args): | |
return False | ||
|
||
|
||
class HasSchemaMixin: | ||
""" | ||
A mixing for decoded Avro record to make able to add schema attribute | ||
""" | ||
def schema(self): | ||
""" | ||
:return: Avro schema for used to decode this entity | ||
:rtype: avro.schema.Schema | ||
""" | ||
return self._schema | ||
|
||
|
||
def _wrap(value, schema): | ||
""" | ||
Wraps a value into subclass with HasSchemaMixin | ||
:param value: a decoded value | ||
:param schema: corresponding Avro schema used to decode value | ||
:return: An instance of a dynamically created class with schema fullname | ||
""" | ||
if hasattr(schema, 'fullname'): | ||
name = schema.fullname | ||
elif hasattr(schema, 'namespace'): | ||
name = "{namespace}.{name}".format(namespace=schema.namespace, | ||
name=schema.name) | ||
elif hasattr(schema, 'name'): | ||
name = schema.name | ||
else: | ||
name = schema.type | ||
|
||
new_class = type(str(name), (value.__class__, HasSchemaMixin), {}) | ||
|
||
wrapped = new_class(value) | ||
wrapped._schema = schema | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the unlikely, but possible, event that existing user's class has a ._schema field, this will silently overwrite it, and is as such a breaking change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, but I do not have a better solution. An option (though a breaking change) would be returning a tuple |
||
return wrapped | ||
|
||
|
||
class MessageSerializer(object): | ||
""" | ||
A helper class that can serialize and deserialize messages | ||
|
@@ -213,4 +249,7 @@ def decode_message(self, message): | |
if magic != MAGIC_BYTE: | ||
raise SerializerError("message does not start with magic byte") | ||
decoder_func = self._get_decoder_func(schema_id, payload) | ||
return decoder_func(payload) | ||
return _wrap( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @saabeilin Wouldn't it be better that the |
||
decoder_func(payload), | ||
self.registry_client.get_by_id(schema_id) | ||
) |
Uh oh!
There was an error while loading. Please reload this page.