avro consumer with explicitly specified read schema #470
Conversation
@confluentinc It looks like @fkaufer just signed our Contributor License Agreement. 👍 Always at your service, clabot
Looks like they dropped the https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459
Wow, that's ugly, even more so as they didn't adapt the py2 interfaces accordingly: https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423 But it seems the change in (slow) Avro py3 is not new, so I am wondering why it's now falling back to slow-avro instead of using fastavro.
@rnpridgeon seems we are good to go. Can you approve and merge?
@rnpridgeon anything missing?
I've suggested a few changes to parameter naming but other than that I'm happy with it.
    """
-   def __init__(self, config, schema_registry=None):
+   def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
Suggested change:
-   def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
+   def __init__(self, config, schema_registry=None, key_reader_schema=None, value_reader_schema=None):
The word reader actually describes the schema rather than the message fields. I suggest instead naming these parameters key_reader_schema and value_reader_schema.
I had it first that way (or key_schema_reader, considering reader as a var index), but this would be inconsistent with default_key_schema and default_value_schema in the constructor. See confluent-kafka-python/confluent_kafka/avro/__init__.py, lines 29 to 30 at 443177e:

    def __init__(self, config, default_key_schema=None,
                 default_value_schema=None, schema_registry=None):

Furthermore, I read key_schema/value_schema as compounds naming an attribute/member of the object/namespace reader. This is also more consistent with fastavro (writer_schema, reader_schema):
https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader
and with the REST Proxy (key_schema and value_schema):
https://docs.confluent.io/current/kafka-rest/docs/api.html#post--topics-(string-topic_name)
@rnpridgeon offhand I can't see why the build is failing now, except that the ldd check fails: https://travis-ci.org/confluentinc/confluent-kafka-python/jobs/446822558#L2906-L2920. A rebase (86a6160) didn't help.
@rnpridgeon I have closed and reopened the PR to retrigger the CI now that the dependencies seem to be fixed. With success. So, can you merge?
@edenhill @rnpridgeon gents, many thanks for your reviews. I made the changes as requested, and all checks have passed.
LGTM.
Ryan, you have the final word on this.
Ack, thank you for the secondary review @edenhill
Thanks @fkaufer for your contribution!
-   schema_dict = schema.to_json()
-   schemaless_reader(payload, schema_dict)
+   writer_schema = writer_schema_obj.to_json()
+   reader_schema = reader_schema_obj.to_json()
Sorry I'm a bit late to the party here.
@fkaufer, correct me if I'm wrong, but my understanding is that self.reader_value_schema can be null, and so reader_schema_obj could be null as well. Wouldn't that line cause an NPE?
Also, I wonder if the next line should not be schemaless_reader(payload, writer_schema, reader_schema) to be consistent with the code executed by the lambda this function returns.
The next line is a sanity check, as alluded to in the comment below it. It is in place to catch edge cases which fastavro did not cover at the time the serializer was written. It's unclear whether this is still actually an issue, but it only happens upon instantiating the writer.
You won't get an NPE, as the None type is an actual null, but you will get an AttributeError, which would cause the serializer to fall back to "slow avro", as we call it. If you haven't already done so, please feel free to open an issue, which I will address tomorrow.
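The None-versus-NPE point above can be reproduced in isolation (a hypothetical minimal sketch, not code from the PR): calling a method such as to_json() on a missing (None) schema object raises AttributeError, Python's analogue of a null-pointer error, which is what the fallback path catches.

```python
# Hypothetical stand-in for an unconfigured reader schema.
reader_schema_obj = None

try:
    # Method access on None raises AttributeError, not a segfault/NPE.
    reader_schema_obj.to_json()
    err = None
except AttributeError as e:
    err = e
```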
@rnpridgeon, thanks, this is helpful. I had already opened an issue for a related problem: #518 I think both can be addressed at the same time.
I actually tried to fix it myself, but couldn't get the tests to run; I would need to spend more time setting up my development environment.
Allow configuring the reader schema explicitly in the Avro consumer, to take advantage of Avro's backward/forward compatibility features through the separation of writer and reader schemas. Before this change, the reader schema was implicitly hard-coded to the writer schema of the respective message; now the consumer can pin a reader schema, with the fastavro deserializer performing backward or forward schema migration to that reader schema.