40
40
HAS_FAST = False
41
41
try :
42
42
from fastavro import schemaless_reader , schemaless_writer
43
+ from fastavro .schema import parse_schema
43
44
44
45
HAS_FAST = True
45
46
except ImportError :
@@ -79,7 +80,8 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
79
80
def _get_encoder_func (self , writer_schema ):
80
81
if HAS_FAST :
81
82
schema = writer_schema .to_json ()
82
- return lambda record , fp : schemaless_writer (fp , schema , record )
83
+ parsed_schema = parse_schema (schema )
84
+ return lambda record , fp : schemaless_writer (fp , parsed_schema , record )
83
85
writer = avro .io .DatumWriter (writer_schema )
84
86
return lambda record , fp : writer .write (record , avro .io .BinaryEncoder (fp ))
85
87
@@ -169,9 +171,9 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
169
171
if HAS_FAST :
170
172
# try to use fast avro
171
173
try :
172
- writer_schema = writer_schema_obj .to_json ()
173
- reader_schema = reader_schema_obj .to_json ()
174
- schemaless_reader (payload , writer_schema )
174
+ fast_avro_writer_schema = parse_schema ( writer_schema_obj .to_json () )
175
+ fast_avro_reader_schema = parse_schema ( reader_schema_obj .to_json () )
176
+ schemaless_reader (payload , fast_avro_writer_schema )
175
177
176
178
# If we reach this point, this means we have fastavro and it can
177
179
# do this deserialization. Rewind since this method just determines
@@ -180,7 +182,7 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
180
182
payload .seek (curr_pos )
181
183
182
184
self .id_to_decoder_func [schema_id ] = lambda p : schemaless_reader (
183
- p , writer_schema , reader_schema )
185
+ p , fast_avro_writer_schema , fast_avro_reader_schema )
184
186
return self .id_to_decoder_func [schema_id ]
185
187
except Exception :
186
188
# Fast avro failed, fall thru to standard avro below.
0 commit comments