Skip to content

avro consumer with explicitly specified read schema#470

Merged
Ryan P (rnpridgeon) merged 18 commits into
confluentinc:masterfrom
fkaufer:master
Nov 16, 2018
Merged

avro consumer with explicitly specified read schema#470
Ryan P (rnpridgeon) merged 18 commits into
confluentinc:masterfrom
fkaufer:master

Conversation

@fkaufer
Copy link
Copy Markdown
Contributor

Allow to configure reader schema explicitly in Avro Consumer to take advantage of Avro's back-/forward compatibility features by separation of writer and reader schema. Before the change, the reader schema was (implicitly) hard-coded to the writer schema (of the respective message), now the consumer can pin a reader schema with the fastavro deserializer doing back or forward schema migration to the reader schema.

@ghost
Copy link
Copy Markdown

Confluent Inc. (@confluentinc) It looks like Frank Kaufer (@fkaufer) just signed our Contributor License Agreement. 👍

Always at your service,

clabot

This was referenced Oct 12, 2018
Comment thread confluent_kafka/avro/serializer/message_serializer.py Outdated
Comment thread confluent_kafka/avro/serializer/message_serializer.py Outdated
Comment thread confluent_kafka/avro/__init__.py Outdated
Comment thread tests/avro/read_test_schema.avsc
@rnpridgeon
Copy link
Copy Markdown
Contributor

Looks like they dropped the s in the py3 variant of the avro library. Which I actually like more but that's sort of a cruel thing to do.

https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459

@fkaufer
Copy link
Copy Markdown
Contributor Author

Frank Kaufer (fkaufer) commented Oct 19, 2018

Looks like they dropped the s in the py3 variant of the avro library. Which I actually like more but that's sort of a cruel thing to do.

https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459

Wow, that's ugly, even more as they didn't adopt the py2 interfaces accordingly

https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423

But it seems the change in (slow) Avro py3 is not new, so I am wondering why it's now falling back to slow-avro instead of using fastavro.

@fkaufer
Copy link
Copy Markdown
Contributor Author

Ryan P (@rnpridgeon) seems we are good to go. Can you approve and merge?

@fkaufer
Copy link
Copy Markdown
Contributor Author

Ryan P (@rnpridgeon) anything missing?
And as a reminder, you can close #427 which is obsolete with this PR.

Copy link
Copy Markdown
Contributor

@rnpridgeon Ryan P (rnpridgeon) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've suggested a few changes to parameter naming but other than that I'm happy with it.

:param optional a reader schema for the message value
"""
def __init__(self, config, schema_registry=None):
def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
def __init__(self, config, schema_registry=None, key_reader_schema=None, value_reader_schema=None):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The word reader actually describes the schema rather than the message fields. I suggest instead naming these parameters key_reader_schema and value_reader_schema.

Copy link
Copy Markdown
Contributor Author

@fkaufer Frank Kaufer (fkaufer) Oct 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had it first that way (or key_schema_reader considering reader as var index), but this would be inconsistent with default_key_schema and default_value_schema in the constructor:

See

def __init__(self, config, default_key_schema=None,
default_value_schema=None, schema_registry=None):

Furthermore I consider key_schema/value_schema compounds naming an attribute/member of the object/namespace reader

This is also more consistent with fastavro (writer_schema, reader_schema)
https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader

and the REST Proxy (key_schema and value_schema)
https://docs.confluent.io/current/kafka-rest/docs/api.html#post--topics-(string-topic_name)

Comment thread confluent_kafka/avro/__init__.py
Comment thread confluent_kafka/avro/serializer/message_serializer.py Outdated
Comment thread confluent_kafka/avro/serializer/message_serializer.py
@fkaufer
Copy link
Copy Markdown
Contributor Author

Frank Kaufer (fkaufer) commented Oct 26, 2018

Ryan P (@rnpridgeon) ad-hoc can't see why the build is failing now except that the ldd check fails now https://travis-ci.org/confluentinc/confluent-kafka-python/jobs/446822558#L2906-L2920)

rebase (86a6160) didn't help

@fkaufer
Copy link
Copy Markdown
Contributor Author

Ryan P (@rnpridgeon) I have closed and reopened the PR to retrigger the CI now that the dependencies seem to be fixed. With success. So, can you merge?

Comment thread examples/integration_test.py
Comment thread examples/integration_test.py Outdated
Comment thread examples/integration_test.py Outdated
@rnpridgeon
Copy link
Copy Markdown
Contributor

Thanks Frank Kaufer (@fkaufer), I think I'm happy with it the way it is now so I have asked Magnus Edenhill (@edenhill) to give it a quick secondary review.

@fkaufer
Copy link
Copy Markdown
Contributor Author

Frank Kaufer (fkaufer) commented Nov 15, 2018

Magnus Edenhill (@edenhill) Ryan P (@rnpridgeon) gents, many thanks for your reviews, I made the changes as requested, all checks have passed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
Ryan, you have the final word on this.

@rnpridgeon Ryan P (rnpridgeon) merged commit d874996 into confluentinc:master Nov 16, 2018
@rnpridgeon
Copy link
Copy Markdown
Contributor

Ack, thank you for the secondary review Magnus Edenhill (@edenhill)

@rnpridgeon
Copy link
Copy Markdown
Contributor

Thanks Frank Kaufer (@fkaufer) for your contribution!

schema_dict = schema.to_json()
schemaless_reader(payload, schema_dict)
writer_schema = writer_schema_obj.to_json()
reader_schema = reader_schema_obj.to_json()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I'm a bit late to the party here.

Frank Kaufer (@fkaufer) , correct me if I'm wrong, but my understanding is that self.reader_value_schema can be null, and so reader_schema_obj could be null as well. Wouldn't that line cause an NPE?

Also I wonder if the next line should not be schemaless_reader(payload, writer_schema, reader_schema) to be consistent with the code executed by the lambda this function returns.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next line is a sanity check as alluded to in the comment below it. This is in place to catch edges which fastavro did not covet at the time the serializer was written. It's unclear if this is still actually an issue or not but it only happens upon instantiating the writer.

You won't get an NPE as the None type is an actual null but you will get an attribute error which would cause the serializer to fall back to slow avro as we call it. If you have already done so please feel free to open an issue which I will address tomorrow.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ryan P (@rnpridgeon) , thanks this is helpful. I had already opened an issue for a related issue: #518 I think both can be addressed at the same time.

I actually tried to fix it my self, but couldn't get the test to run. I would need to spend more time trying to set up my development environment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants