Description
Product: Confluent Cloud / confluent_kafka
Using confluent-kafka 2.1.0
We were trying to follow the instructions here to configure the schema on one of our topics to be an Avro union. Using the Control Center (we use Confluent Cloud), we set the schema to ["Schema1", "Schema2"] and added Schema1 and Schema2 as references (not the actual names; the real ones are long and irrelevant here).
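For context, this is roughly what we configured through the UI, expressed with the Python SchemaRegistryClient (the URL, credentials, subjects, and versions below are placeholders, not our real values):

from confluent_kafka.schema_registry import (
    Schema,
    SchemaReference,
    SchemaRegistryClient,
)

# Placeholder endpoint and credentials.
client = SchemaRegistryClient({
    "url": "https://<schema-registry-endpoint>",
    "basic.auth.user.info": "<api-key>:<api-secret>",
})

# The topic's value schema is the union string itself; the two record
# definitions are attached as schema references.
union_schema = Schema(
    schema_str='["Schema1", "Schema2"]',
    schema_type="AVRO",
    references=[
        SchemaReference(name="Schema1", subject="schema1-value", version=1),
        SchemaReference(name="Schema2", subject="schema2-value", version=1),
    ],
)
client.register_schema("my-topic-value", union_schema)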
We were then able to produce events with both schemas to the topic. However, when we tried to consume and deserialize using the AvroDeserializer, we got "fastavro._schema_common.UnknownType: Schema1".
The pseudocode of what we're trying to do:
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

msg = consumer.poll()
# The header tells us whether the payload is Type1 or Type2
# (msg.headers() returns a list of (key, value) tuples).
msg_type = dict(msg.headers())[MSG_TYPE]
deserializer = AvroDeserializer(
    schema_str=get_schema_from_msg_type(msg_type),  # returns Schema1 or Schema2
    schema_registry_client=schema_registry_client,
    from_dict=some_from_dict,
)
deserialized_value = deserializer(
    msg.value(),
    SerializationContext(msg.topic(), MessageField.VALUE, msg.headers()),
)
I think the issue is here:
When the code calls parse_schema, it doesn't pass expand=True. As a result, it sets the writer schema to ["Schema1","Schema2"] exactly as configured on the topic, but never resolves what Schema1 and Schema2 actually are. So when it calls schemaless_reader on the next line, fastavro has no idea what Schema1 and Schema2 refer to. I think the fix is to change the call to:
writer_schema = parse_schema(
    loads(prepared_schema.schema_str),
    named_schemas=self._named_schemas,
    expand=True,
)
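For reference, here is a minimal standalone sketch of the failure mode using fastavro directly (the record names and fields are made-up stand-ins for our real schemas):

from io import BytesIO

from fastavro import schemaless_reader, schemaless_writer
from fastavro.schema import parse_schema

# Register the two referenced record types up front, mirroring what
# AvroDeserializer does when it loads schema references into _named_schemas.
named_schemas = {}
for name, field in (("Schema1", "a"), ("Schema2", "b")):
    parse_schema(
        {"type": "record", "name": name,
         "fields": [{"name": field, "type": "string"}]},
        named_schemas=named_schemas,
    )

# Without expand=True the parsed union stays ["Schema1", "Schema2"]:
# bare name strings with no field definitions attached.
unexpanded = parse_schema(["Schema1", "Schema2"], named_schemas=named_schemas)

# With expand=True the names are replaced by their full record definitions.
expanded = parse_schema(["Schema1", "Schema2"], named_schemas=named_schemas, expand=True)

buf = BytesIO()
schemaless_writer(buf, expanded, {"a": "hello"})

buf.seek(0)
print(schemaless_reader(buf, expanded))  # {'a': 'hello'}

buf.seek(0)
try:
    schemaless_reader(buf, unexpanded)
except Exception as e:
    print(type(e).__name__, e)  # UnknownType: Schema1, matching our error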