
 HAS_FAST = False
 try:
-    from fastavro import schemaless_reader
+    from fastavro import schemaless_reader, schemaless_writer

     HAS_FAST = True
 except ImportError:
@@ -75,9 +75,13 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
         self.reader_key_schema = reader_key_schema
         self.reader_value_schema = reader_value_schema

-    '''
-
-    '''
+    # Encoder support
+    def _get_encoder_func(self, writer_schema):
+        if HAS_FAST:
+            schema = writer_schema.to_json()
+            return lambda record, fp: schemaless_writer(fp, schema, record)
+        writer = avro.io.DatumWriter(writer_schema)
+        return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))

     def encode_record_with_schema(self, topic, schema, record, is_key=False):
         """
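
For context, here is a minimal, self-contained sketch of the two encoder paths that `_get_encoder_func` chooses between: fastavro's `schemaless_writer` when the optional dependency is installed, and the standard `avro` `DatumWriter`/`BinaryEncoder` pair otherwise. The `User` schema and record below are made up for illustration, and the fallback assumes the plain `avro` package (whose parse helper is `avro.schema.parse`; some releases spell it `Parse`).

```python
import io
import json

# Hypothetical schema and record, for illustration only.
schema_dict = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}
record = {"name": "alice", "age": 30}

buf = io.BytesIO()
try:
    # Fast path: fastavro consumes the dict schema directly and writes the
    # schemaless (body-only) Avro encoding of the record into the buffer.
    from fastavro import schemaless_reader, schemaless_writer

    schemaless_writer(buf, schema_dict, record)
    buf.seek(0)
    assert schemaless_reader(buf, schema_dict) == record
except ImportError:
    # Slow path: the standard avro library needs a parsed schema object and an
    # explicit DatumWriter/BinaryEncoder pair, but produces the same body bytes.
    import avro.io
    import avro.schema

    parsed = avro.schema.parse(json.dumps(schema_dict))
    avro.io.DatumWriter(parsed).write(record, avro.io.BinaryEncoder(buf))

print("%d Avro body bytes" % len(buf.getvalue()))
```

Note that the cached value in `id_to_writers` changes from a `DatumWriter` object to one of these `(record, fp)` callables, which is why the call site in the last hunk becomes `writer(record, outf)` instead of `writer.write(record, encoder)`.
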
@@ -104,7 +108,7 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
             raise serialize_err(message)

         # cache writer
-        self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+        self.id_to_writers[schema_id] = self._get_encoder_func(schema)

         return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)

@@ -128,29 +132,19 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
                 schema = self.registry_client.get_by_id(schema_id)
                 if not schema:
                     raise serialize_err("Schema does not exist")
-                self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+                self.id_to_writers[schema_id] = self._get_encoder_func(schema)
             except ClientError:
                 exc_type, exc_value, exc_traceback = sys.exc_info()
                 raise serialize_err(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))

         # get the writer
         writer = self.id_to_writers[schema_id]
         with ContextStringIO() as outf:
-            # write the header
-            # magic byte
-
-            outf.write(struct.pack('b', MAGIC_BYTE))
-
-            # write the schema ID in network byte order (big end)
-
-            outf.write(struct.pack('>I', schema_id))
+            # Write the magic byte and schema ID in network byte order (big endian)
+            outf.write(struct.pack('>bI', MAGIC_BYTE, schema_id))

-            # write the record to the rest of it
-            # Create an encoder that we'll write to
-            encoder = avro.io.BinaryEncoder(outf)
-            # write the magic byte
-            # write the object in 'obj' as Avro to the fake file...
-            writer.write(record, encoder)
+            # write the record to the rest of the buffer
+            writer(record, outf)

             return outf.getvalue()

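
The collapsed `struct.pack('>bI', ...)` call writes the same 5-byte header as the two packs it replaces: one signed magic byte followed by the schema ID as an unsigned 32-bit integer in network (big-endian) byte order. Below is a quick sketch of how a consumer-side check could split such a message back apart; the schema ID and payload bytes are invented for illustration.

```python
import struct

MAGIC_BYTE = 0

# Hypothetical values, for illustration only.
schema_id = 42
avro_body = b"\x0aalice\x3c"  # stand-in for the Avro-encoded record body

# Producer side: 5-byte header (magic byte + big-endian schema ID), then the body.
message = struct.pack('>bI', MAGIC_BYTE, schema_id) + avro_body

# Consumer side: peel the header off before handing the rest to an Avro reader.
magic, read_id = struct.unpack('>bI', message[:5])
assert magic == MAGIC_BYTE
assert read_id == schema_id
assert message[5:] == avro_body
```
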