Skip to content

Commit 7905ccb

Browse files
authored
DGS-10484 Improve SR caching on Python client (#1744)
* add more caching to SR calls * oops * fix flake8
1 parent b3bde5c commit 7905ccb

File tree

2 files changed

+116
-26
lines changed

2 files changed

+116
-26
lines changed

src/confluent_kafka/schema_registry/schema_registry_client.py

Lines changed: 116 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,6 @@ def set(self, schema_id, schema, subject_name=None):
207207
schema (Schema): Schema instance
208208
209209
subject_name(str): Optional, subject schema is registered under
210-
211-
Returns:
212-
int: The schema_id
213210
"""
214211

215212
with self.lock:
@@ -229,7 +226,8 @@ def get_schema(self, schema_id):
229226
Schema: The schema if known; else None
230227
"""
231228

232-
return self.schema_id_index.get(schema_id, None)
229+
with self.lock:
230+
return self.schema_id_index.get(schema_id, None)
233231

234232
def get_schema_id_by_subject(self, subject, schema):
235233
"""
@@ -249,6 +247,73 @@ def get_schema_id_by_subject(self, subject, schema):
249247
return self.schema_index.get(schema, None)
250248

251249

250+
class _RegisteredSchemaCache(object):
251+
"""
252+
Thread-safe cache for use with the Schema Registry Client.
253+
254+
This cache may be used to retrieve registered schemas based on subject_name/version/schema
255+
- Get registered schema based on subject name + version
256+
- Get registered schema based on subject name + schema
257+
"""
258+
259+
def __init__(self):
260+
self.lock = Lock()
261+
self.schema_version_index = defaultdict(dict)
262+
self.schema_index = defaultdict(dict)
263+
264+
def set(self, subject_name, schema, version, registered_schema):
265+
"""
266+
Add a Schema identified by schema_id to the cache.
267+
268+
Args:
269+
subject_name (str): The subject name this registered schema is associated with
270+
271+
schema (Schema): The schema this registered schema is associated with
272+
273+
version (int): The version this registered schema is associated with
274+
275+
registered_schema (RegisteredSchema): The registered schema instance
276+
"""
277+
278+
with self.lock:
279+
if schema is not None:
280+
self.schema_index[subject_name][schema] = registered_schema
281+
elif version is not None:
282+
self.schema_version_index[subject_name][version] = registered_schema
283+
284+
def get_registered_schema_by_version(self, subject_name, version):
285+
"""
286+
Get the registered schema instance associated with version from the cache.
287+
288+
Args:
289+
subject_name (str): The subject name this registered schema is associated with
290+
291+
version (int): The version this registered schema is associated with
292+
293+
Returns:
294+
RegisteredSchema: The registered schema if known; else None
295+
"""
296+
297+
with self.lock:
298+
return self.schema_version_index.get(subject_name, {}).get(version, None)
299+
300+
def get_registered_schema_by_schema(self, subject_name, schema):
301+
"""
302+
Get the registered schema instance associated with schema from the cache.
303+
304+
Args:
305+
subject_name (str): The subject name this registered schema is associated with
306+
307+
schema (Schema): The schema this registered schema is associated with
308+
309+
Returns:
310+
RegisteredSchema: The registered schema if known; else None
311+
"""
312+
313+
with self.lock:
314+
return self.schema_index.get(subject_name, {}).get(schema, None)
315+
316+
252317
class SchemaRegistryClient(object):
253318
"""
254319
A Confluent Schema Registry client.
@@ -292,6 +357,7 @@ class SchemaRegistryClient(object):
292357
def __init__(self, conf):
293358
self._rest_client = _RestClient(conf)
294359
self._cache = _SchemaCache()
360+
self._metadata_cache = _RegisteredSchemaCache()
295361

296362
def __enter__(self):
297363
return self
@@ -398,6 +464,10 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False):
398464
`POST Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions>`_
399465
""" # noqa: E501
400466

467+
registered_schema = self._metadata_cache.get_registered_schema_by_schema(subject_name, schema)
468+
if registered_schema is not None:
469+
return registered_schema
470+
401471
request = {'schema': schema.schema_str}
402472

403473
# CP 5.5 adds new fields (for JSON and Protobuf).
@@ -414,17 +484,25 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False):
414484

415485
schema_type = response.get('schemaType', 'AVRO')
416486

417-
return RegisteredSchema(schema_id=response['id'],
418-
schema=Schema(response['schema'],
419-
schema_type,
420-
[
421-
SchemaReference(name=ref['name'],
422-
subject=ref['subject'],
423-
version=ref['version'])
424-
for ref in response.get('references', [])
425-
]),
426-
subject=response['subject'],
427-
version=response['version'])
487+
registered_schema = RegisteredSchema(
488+
schema_id=response['id'],
489+
schema=Schema(
490+
response['schema'],
491+
schema_type,
492+
[
493+
SchemaReference(
494+
name=ref['name'],
495+
subject=ref['subject'],
496+
version=ref['version']
497+
) for ref in response.get('references', [])
498+
]
499+
),
500+
subject=response['subject'],
501+
version=response['version']
502+
)
503+
self._metadata_cache.set(subject_name, schema, None, registered_schema)
504+
505+
return registered_schema
428506

429507
def get_subjects(self):
430508
"""
@@ -524,22 +602,34 @@ def get_version(self, subject_name, version):
524602
`GET Subject Version API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
525603
""" # noqa: E501
526604

605+
registered_schema = self._metadata_cache.get_registered_schema_by_version(subject_name, version)
606+
if registered_schema is not None:
607+
return registered_schema
608+
527609
response = self._rest_client.get('subjects/{}/versions/{}'
528610
.format(_urlencode(subject_name),
529611
version))
530612

531613
schema_type = response.get('schemaType', 'AVRO')
532-
return RegisteredSchema(schema_id=response['id'],
533-
schema=Schema(response['schema'],
534-
schema_type,
535-
[
536-
SchemaReference(name=ref['name'],
537-
subject=ref['subject'],
538-
version=ref['version'])
539-
for ref in response.get('references', [])
540-
]),
541-
subject=response['subject'],
542-
version=response['version'])
614+
registered_schema = RegisteredSchema(
615+
schema_id=response['id'],
616+
schema=Schema(
617+
response['schema'],
618+
schema_type,
619+
[
620+
SchemaReference(
621+
name=ref['name'],
622+
subject=ref['subject'],
623+
version=ref['version']
624+
) for ref in response.get('references', [])
625+
]
626+
),
627+
subject=response['subject'],
628+
version=response['version']
629+
)
630+
self._metadata_cache.set(subject_name, None, version, registered_schema)
631+
632+
return registered_schema
543633

544634
def get_versions(self, subject_name):
545635
"""

tools/source-package-verification.sh

100644100755
File mode changed.

0 commit comments

Comments
 (0)