Closed
Description
Starting in confluent-kafka-python 2.6.2, payloads encoded with this Avro schema cannot be deserialized:
["null", {
"type": "record",
"name": "A",
"namespace": "test",
"fields": [
{"name": "First", "type": {"type": "record", "name": "B", "fields": []}},
{"name": "Second", "type": "B"}
]
}]
SchemaParseException: redefined named type: test.B
The top-level union appears to be the culprit: with record `A` itself as the top-level schema (no surrounding union), deserialization works fine.
The issue is still present in confluent-kafka-python 2.8.0. Full reproduction and traceback:
# Reproduction: deserializing a payload whose registered writer schema is a
# top-level ["null", record] union raises
# SchemaParseException("redefined named type: test.B") in
# confluent-kafka-python 2.6.2 through at least 2.8.0.
from json import dumps

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
# Import from the public modules instead of relying on avro.py happening to
# re-export these names.
from confluent_kafka.serialization import SerializationContext

topic = 'confluent-python-repro'
schema = ['null', {
    'type': 'record',
    'name': 'A',
    'namespace': 'test',
    'fields': [
        # Defines the named type test.B inline...
        {'name': 'First', 'type': {'type': 'record', 'name': 'B', 'fields': []}},
        # ...and then references it by name, which is valid Avro.
        {'name': 'Second', 'type': 'B'},
    ],
}]

client = SchemaRegistryClient({'url': 'http://registry.local:8081'})
# Compatibility NONE so registration is not rejected by any schema previously
# registered under this subject (presumably from earlier repro runs).
client.set_compatibility(f'{topic}-value', 'NONE')
schema_id = client.register_schema(f'{topic}-value', Schema(dumps(schema), 'AVRO'))

deserializer = AvroDeserializer(schema_registry_client=client)

# Confluent wire format: magic byte 0, 4-byte big-endian schema ID, Avro body.
# The Avro body is the single zig-zag varint union index 0 (the 'null'
# branch), which carries no further bytes.
payload = b'\x00' + schema_id.to_bytes(4, 'big') + b'\x00'

# Expected: returns None. Actual on affected versions: SchemaParseException.
deserializer(payload, SerializationContext(topic, 'value'))
---------------------------------------------------------------------------
SchemaParseException Traceback (most recent call last)
Cell In[5], line 25
22 payload.extend(schema_id.to_bytes(4, 'big'))
23 payload.extend(b'\0')
---> 25 deserializer(payload, SerializationContext(topic, 'value'))
File ~/.pyenv/versions/3.12.2/lib/python3.12/site-packages/confluent_kafka/schema_registry/avro.py:586, in AvroDeserializer.__call__(self, data, ctx)
584 obj_dict = self._execute_migrations(ctx, subject, migrations, obj_dict)
585 else:
--> 586 obj_dict = schemaless_reader(payload,
587 writer_schema,
588 reader_schema,
589 self._return_record_name)
591 field_transformer = lambda rule_ctx, field_transform, message: ( # noqa: E731
592 transform(rule_ctx, reader_schema, message, field_transform))
593 obj_dict = self._execute_rules(ctx, subject, RuleMode.READ, None,
594 reader_schema_raw, obj_dict, get_inline_tags(reader_schema),
595 field_transformer)
File fastavro/_read.pyx:1159, in fastavro._read.schemaless_reader()
File fastavro/_read.pyx:1174, in fastavro._read.schemaless_reader()
File fastavro/_schema.pyx:162, in fastavro._schema.parse_schema()
File fastavro/_schema.pyx:173, in fastavro._schema.parse_schema()
File fastavro/_schema.pyx:407, in fastavro._schema._parse_schema()
File fastavro/_schema.pyx:475, in fastavro._schema.parse_field()
File fastavro/_schema.pyx:394, in fastavro._schema._parse_schema()
SchemaParseException: redefined named type: test.B