UnknownType error when deserializing using union schema #1562

Closed
@rgraber

Description

Product: Confluent Cloud / confluent_kafka
Using confluent-kafka 2.1.0

We were trying to follow the instructions here to configure the schema on one of our topics as an Avro union. Using the Control Center (we use Confluent Cloud), we set the schema to ["Schema1", "Schema2"], adding Schema1 and Schema2 as references (these are not the actual names, which are long and irrelevant).
We were then able to produce events with both schemas to the topic. However, when we tried to consume and deserialize using the AvroDeserializer, we got "fastavro._schema_common.UnknownType: Schema1".

The pseudocode of what we're trying to do:

msg = consumer.poll()
msg_type = msg.headers()[MSG_TYPE]  # tells us whether the payload is Type1 or Type2
deserializer = AvroDeserializer(
    schema_str=get_schema_from_msg_type(msg_type),  # returns Schema1 or Schema2
    schema_registry_client=schema_registry_client,
    from_dict=some_from_dict,
)
deserialized_value = deserializer(
    msg.value(),
    SerializationContext(msg.topic(), MessageField.VALUE, msg.headers()),
)

I think the issue is here:

writer_schema = parse_schema(loads(

When the code calls parse_schema, it does not pass expand=True. It therefore sets the writer schema to ["Schema1","Schema2"], exactly as configured on the topic, without any definition of what Schema1 and Schema2 are. So when schemaless_reader is called on the next line, it cannot resolve either name. I think the fix is to change the call to

writer_schema = parse_schema(
    loads(prepared_schema.schema_str),
    named_schemas=self._named_schemas,
    expand=True,
)
