avro consumer with explicitly specified read schema #470
Changes from all commits
@@ -68,10 +68,12 @@ class MessageSerializer(object):
     All decode_* methods expect a buffer received from kafka.
     """

-    def __init__(self, registry_client):
+    def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=None):
         self.registry_client = registry_client
         self.id_to_decoder_func = {}
         self.id_to_writers = {}
+        self.reader_key_schema = reader_key_schema
+        self.reader_value_schema = reader_value_schema

     '''
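With the two new parameters stored on the instance, every decode can resolve the registered writer schema against a consumer-supplied reader schema. A minimal sketch of constructing the serializer by hand (the registry URL and .avsc paths are illustrative; module paths follow this repo's layout):

    from confluent_kafka import avro
    from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

    # Reader schemas the consumer wants records projected onto.
    reader_key_schema = avro.load('UserKey.avsc')
    reader_value_schema = avro.load('User.avsc')

    client = CachedSchemaRegistryClient('http://localhost:8081')
    serializer = MessageSerializer(client,
                                   reader_key_schema=reader_key_schema,
                                   reader_value_schema=reader_value_schema)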
@@ -151,33 +153,38 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
         return outf.getvalue()

     # Decoder support
-    def _get_decoder_func(self, schema_id, payload):
+    def _get_decoder_func(self, schema_id, payload, is_key=False):
         if schema_id in self.id_to_decoder_func:
             return self.id_to_decoder_func[schema_id]

-        # fetch from schema reg
+        # fetch writer schema from schema reg
         try:
-            schema = self.registry_client.get_by_id(schema_id)
+            writer_schema_obj = self.registry_client.get_by_id(schema_id)
         except ClientError as e:
             raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id, str(e)))

-        if schema is None:
+        if writer_schema_obj is None:
             raise SerializerError("unable to fetch schema with id %d" % (schema_id))

         curr_pos = payload.tell()

+        reader_schema_obj = self.reader_key_schema if is_key else self.reader_value_schema
+
         if HAS_FAST:
             # try to use fast avro
             try:
-                schema_dict = schema.to_json()
-                schemaless_reader(payload, schema_dict)
+                writer_schema = writer_schema_obj.to_json()
+                reader_schema = reader_schema_obj.to_json()
[Review thread on the line above]

Reviewer: Sorry I'm a bit late to the party here. @fkaufer, correct me if I'm wrong, but my understanding is that reader_schema_obj can be None here. I also wonder whether the next line should not be passed the reader schema as well.

@rnpridgeon: The next line is a sanity check, as alluded to in the comment below it. It is in place to catch edge cases which fastavro did not cover at the time the serializer was written. It's unclear whether this is still actually an issue, but it only happens upon instantiating the writer. You won't get an NPE as the …

Reviewer: @rnpridgeon, thanks, this is helpful. I had already opened an issue for a related problem: #518. I think both can be addressed at the same time. I actually tried to fix it myself, but couldn't get the tests to run; I would need to spend more time setting up my development environment.
+                schemaless_reader(payload, writer_schema)

                 # If we reach this point, this means we have fastavro and it can
                 # do this deserialization. Rewind since this method just determines
                 # the reader function and we need to deserialize again along the
                 # normal path.
                 payload.seek(curr_pos)

-                self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict)
+                self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(
+                    p, writer_schema, reader_schema)
                 return self.id_to_decoder_func[schema_id]
             except Exception:
                 # Fast avro failed, fall thru to standard avro below.
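The lambda registered above is the entire fast path: fastavro's schemaless_reader takes the writer schema followed by an optional reader schema and performs standard Avro schema resolution between the two. A self-contained sketch of that call, with made-up schemas for illustration:

    import io
    from fastavro import schemaless_writer, schemaless_reader

    # Hypothetical writer schema: what the producer registered.
    writer_schema = {
        "type": "record", "name": "User",
        "fields": [{"name": "name", "type": "string"}],
    }
    # Hypothetical reader schema with an extra defaulted field,
    # which Avro schema resolution fills in on read.
    reader_schema = {
        "type": "record", "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number", "type": ["null", "int"], "default": None},
        ],
    }

    buf = io.BytesIO()
    schemaless_writer(buf, writer_schema, {"name": "alice"})
    buf.seek(0)
    print(schemaless_reader(buf, writer_schema, reader_schema))
    # {'name': 'alice', 'favorite_number': None}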
@@ -186,7 +193,13 @@ def _get_decoder_func(self, schema_id, payload):
         # here means we should just delegate to slow avro
         # rewind
         payload.seek(curr_pos)
-        avro_reader = avro.io.DatumReader(schema)
+        # Avro DatumReader py2/py3 inconsistency, hence no param keywords
+        # should be revisited later
+        # https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459
+        # https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423
+        # def __init__(self, writers_schema=None, readers_schema=None)
+        # def __init__(self, writer_schema=None, reader_schema=None)
+        avro_reader = avro.io.DatumReader(writer_schema_obj, reader_schema_obj)

         def decoder(p):
             bin_decoder = avro.io.BinaryDecoder(p)
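The slow path builds one DatumReader per schema id and reuses it. A standalone sketch of the same positional call; the schemas are illustrative, and the avro.schema.parse spelling assumes the py2 lineage / modern merged package (the old py3 fork spelled it Parse, the same kind of naming drift the comment above flags for the constructor keywords):

    import io
    import avro.io
    import avro.schema

    writer_schema = avro.schema.parse(
        '{"type": "record", "name": "User",'
        ' "fields": [{"name": "name", "type": "string"}]}')
    reader_schema = avro.schema.parse(
        '{"type": "record", "name": "User",'
        ' "fields": [{"name": "name", "type": "string"},'
        ' {"name": "favorite_color", "type": ["null", "string"], "default": null}]}')

    # Positional args sidestep the writers_schema/writer_schema keyword mismatch.
    avro_reader = avro.io.DatumReader(writer_schema, reader_schema)

    buf = io.BytesIO()
    avro.io.DatumWriter(writer_schema).write({"name": "alice"}, avro.io.BinaryEncoder(buf))
    buf.seek(0)
    print(avro_reader.read(avro.io.BinaryDecoder(buf)))
    # {'name': 'alice', 'favorite_color': None}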
@@ -195,7 +208,7 @@ def decoder(p):
         self.id_to_decoder_func[schema_id] = decoder
         return self.id_to_decoder_func[schema_id]

-    def decode_message(self, message):
+    def decode_message(self, message, is_key=False):
         """
         Decode a message from kafka that has been encoded for use with
         the schema registry.
@@ -212,5 +225,5 @@ def decode_message(self, message):
         magic, schema_id = struct.unpack('>bI', payload.read(5))
         if magic != MAGIC_BYTE:
             raise SerializerError("message does not start with magic byte")
-        decoder_func = self._get_decoder_func(schema_id, payload)
+        decoder_func = self._get_decoder_func(schema_id, payload, is_key)
         return decoder_func(payload)
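decode_message itself only strips the Confluent wire framing before delegating: one zero magic byte, a big-endian 4-byte schema id (hence the '>bI' struct format), then the Avro-encoded body. A small sketch of just that framing step:

    import io
    import struct

    MAGIC_BYTE = 0

    def split_wire_format(message):
        """Split a Confluent-framed Kafka message into (schema_id, avro_payload)."""
        payload = io.BytesIO(message)
        magic, schema_id = struct.unpack('>bI', payload.read(5))
        if magic != MAGIC_BYTE:
            raise ValueError("message does not start with magic byte")
        return schema_id, payload  # payload is now positioned at the Avro body

    # e.g. a message framing schema id 42 around the Avro string "alice":
    schema_id, body = split_wire_format(struct.pack('>bI', 0, 42) + b'\x0aalice')
    print(schema_id)  # 42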
@@ -0,0 +1,8 @@
+{
+    "type": "record",
+    "name": "UserKey",
+    "aliases": ["User"],
+    "fields": [
+        {"name": "name", "type": "string"}
+    ]
+}
@@ -0,0 +1,10 @@
+{
+    "type": "record",
+    "name": "User",
+    "aliases": ["UserKey"],
+    "fields": [
+        {"name": "name", "type": "string"},
+        {"name": "favorite_number", "type": ["null", "int"], "default": null},
+        {"name": "favorite_color", "type": ["null", "string"], "default": null}
+    ]
+}
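The two test schemas above name each other in their aliases arrays, so a record written with the full User schema can be re-read through the narrower UserKey schema, and vice versa with the missing fields filled from defaults. A sketch of that projection, assuming a fastavro version that honors record aliases during schema resolution:

    import io
    from fastavro import schemaless_writer, schemaless_reader

    user = {
        "type": "record", "name": "User", "aliases": ["UserKey"],
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number", "type": ["null", "int"], "default": None},
            {"name": "favorite_color", "type": ["null", "string"], "default": None},
        ],
    }
    user_key = {
        "type": "record", "name": "UserKey", "aliases": ["User"],
        "fields": [{"name": "name", "type": "string"}],
    }

    buf = io.BytesIO()
    schemaless_writer(buf, user, {"name": "alice", "favorite_number": 7,
                                  "favorite_color": None})
    buf.seek(0)

    # Project the full record onto the key schema: the alias lets the record
    # names match, and writer-only fields are simply skipped on read.
    print(schemaless_reader(buf, user, user_key))  # {'name': 'alice'}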
[Review thread on the reader schema parameter names]

Reviewer: The word "reader" actually describes the schema rather than the message fields. I suggest instead naming these parameters key_reader_schema and value_reader_schema.

@fkaufer: I had it that way at first (or key_schema_reader, treating reader as a variable suffix), but this would be inconsistent with default_key_schema and default_value_schema in the constructor; see confluent_kafka/avro/__init__.py, lines 29 to 30 at 443177e.

Furthermore, I consider key_schema/value_schema compounds naming an attribute/member of the object/namespace reader. This is also more consistent with fastavro (writer_schema, reader_schema):

https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader

and with the REST Proxy (key_schema and value_schema):

https://docs.confluent.io/current/kafka-rest/docs/api.html#post--topics-(string-topic_name)
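For completeness, a hedged sketch of how the chosen names read at the consumer level, mirroring what this PR exposes on AvroConsumer (broker, group, and topic settings are illustrative):

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroConsumer

    # Kwargs follow the constructor-style compounds discussed above
    # (reader_key_schema, not key_reader_schema).
    consumer = AvroConsumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'user-readers',
        'schema.registry.url': 'http://localhost:8081',
    }, reader_key_schema=avro.load('UserKey.avsc'),
       reader_value_schema=avro.load('User.avsc'))

    consumer.subscribe(['users'])
    msg = consumer.poll(1.0)
    if msg is not None and not msg.error():
        print(msg.key(), msg.value())  # decoded against the reader schemas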