Support encoding with fastavro, fix minor doc bug #419
It looks like @marcintustin hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed reply with Appreciation of efforts, clabot
599454b to 6ea7853
@confluentinc It looks like @marcintustin just signed our Contributor License Agreement. 👍 Always at your service, clabot
The failure here appears to be one of the workers timing out. I can't force a re-run, I'm afraid.
Thanks @marcintustin! I'll take a look tomorrow. In the meantime, I have forced a restart of the failed worker.
Thanks again for this. I have made a few suggestions to help DRY things up a bit, but for the most part it looks good.
@@ -77,6 +77,13 @@ def __init__(self, registry_client):
        '''

    def _slow_make_datum_writer(self, schema):
        return avro.io.DatumWriter(schema)
Although it is a bit ugly, it would be nice to share the interface of the writer returned by _fast_make_datum_writer:
    def _slow_make_datum_writer(self, schema):
        writer = avro.io.DatumWriter(schema)
        return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))

    def _fast_make_datum_writer(self, record_schema):
        schema = record_schema.to_json()
        return lambda fp, record: schemaless_writer(fp, schema, record)
I would transpose the lambda arguments fp and record to maintain consistency with the existing writer interface.
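To illustrate the point about argument order, here is a minimal sketch in which both factories return a callable taking (record, fp). The two backends are stand-ins written for this example (FakeDatumWriter and fake_schemaless_writer are hypothetical and emit JSON rather than Avro) so that it runs without avro or fastavro installed; only the argument order mirrors the calls quoted above.

```python
import io
import json


def fake_schemaless_writer(fp, schema, record):
    """Stand-in that mirrors the (fp, schema, record) order of schemaless_writer."""
    fp.write(json.dumps(record, sort_keys=True).encode("utf-8"))


class FakeDatumWriter:
    """Stand-in that mirrors DatumWriter(schema).write(record, encoder)."""

    def __init__(self, schema):
        self.schema = schema

    def write(self, record, encoder):
        # In the real code the second argument would be avro.io.BinaryEncoder(fp).
        encoder.write(json.dumps(record, sort_keys=True).encode("utf-8"))


def slow_make_datum_writer(schema):
    writer = FakeDatumWriter(schema)
    return lambda record, fp: writer.write(record, fp)


def fast_make_datum_writer(schema):
    # Arguments transposed to (record, fp) so both factories share one interface.
    return lambda record, fp: fake_schemaless_writer(fp, schema, record)


def encode(make_writer, schema, record):
    """Call either writer through the same (record, fp) interface."""
    out = io.BytesIO()
    make_writer(schema)(record, out)
    return out.getvalue()
```

With a shared signature, the caller no longer needs to know which backend produced the writer.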
@@ -101,34 +108,35 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
            raise serialize_err(message)

        # cache writer
        self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
        if id not in self.id_to_writers:
Since this method is also responsible for the schema's registration, I do not think this check provides us any benefit.
        try:
            schema = self.registry_client.get_by_id(schema_id)
            if not schema:
                raise serialize_err("Schema does not exist")
            self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
            self.id_to_writers[schema_id] = self.make_datum_writer(schema)
I would move this logic back into _encode_record_with_schema_id and eliminate the redundant _slow_encode_record_with_schema_id and _fast_encode_record_with_schema_id methods. If we provide the same interface for both writers, there is no longer a need to maintain two code paths, which is a bit more DRY.
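A rough sketch of what a single code path could look like once both writers share the (record, fp) interface. SketchSerializer, get_schema_by_id, and the ValueError are assumptions made for illustration and are not the class or error type from this PR; the header layout follows the Confluent wire format (one magic byte, then a 4-byte big-endian schema id).

```python
import io
import struct

MAGIC_BYTE = 0  # Confluent wire format: magic byte, then 4-byte big-endian schema id


class SketchSerializer:
    """Hypothetical, trimmed-down serializer; not the class from the PR."""

    def __init__(self, get_schema_by_id, make_datum_writer):
        self.get_schema_by_id = get_schema_by_id    # assumed registry lookup
        self.make_datum_writer = make_datum_writer  # fast or slow factory
        self.id_to_writers = {}

    def encode_record_with_schema_id(self, schema_id, record):
        # Build and cache the writer on first use; one path for either backend.
        if schema_id not in self.id_to_writers:
            schema = self.get_schema_by_id(schema_id)
            if not schema:
                raise ValueError("Schema does not exist")  # stand-in error type
            self.id_to_writers[schema_id] = self.make_datum_writer(schema)
        writer = self.id_to_writers[schema_id]

        with io.BytesIO() as out:
            out.write(struct.pack(">bI", MAGIC_BYTE, schema_id))
            writer(record, out)
            return out.getvalue()
```

Because the serializer only ever calls writer(record, out), swapping the factory is the only change needed to move between avro and fastavro.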
Thanks for the contribution, @marcintustin. I went ahead and made the requested changes in #492 to try to get this in for the 1.0 release, which enters code freeze at EOD.
This PR adds the ability to encode with fastavro, if it is available. It also fixes a minor imprecision in the docstring for encode_record_with_schema_id.
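One common way to express "if it is available" is an optional import with a fallback. This is a sketch of that pattern, not necessarily how the PR does it; HAS_FAST and pick_writer_factory are names invented for the example.

```python
try:
    # fastavro is optional; fall back to the avro writer when it is missing.
    from fastavro import schemaless_writer
    HAS_FAST = True
except ImportError:
    schemaless_writer = None
    HAS_FAST = False


def pick_writer_factory(fast_factory, slow_factory, has_fast=HAS_FAST):
    """Return the fastavro-backed factory when available, else the avro one."""
    return fast_factory if has_fast else slow_factory
```

The choice is made once, so per-record encoding does not pay for the availability check.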