Skip to content

Commit d9669df

Browse files
Matt Howlettfimmtiubstaudacher
authored
Only write schema cache once (#1057)
* Only write to the writer schema cache once. * Add test for the _get_encoder_func caching. * Patch against self.ms, change iterators to list for reuse across test cases Co-authored-by: Dennis Taylor <dennis.taylor@clio.com> Co-authored-by: Braden Staudacher <braden.staudacher@clio.com>
1 parent 11d05ef commit d9669df

File tree

3 files changed

+17
-4
lines changed

3 files changed

+17
-4
lines changed

src/confluent_kafka/avro/serializer/message_serializer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
113113
raise serialize_err(message)
114114

115115
# cache writer
116-
self.id_to_writers[schema_id] = self._get_encoder_func(schema)
116+
if schema_id not in self.id_to_writers:
117+
self.id_to_writers[schema_id] = self._get_encoder_func(schema)
117118

118119
return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
119120

tests/avro/data_gen.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def create_basic_item(i):
5454
}
5555

5656

57-
BASIC_ITEMS = map(create_basic_item, range(1, 20))
57+
BASIC_ITEMS = list(map(create_basic_item, range(1, 20)))
5858

5959
ADVANCED_SCHEMA = load_schema_file(os.path.join(avsc_dir, 'adv_schema.avsc'))
6060

@@ -68,7 +68,7 @@ def create_adv_item(i):
6868
return basic
6969

7070

71-
ADVANCED_ITEMS = map(create_adv_item, range(1, 20))
71+
ADVANCED_ITEMS = list(map(create_adv_item, range(1, 20)))
7272

7373

7474
def _write_items(base_name, schema_str, items):

tests/avro/test_message_serializer.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import struct
2424

2525
import unittest
26+
from unittest.mock import patch
2627

2728
from tests.avro import data_gen
2829
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
@@ -75,8 +76,19 @@ def test_encode_record_with_schema(self):
7576
message = self.ms.encode_record_with_schema(topic, basic, record)
7677
self.assertMessageIsSame(message, record, schema_id)
7778

79+
def test_encode_record_with_schema_sets_writers_cache_once(self):
80+
topic = 'test'
81+
basic = avro.loads(data_gen.BASIC_SCHEMA)
82+
subject = 'test-value'
83+
self.client.register(subject, basic)
84+
records = data_gen.BASIC_ITEMS
85+
with patch.object(self.ms, "_get_encoder_func") as encoder_func_mock:
86+
for record in records:
87+
self.ms.encode_record_with_schema(topic, basic, record)
88+
encoder_func_mock.assert_called_once_with(basic)
89+
7890
def test_decode_none(self):
79-
""""null/None messages should decode to None"""
91+
"""null/None messages should decode to None"""
8092

8193
self.assertIsNone(self.ms.decode_message(None))
8294

0 commit comments

Comments
 (0)