Skip to content

Support encoding with fastavro, fix minor doc bug #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 57 additions & 14 deletions confluent_kafka/avro/serializer/message_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

HAS_FAST = False
try:
from fastavro import schemaless_reader
from fastavro import schemaless_reader, schemaless_writer

HAS_FAST = True
except ImportError:
Expand Down Expand Up @@ -77,6 +77,13 @@ def __init__(self, registry_client):

'''

def _slow_make_datum_writer(self, schema):
    """Build a datum writer for *schema* using the standard (pure-Python) avro library.

    @:param: schema : parsed avro schema
    @:returns: avro.io.DatumWriter for the schema
    """
    writer = avro.io.DatumWriter(schema)
    return writer
Copy link
Contributor

@rnpridgeon rnpridgeon Jul 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although a bit ugly, it would be nice to share the interface of the writer returned by `_fast_make_datum_writer`:

def _slow_make_datum_writer(self, schema):
   writer = avro.io.DatumWriter(schema)
   return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))


def _fast_make_datum_writer(self, record_schema):
schema = record_schema.to_json()
return lambda fp, record: schemaless_writer(fp, schema, record)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would transpose lambda arguments fp and record to maintain consistency with the existing writer interface.


def encode_record_with_schema(self, topic, schema, record, is_key=False):
"""
Given a parsed avro schema, encode a record for the given topic. The
Expand All @@ -101,34 +108,35 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
raise serialize_err(message)

# cache writer
self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
if id not in self.id_to_writers:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this method is also responsible for the schema's registration, I do not think this check provides us any benefit.

self.id_to_writers[schema_id] = self.make_datum_writer(schema)

return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)

def encode_record_with_schema_id(self, schema_id, record, is_key=False):
"""
Encode a record with a given schema id. The record must
be a python dictionary.
@:param: schema_id : integer ID
@:param: record : An object to serialize
@:param is_key : If the record is a key
@:returns: decoder function
"""
def _ensure_writer(self, schema_id, is_key=False):
    """Ensure a cached datum writer exists for *schema_id*.

    Fetches the schema from the registry on a cache miss and stores the
    resulting writer in self.id_to_writers.

    @:param: schema_id : integer ID
    @:param is_key : If the record is a key (selects the error class raised)
    @:raises: KeySerializerError/ValueSerializerError on lookup failure
    """
    err_cls = KeySerializerError if is_key else ValueSerializerError

    # Fast path: writer already cached for this schema id.
    if schema_id in self.id_to_writers:
        return

    try:
        schema = self.registry_client.get_by_id(schema_id)
        if not schema:
            raise err_cls("Schema does not exist")
        self.id_to_writers[schema_id] = self.make_datum_writer(schema)
    except ClientError:
        # Re-raise registry failures as (de)serializer errors, preserving
        # the original traceback text for diagnosis.
        exc_type, exc_value, exc_traceback = sys.exc_info()
        raise err_cls(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))

def _slow_encode_record_with_schema_id(self, schema_id, record, is_key=False):
"""
Encode a record with a given schema id. The record must
be a python dictionary.
@:param: schema_id : integer ID
@:param: record : An object to serialize
@:param is_key : If the record is a key
@:returns: Encoded record with schema ID as bytes
"""

# get the writer
writer = self.id_to_writers[schema_id]
with ContextStringIO() as outf:
Expand All @@ -150,6 +158,41 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):

return outf.getvalue()

def _fast_encode_record_with_schema_id(self, schema_id, record, is_key=False):
    """
    Encode a record with a given schema id using the fastavro writer.
    The record must be a python dictionary.
    @:param: schema_id : integer ID
    @:param: record : An object to serialize
    @:param is_key : If the record is a key
    @:returns: Encoded record with schema ID as bytes
    """
    self._ensure_writer(schema_id, is_key)

    writer = self.id_to_writers[schema_id]
    with ContextStringIO() as outf:
        # Confluent wire format header: one magic byte followed by the
        # 4-byte schema ID in network (big-endian) byte order.
        header = struct.pack('b', MAGIC_BYTE) + struct.pack('>I', schema_id)
        outf.write(header)

        # The serialized avro payload follows the header.
        writer(outf, record)

        return outf.getvalue()

# Bind the encode/writer-factory entry points to the fastavro-backed
# implementations when fastavro imported successfully, otherwise fall
# back to the pure-Python avro implementations.
if HAS_FAST:
    encode_record_with_schema_id = _fast_encode_record_with_schema_id
    make_datum_writer = _fast_make_datum_writer
else:
    encode_record_with_schema_id = _slow_encode_record_with_schema_id
    make_datum_writer = _slow_make_datum_writer

# Decoder support
def _get_decoder_func(self, schema_id, payload):
if schema_id in self.id_to_decoder_func:
Expand Down