Skip to content

Make reader schema optional in AvroDeserializer #1000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ tests/docker/conf/tls/*
.DS_Store
.idea
tmp-KafkaCluster
.venv
15 changes: 8 additions & 7 deletions src/confluent_kafka/schema_registry/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ class AvroSerializer(Serializer):
See ``avro_producer.py`` in the examples directory for example usage.

Args:
schema_str (str): Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_

schema_registry_client (SchemaRegistryClient): Schema Registry client instance.

schema_str (str): Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_

to_dict (callable, optional): Callable(object, SerializationContext) -> dict. Converts object to a dict.

conf (dict): AvroSerializer configuration.
Expand All @@ -139,7 +139,7 @@ class AvroSerializer(Serializer):
_default_conf = {'auto.register.schemas': True,
'subject.name.strategy': topic_subject_name_strategy}

def __init__(self, schema_str, schema_registry_client,
def __init__(self, schema_registry_client, schema_str,
to_dict=None, conf=None):
self._registry = schema_registry_client
self._schema_id = None
Expand Down Expand Up @@ -250,11 +250,12 @@ class AvroDeserializer(Deserializer):
directory for example usage.

Args:
schema_str (str): Avro reader schema declaration.

schema_registry_client (SchemaRegistryClient): Confluent Schema Registry
client instance.

schema_str (str, optional): Avro reader schema declaration.
If not provided, writer schema is used for deserialization.

from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
Converts dict to an instance of some object.

Expand All @@ -266,11 +267,11 @@ class AvroDeserializer(Deserializer):
"""
__slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas']

def __init__(self, schema_str, schema_registry_client, from_dict=None):
def __init__(self, schema_registry_client, schema_str=None, from_dict=None):
self._registry = schema_registry_client
self._writer_schemas = {}

self._reader_schema = parse_schema(loads(schema_str))
self._reader_schema = parse_schema(loads(schema_str)) if schema_str else None

if from_dict is not None and not callable(from_dict):
raise ValueError("from_dict must be callable with the signature"
Expand Down
12 changes: 6 additions & 6 deletions tests/integration/schema_registry/test_avro_serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ def test_avro_record_serialization(kafka_cluster, load_file, avsc, data, record_
sr = kafka_cluster.schema_registry()

schema_str = load_file(avsc)
value_serializer = AvroSerializer(schema_str, sr)
value_serializer = AvroSerializer(sr, schema_str)

value_deserializer = AvroDeserializer(schema_str, sr)
value_deserializer = AvroDeserializer(sr)

producer = kafka_cluster.producer(value_serializer=value_serializer)

Expand Down Expand Up @@ -116,9 +116,9 @@ def test_delivery_report_serialization(kafka_cluster, load_file, avsc, data, rec
sr = kafka_cluster.schema_registry()
schema_str = load_file(avsc)

value_serializer = AvroSerializer(schema_str, sr)
value_serializer = AvroSerializer(sr, schema_str)

value_deserializer = AvroDeserializer(schema_str, sr)
value_deserializer = AvroDeserializer(sr)

producer = kafka_cluster.producer(value_serializer=value_serializer)

Expand Down Expand Up @@ -163,13 +163,13 @@ def test_avro_record_serialization_custom(kafka_cluster):
sr = kafka_cluster.schema_registry()

user = User('Bowie', 47, 'purple')
value_serializer = AvroSerializer(User.schema_str, sr,
value_serializer = AvroSerializer(sr, User.schema_str,
lambda user, ctx:
dict(name=user.name,
favorite_number=user.favorite_number,
favorite_color=user.favorite_color))

value_deserializer = AvroDeserializer(User.schema_str, sr,
value_deserializer = AvroDeserializer(sr, User.schema_str,
lambda user_dict, ctx:
User(**user_dict))

Expand Down
27 changes: 13 additions & 14 deletions tests/schema_registry/test_avro_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_avro_serializer_config_auto_register_schemas():
"""
conf = {'url': TEST_URL}
test_client = SchemaRegistryClient(conf)
test_serializer = AvroSerializer("string", test_client,
test_serializer = AvroSerializer(test_client, 'string',
conf={'auto.register.schemas': False})
assert not test_serializer._auto_register

Expand All @@ -47,7 +47,7 @@ def test_avro_serializer_config_auto_register_schemas_invalid():
test_client = SchemaRegistryClient(conf)

with pytest.raises(ValueError, match="must be a boolean"):
AvroSerializer("string", test_client,
AvroSerializer(test_client, 'string',
conf={'auto.register.schemas': dict()})


Expand All @@ -60,7 +60,7 @@ def test_avro_serializer_config_auto_register_schemas_false(mock_schema_registry
topic = "test-auto-register"
subject = topic + '-key'

test_serializer = AvroSerializer("string", test_client,
test_serializer = AvroSerializer(test_client, 'string',
conf={'auto.register.schemas': False})

test_serializer("test",
Expand All @@ -81,7 +81,7 @@ def test_avro_serializer_config_subject_name_strategy():

conf = {'url': TEST_URL}
test_client = SchemaRegistryClient(conf)
test_serializer = AvroSerializer("int", test_client,
test_serializer = AvroSerializer(test_client, 'int',
conf={'subject.name.strategy':
record_subject_name_strategy})

Expand All @@ -96,7 +96,7 @@ def test_avro_serializer_config_subject_name_strategy_invalid():
conf = {'url': TEST_URL}
test_client = SchemaRegistryClient(conf)
with pytest.raises(ValueError, match="must be callable"):
AvroSerializer("int", test_client,
AvroSerializer(test_client, 'int',
conf={'subject.name.strategy': dict()})


Expand All @@ -106,8 +106,8 @@ def test_avro_serializer_record_subject_name_strategy(load_avsc):
"""
conf = {'url': TEST_URL}
test_client = SchemaRegistryClient(conf)
test_serializer = AvroSerializer(load_avsc('basic_schema.avsc'),
test_client,
test_serializer = AvroSerializer(test_client,
load_avsc('basic_schema.avsc'),
conf={'subject.name.strategy':
record_subject_name_strategy})

Expand All @@ -123,7 +123,7 @@ def test_avro_serializer_record_subject_name_strategy_primitive(load_avsc):
"""
conf = {'url': TEST_URL}
test_client = SchemaRegistryClient(conf)
test_serializer = AvroSerializer('int', test_client,
test_serializer = AvroSerializer(test_client, 'int',
conf={'subject.name.strategy':
record_subject_name_strategy})

Expand All @@ -138,8 +138,8 @@ def test_avro_serializer_topic_record_subject_name_strategy(load_avsc):
"""
conf = {'url': TEST_URL}
test_client = SchemaRegistryClient(conf)
test_serializer = AvroSerializer(load_avsc('basic_schema.avsc'),
test_client,
test_serializer = AvroSerializer(test_client,
load_avsc('basic_schema.avsc'),
conf={'subject.name.strategy':
topic_record_subject_name_strategy})

Expand All @@ -155,8 +155,7 @@ def test_avro_serializer_topic_record_subject_name_strategy_primitive(load_avsc)
"""
conf = {'url': TEST_URL}
test_client = SchemaRegistryClient(conf)
test_serializer = AvroSerializer('int',
test_client,
test_serializer = AvroSerializer(test_client, 'int',
conf={'subject.name.strategy':
topic_record_subject_name_strategy})

Expand All @@ -171,8 +170,8 @@ def test_avro_serializer_subject_name_strategy_default(load_avsc):
"""
conf = {'url': TEST_URL}
test_client = SchemaRegistryClient(conf)
test_serializer = AvroSerializer(load_avsc('basic_schema.avsc'),
test_client)
test_serializer = AvroSerializer(test_client,
load_avsc('basic_schema.avsc'))

ctx = SerializationContext('test_subj', MessageField.VALUE)
assert test_serializer._subject_name_func(
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ testpaths = tests
norecursedirs = tests/integration/*/java

[flake8]
exclude = venv*,env,.env,.tox,.toxenv,.git,build,docs,tools,tmp-build,*_pb2.py
exclude = venv*,.venv*,env,.env,.tox,.toxenv,.git,build,docs,tools,tmp-build,*_pb2.py
max-line-length = 119
accept-encodings = utf-8