Skip to content

Commit 7f9c044

Browse files
author
Matt Howlett
authored
Merge pull request #1000 from 97nitt/avro-deserializer-optional-reader-schema
Make reader schema optional in AvroDeserializer
2 parents 928f75d + b9d3028 commit 7f9c044

File tree

5 files changed

+29
-28
lines changed

5 files changed

+29
-28
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,4 @@ tests/docker/conf/tls/*
2626
.DS_Store
2727
.idea
2828
tmp-KafkaCluster
29+
.venv

src/confluent_kafka/schema_registry/avro.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,10 @@ class AvroSerializer(Serializer):
122122
See ``avro_producer.py`` in the examples directory for example usage.
123123
124124
Args:
125-
schema_str (str): Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_
126-
127125
schema_registry_client (SchemaRegistryClient): Schema Registry client instance.
128126
127+
schema_str (str): Avro `Schema Declaration. <https://avro.apache.org/docs/current/spec.html#schemas>`_
128+
129129
to_dict (callable, optional): Callable(object, SerializationContext) -> dict. Converts object to a dict.
130130
131131
conf (dict): AvroSerializer configuration.
@@ -139,7 +139,7 @@ class AvroSerializer(Serializer):
139139
_default_conf = {'auto.register.schemas': True,
140140
'subject.name.strategy': topic_subject_name_strategy}
141141

142-
def __init__(self, schema_str, schema_registry_client,
142+
def __init__(self, schema_registry_client, schema_str,
143143
to_dict=None, conf=None):
144144
self._registry = schema_registry_client
145145
self._schema_id = None
@@ -250,11 +250,12 @@ class AvroDeserializer(Deserializer):
250250
directory for example usage.
251251
252252
Args:
253-
schema_str (str): Avro reader schema declaration.
254-
255253
schema_registry_client (SchemaRegistryClient): Confluent Schema Registry
256254
client instance.
257255
256+
schema_str (str, optional): Avro reader schema declaration.
257+
If not provided, writer schema is used for deserialization.
258+
258259
from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
259260
Converts dict to an instance of some object.
260261
@@ -266,11 +267,11 @@ class AvroDeserializer(Deserializer):
266267
"""
267268
__slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas']
268269

269-
def __init__(self, schema_str, schema_registry_client, from_dict=None):
270+
def __init__(self, schema_registry_client, schema_str=None, from_dict=None):
270271
self._registry = schema_registry_client
271272
self._writer_schemas = {}
272273

273-
self._reader_schema = parse_schema(loads(schema_str))
274+
self._reader_schema = parse_schema(loads(schema_str)) if schema_str else None
274275

275276
if from_dict is not None and not callable(from_dict):
276277
raise ValueError("from_dict must be callable with the signature"

tests/integration/schema_registry/test_avro_serializers.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ def test_avro_record_serialization(kafka_cluster, load_file, avsc, data, record_
7272
sr = kafka_cluster.schema_registry()
7373

7474
schema_str = load_file(avsc)
75-
value_serializer = AvroSerializer(schema_str, sr)
75+
value_serializer = AvroSerializer(sr, schema_str)
7676

77-
value_deserializer = AvroDeserializer(schema_str, sr)
77+
value_deserializer = AvroDeserializer(sr)
7878

7979
producer = kafka_cluster.producer(value_serializer=value_serializer)
8080

@@ -116,9 +116,9 @@ def test_delivery_report_serialization(kafka_cluster, load_file, avsc, data, rec
116116
sr = kafka_cluster.schema_registry()
117117
schema_str = load_file(avsc)
118118

119-
value_serializer = AvroSerializer(schema_str, sr)
119+
value_serializer = AvroSerializer(sr, schema_str)
120120

121-
value_deserializer = AvroDeserializer(schema_str, sr)
121+
value_deserializer = AvroDeserializer(sr)
122122

123123
producer = kafka_cluster.producer(value_serializer=value_serializer)
124124

@@ -163,13 +163,13 @@ def test_avro_record_serialization_custom(kafka_cluster):
163163
sr = kafka_cluster.schema_registry()
164164

165165
user = User('Bowie', 47, 'purple')
166-
value_serializer = AvroSerializer(User.schema_str, sr,
166+
value_serializer = AvroSerializer(sr, User.schema_str,
167167
lambda user, ctx:
168168
dict(name=user.name,
169169
favorite_number=user.favorite_number,
170170
favorite_color=user.favorite_color))
171171

172-
value_deserializer = AvroDeserializer(User.schema_str, sr,
172+
value_deserializer = AvroDeserializer(sr, User.schema_str,
173173
lambda user_dict, ctx:
174174
User(**user_dict))
175175

tests/schema_registry/test_avro_serializer.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def test_avro_serializer_config_auto_register_schemas():
3434
"""
3535
conf = {'url': TEST_URL}
3636
test_client = SchemaRegistryClient(conf)
37-
test_serializer = AvroSerializer("string", test_client,
37+
test_serializer = AvroSerializer(test_client, 'string',
3838
conf={'auto.register.schemas': False})
3939
assert not test_serializer._auto_register
4040

@@ -47,7 +47,7 @@ def test_avro_serializer_config_auto_register_schemas_invalid():
4747
test_client = SchemaRegistryClient(conf)
4848

4949
with pytest.raises(ValueError, match="must be a boolean"):
50-
AvroSerializer("string", test_client,
50+
AvroSerializer(test_client, 'string',
5151
conf={'auto.register.schemas': dict()})
5252

5353

@@ -60,7 +60,7 @@ def test_avro_serializer_config_auto_register_schemas_false(mock_schema_registry
6060
topic = "test-auto-register"
6161
subject = topic + '-key'
6262

63-
test_serializer = AvroSerializer("string", test_client,
63+
test_serializer = AvroSerializer(test_client, 'string',
6464
conf={'auto.register.schemas': False})
6565

6666
test_serializer("test",
@@ -81,7 +81,7 @@ def test_avro_serializer_config_subject_name_strategy():
8181

8282
conf = {'url': TEST_URL}
8383
test_client = SchemaRegistryClient(conf)
84-
test_serializer = AvroSerializer("int", test_client,
84+
test_serializer = AvroSerializer(test_client, 'int',
8585
conf={'subject.name.strategy':
8686
record_subject_name_strategy})
8787

@@ -96,7 +96,7 @@ def test_avro_serializer_config_subject_name_strategy_invalid():
9696
conf = {'url': TEST_URL}
9797
test_client = SchemaRegistryClient(conf)
9898
with pytest.raises(ValueError, match="must be callable"):
99-
AvroSerializer("int", test_client,
99+
AvroSerializer(test_client, 'int',
100100
conf={'subject.name.strategy': dict()})
101101

102102

@@ -106,8 +106,8 @@ def test_avro_serializer_record_subject_name_strategy(load_avsc):
106106
"""
107107
conf = {'url': TEST_URL}
108108
test_client = SchemaRegistryClient(conf)
109-
test_serializer = AvroSerializer(load_avsc('basic_schema.avsc'),
110-
test_client,
109+
test_serializer = AvroSerializer(test_client,
110+
load_avsc('basic_schema.avsc'),
111111
conf={'subject.name.strategy':
112112
record_subject_name_strategy})
113113

@@ -123,7 +123,7 @@ def test_avro_serializer_record_subject_name_strategy_primitive(load_avsc):
123123
"""
124124
conf = {'url': TEST_URL}
125125
test_client = SchemaRegistryClient(conf)
126-
test_serializer = AvroSerializer('int', test_client,
126+
test_serializer = AvroSerializer(test_client, 'int',
127127
conf={'subject.name.strategy':
128128
record_subject_name_strategy})
129129

@@ -138,8 +138,8 @@ def test_avro_serializer_topic_record_subject_name_strategy(load_avsc):
138138
"""
139139
conf = {'url': TEST_URL}
140140
test_client = SchemaRegistryClient(conf)
141-
test_serializer = AvroSerializer(load_avsc('basic_schema.avsc'),
142-
test_client,
141+
test_serializer = AvroSerializer(test_client,
142+
load_avsc('basic_schema.avsc'),
143143
conf={'subject.name.strategy':
144144
topic_record_subject_name_strategy})
145145

@@ -155,8 +155,7 @@ def test_avro_serializer_topic_record_subject_name_strategy_primitive(load_avsc)
155155
"""
156156
conf = {'url': TEST_URL}
157157
test_client = SchemaRegistryClient(conf)
158-
test_serializer = AvroSerializer('int',
159-
test_client,
158+
test_serializer = AvroSerializer(test_client, 'int',
160159
conf={'subject.name.strategy':
161160
topic_record_subject_name_strategy})
162161

@@ -171,8 +170,8 @@ def test_avro_serializer_subject_name_strategy_default(load_avsc):
171170
"""
172171
conf = {'url': TEST_URL}
173172
test_client = SchemaRegistryClient(conf)
174-
test_serializer = AvroSerializer(load_avsc('basic_schema.avsc'),
175-
test_client)
173+
test_serializer = AvroSerializer(test_client,
174+
load_avsc('basic_schema.avsc'))
176175

177176
ctx = SerializationContext('test_subj', MessageField.VALUE)
178177
assert test_serializer._subject_name_func(

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,6 @@ testpaths = tests
3434
norecursedirs = tests/integration/*/java
3535

3636
[flake8]
37-
exclude = venv*,env,.env,.tox,.toxenv,.git,build,docs,tools,tmp-build,*_pb2.py
37+
exclude = venv*,.venv*,env,.env,.tox,.toxenv,.git,build,docs,tools,tmp-build,*_pb2.py
3838
max-line-length = 119
3939
accept-encodings = utf-8

0 commit comments

Comments
 (0)