Union type can't be serialized #656

Closed
@keremsahin1

Description

I'm trying to run the AvroProducer example with a slightly different schema and value, shown below.

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "namespace": "my.test",
   "name": "example_value_schema",
   "type": "record",
   "fields" : [
     {
       "name" : "field",
       "type" : [ "null", 
         {
           "type": "record",
           "name": "my_field_type",
           "fields": [ {
             "name": "subfield",
             "type": "string"
           } ]
         }
       ]
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"field": {"my_field_type": {"subfield": "test"}}}
key = {"name": "Key"}

avroProducer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_topic2', value=value, key=key)
avroProducer.flush()

And I get the error below:

Traceback (most recent call last):
  File "producer2.py", line 52, in <module>
    avroProducer.produce(topic='my_topic2', value=value, key=key)
  File "/Library/Python/2.7/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
    value = self._serializer.encode_record_with_schema(topic, value_schema, value)
  File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 113, in encode_record_with_schema
    return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
  File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 147, in encode_record_with_schema_id
    writer(record, outf)
  File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 82, in <lambda>
    return lambda record, fp: schemaless_writer(fp, schema, record)
  File "fastavro/_write.pyx", line 582, in fastavro._write.schemaless_writer
  File "fastavro/_write.pyx", line 335, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 285, in fastavro._write.write_record
  File "fastavro/_write.pyx", line 333, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 249, in fastavro._write.write_union
ValueError: {'my_field_type': {'subfield': 'test'}} (type <type 'dict'>) do not match [u'null', {'fields': [{'type': u'string', 'name': u'subfield'}], 'type': u'record', 'name': 'my.test.my_field_type'}]
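
From the traceback, the failure is in fastavro's write_union, which resolves a union branch by checking the plain value against each branch schema in turn; the {"my_field_type": ...} wrapper is treated as the record's own data (which has no "subfield" key), not as a branch name. A minimal sketch that reproduces this with fastavro directly, assuming fastavro is installed (the schema is copied from above):

import io

import fastavro

# The same value schema as above, parsed with fastavro directly.
parsed = fastavro.parse_schema({
    "namespace": "my.test",
    "name": "example_value_schema",
    "type": "record",
    "fields": [{
        "name": "field",
        "type": ["null", {
            "type": "record",
            "name": "my_field_type",
            "fields": [{"name": "subfield", "type": "string"}],
        }],
    }],
})

buf = io.BytesIO()

# Works: the bare dict matches the record branch of the union.
fastavro.schemaless_writer(buf, parsed, {"field": {"subfield": "test"}})

# Raises the same ValueError: the wrapper dict matches neither "null" nor
# the record branch, since it has a "my_field_type" key instead of "subfield".
# fastavro.schemaless_writer(buf, parsed,
#                            {"field": {"my_field_type": {"subfield": "test"}}})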

However, if I change my value to value = {"field": {"subfield": "test"}}, it seems to work. Why is the wrapped form not supported? What happens if I define two union records with the exact same schema?
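
One hedged sketch for the identical-schema case: newer fastavro releases accept a (branch_name, value) tuple for union fields, which names the branch explicitly instead of leaving fastavro to infer it from the value's shape. Assuming the installed fastavro supports this tuple notation, and that the producer passes the value through to schemaless_writer unchanged:

# Hedged sketch: the tuple names the union branch explicitly, so two
# record branches with identical fields could still be told apart.
value = {"field": ("my.test.my_field_type", {"subfield": "test"})}
avroProducer.produce(topic='my_topic2', value=value, key=key)

Without some such explicit marker, two union record branches with identical fields would be indistinguishable to a writer that infers the branch from the value alone.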

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
