Description
Description
I'm trying to work out the exact same example for AvroProducer with a slightly different schema and value as below.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema_str = """
{
"namespace": "my.test",
"name": "example_value_schema",
"type": "record",
"fields" : [
{
"name" : "field",
"type" : [ "null",
{
"type": "record",
"name": "my_field_type",
"fields": [ {
"name": "subfield",
"type": "string"
} ]
}
]
}
]
}
"""
key_schema_str = """
{
"namespace": "my.test",
"name": "key",
"type": "record",
"fields" : [
{
"name" : "name",
"type" : "string"
}
]
}
"""
value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"field": {"my_field_type": {"subfield": "test"}}}
key = {"name": "Key"}
avroProducer = AvroProducer({
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic2', value=value, key=key)
avroProducer.flush()
And I get below error:
Traceback (most recent call last):
File "producer2.py", line 52, in
avroProducer.produce(topic='my_topic2', value=value, key=key)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/init.py", line 80, in produce
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 113, in encode_record_with_schema
return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 147, in encode_record_with_schema_id
writer(record, outf)
File "/Library/Python/2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 82, in
return lambda record, fp: schemaless_writer(fp, schema, record)
File "fastavro/_write.pyx", line 582, in fastavro._write.schemaless_writer
File "fastavro/_write.pyx", line 335, in fastavro._write.write_data
File "fastavro/_write.pyx", line 285, in fastavro._write.write_record
File "fastavro/_write.pyx", line 333, in fastavro._write.write_data
File "fastavro/_write.pyx", line 249, in fastavro._write.write_union
ValueError: {'my_field_type': {'subfield': 'test'}} (type <type 'dict'>) do not match [u'null', {'fields': [{'type': u'string', 'name': u'subfield'}], 'type': u'record', 'name': 'my.test.my_field_type'}]
However, if I change my value to value = {"field": {"subfield": "test"}}
it seems to work. Why is this not supported? What happens if I define two union records with exact same schema?
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (
confluent_kafka.version()
andconfluent_kafka.libversion()
): - Apache Kafka broker version:
- Client configuration:
{...}
- Operating system:
- Provide client logs (with
'debug': '..'
as necessary) - Provide broker log excerpts
- Critical issue