Skip to content

Commit 5d00428

Browse files
committed
Support encoding with fastavro
1 parent 15800d0 commit 5d00428

File tree

1 file changed

+12
-13
lines changed

1 file changed

+12
-13
lines changed

confluent_kafka/avro/serializer/message_serializer.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
HAS_FAST = False
4141
try:
42-
from fastavro import schemaless_reader
42+
from fastavro import schemaless_reader, schemaless_writer
4343

4444
HAS_FAST = True
4545
except ImportError:
@@ -75,9 +75,13 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
7575
self.reader_key_schema = reader_key_schema
7676
self.reader_value_schema = reader_value_schema
7777

78-
'''
79-
80-
'''
# Encoder support
def _get_encoder_func(self, writer_schema):
    """Build a serializer callable for *writer_schema*.

    Returns a function ``(record, fp) -> None`` that encodes *record* in
    Avro binary form onto the file-like object *fp*.  When fastavro is
    importable (``HAS_FAST``) the fastavro ``schemaless_writer`` is used;
    otherwise the reference ``avro`` library's ``DatumWriter`` is the
    fallback.
    """
    if not HAS_FAST:
        # Slow path: reference avro implementation.  The DatumWriter is
        # built once here and captured by the closure so repeated encodes
        # reuse it.
        datum_writer = avro.io.DatumWriter(writer_schema)
        return lambda record, fp: datum_writer.write(record, avro.io.BinaryEncoder(fp))
    # Fast path: fastavro consumes the schema's JSON representation
    # (note fastavro's argument order is file-object first, then schema).
    json_schema = writer_schema.to_json()
    return lambda record, fp: schemaless_writer(fp, json_schema, record)
8185

8286
def encode_record_with_schema(self, topic, schema, record, is_key=False):
8387
"""
@@ -103,7 +107,7 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
103107
raise serialize_err(message)
104108

105109
# cache writer
106-
self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
110+
self.id_to_writers[schema_id] = self._get_encoder_func(schema)
107111

108112
return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
109113

@@ -126,7 +130,7 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
126130
schema = self.registry_client.get_by_id(schema_id)
127131
if not schema:
128132
raise serialize_err("Schema does not exist")
129-
self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
133+
self.id_to_writers[schema_id] = self._get_encoder_func(schema)
130134
except ClientError:
131135
exc_type, exc_value, exc_traceback = sys.exc_info()
132136
raise serialize_err(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
@@ -136,19 +140,14 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
136140
with ContextStringIO() as outf:
137141
# write the header
138142
# magic byte
139-
140143
outf.write(struct.pack('b', MAGIC_BYTE))
141144

142145
# write the schema ID in network byte order (big end)
143-
144146
outf.write(struct.pack('>I', schema_id))
145147

146-
# write the record to the rest of it
148+
# write the record to the rest of the buffer
147149
# Create an encoder that we'll write to
148-
encoder = avro.io.BinaryEncoder(outf)
149-
# write the magic byte
150-
# write the object in 'obj' as Avro to the fake file...
151-
writer.write(record, encoder)
150+
writer(record, outf)
152151

153152
return outf.getvalue()
154153

0 commit comments

Comments
 (0)