avro consumer with explicitly specified read schema#470
Conversation
|
Confluent Inc. (@confluentinc) It looks like Frank Kaufer (@fkaufer) just signed our Contributor License Agreement. 👍 Always at your service, clabot |
|
Looks like they dropped the https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459 |
Wow, that's ugly, even more as they didn't adopt the py2 interfaces accordingly https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423 But it seems the change in (slow) Avro py3 is not new, so I am wondering why it's now falling back to slow-avro instead of using fastavro. |
|
Ryan P (@rnpridgeon) seems we are good to go. Can you approve and merge? |
|
Ryan P (@rnpridgeon) anything missing? |
Ryan P (rnpridgeon)
left a comment
There was a problem hiding this comment.
I've suggested a few changes to parameter naming but other than that I'm happy with it.
| :param optional a reader schema for the message value | ||
| """ | ||
| def __init__(self, config, schema_registry=None): | ||
| def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None): |
There was a problem hiding this comment.
| def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None): | |
| def __init__(self, config, schema_registry=None, key_reader_schema=None, value_reader_schema=None): |
There was a problem hiding this comment.
The word reader actually describes the schema rather than the message fields. I suggest instead naming these parameters key_reader_schema and value_reader_schema.
There was a problem hiding this comment.
I had it first that way (or key_schema_reader considering reader as var index), but this would be inconsistent with default_key_schema and default_value_schema in the constructor:
See
confluent-kafka-python/confluent_kafka/avro/__init__.py
Lines 29 to 30 in 443177e
Furthermore I consider key_schema/value_schema compounds naming an attribute/member of the object/namespace reader
This is also more consistent with fastavro (writer_schema, reader_schema)
https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader
and the REST Proxy (key_schema and value_schema)
https://docs.confluent.io/current/kafka-rest/docs/api.html#post--topics-(string-topic_name)
|
Ryan P (@rnpridgeon) ad-hoc can't see why the build is failing now except that the ldd check fails now https://travis-ci.org/confluentinc/confluent-kafka-python/jobs/446822558#L2906-L2920) rebase (86a6160) didn't help |
|
Ryan P (@rnpridgeon) I have closed and reopened the PR to retrigger the CI now that the dependencies seem to be fixed. With success. So, can you merge? |
|
Thanks Frank Kaufer (@fkaufer), I think I'm happy with it the way it is now so I have asked Magnus Edenhill (@edenhill) to give it a quick secondary review. |
|
Magnus Edenhill (@edenhill) Ryan P (@rnpridgeon) gents, many thanks for your reviews, I made the changes as requested, all checks have passed. |
Magnus Edenhill (edenhill)
left a comment
There was a problem hiding this comment.
LGTM.
Ryan, you have the final word on this.
|
Ack, thank you for the secondary review Magnus Edenhill (@edenhill) |
|
Thanks Frank Kaufer (@fkaufer) for your contribution! |
| schema_dict = schema.to_json() | ||
| schemaless_reader(payload, schema_dict) | ||
| writer_schema = writer_schema_obj.to_json() | ||
| reader_schema = reader_schema_obj.to_json() |
There was a problem hiding this comment.
Sorry I'm a bit late to the party here.
Frank Kaufer (@fkaufer) , correct me if I'm wrong, but my understanding is that self.reader_value_schema can be null, and so reader_schema_obj could be null as well. Wouldn't that line cause an NPE?
Also I wonder if the next line should not be schemaless_reader(payload, writer_schema, reader_schema) to be consistent with the code executed by the lambda this function returns.
There was a problem hiding this comment.
The next line is a sanity check as alluded to in the comment below it. This is in place to catch edges which fastavro did not covet at the time the serializer was written. It's unclear if this is still actually an issue or not but it only happens upon instantiating the writer.
You won't get an NPE as the None type is an actual null but you will get an attribute error which would cause the serializer to fall back to slow avro as we call it. If you have already done so please feel free to open an issue which I will address tomorrow.
There was a problem hiding this comment.
Ryan P (@rnpridgeon) , thanks this is helpful. I had already opened an issue for a related issue: #518 I think both can be addressed at the same time.
I actually tried to fix it my self, but couldn't get the test to run. I would need to spend more time trying to set up my development environment.
Allow to configure reader schema explicitly in Avro Consumer to take advantage of Avro's back-/forward compatibility features by separation of writer and reader schema. Before the change, the reader schema was (implicitly) hard-coded to the writer schema (of the respective message), now the consumer can pin a reader schema with the fastavro deserializer doing back or forward schema migration to the reader schema.