Skip to content

Add disable auto register schemas #718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions confluent_kafka/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self, config, default_key_schema=None,
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
sr_conf['sasl.username'] = config.get('sasl.username', '')
sr_conf['sasl.password'] = config.get('sasl.password', '')
sr_conf['auto.register.schemas'] = config.get('auto.register.schemas', True)

ap_conf = {key: value
for key, value in config.items() if not key.startswith("schema.registry")}
Expand Down
40 changes: 40 additions & 0 deletions confluent_kafka/avro/cached_schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ def __init__(self, url, max_schemas_per_subject=1000, ca_location=None, cert_loc

self._session = s

self.auto_register_schemas = conf.pop("auto.register.schemas", True)

if len(conf) > 0:
raise ValueError("Unrecognized configuration properties: {}".format(conf.keys()))

Expand Down Expand Up @@ -230,6 +232,44 @@ def register(self, subject, avro_schema):
self._cache_schema(avro_schema, schema_id, subject)
return schema_id

def check_registration(self, subject, avro_schema):
"""
POST /subjects/(string: subject)
Check if a schema has already been registered under the specified subject.
If so, returns the schema id. Otherwise, raises a ClientError.

avro_schema must be a parsed schema from the python avro library

Multiple instances of the same schema will result in inconsistencies.

:param str subject: subject name
:param schema avro_schema: Avro schema to be checked
:returns: schema_id
:rtype: int
"""

schemas_to_id = self.subject_to_schema_ids[subject]
schema_id = schemas_to_id.get(avro_schema, None)
if schema_id is not None:
return schema_id
# send it up
url = '/'.join([self.url, 'subjects', subject])
# body is { schema : json_string }

body = {'schema': json.dumps(avro_schema.to_json())}
result, code = self._send_request(url, method='POST', body=body)
if code == 401 or code == 403:
raise ClientError("Unauthorized access. Error code:" + str(code))
elif code == 404:
raise ClientError("Schema or subject not found:" + str(code))
elif not 200 <= code <= 299:
raise ClientError("Unable to check schema registration. Error code:" + str(code))
# result is a dict
schema_id = result['id']
# cache it
self._cache_schema(avro_schema, schema_id, subject)
return schema_id

def delete_subject(self, subject):
"""
DELETE /subjects/(string: subject)
Expand Down
7 changes: 5 additions & 2 deletions confluent_kafka/avro/serializer/message_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
subject_suffix = ('-key' if is_key else '-value')
# get the latest schema for the subject
subject = topic + subject_suffix
# register it
schema_id = self.registry_client.register(subject, schema)
if self.registry_client.auto_register_schemas:
# register it
schema_id = self.registry_client.register(subject, schema)
else:
schema_id = self.registry_client.check_registration(subject, schema)
if not schema_id:
message = "Unable to retrieve schema id for subject %s" % (subject)
raise serialize_err(message)
Expand Down
6 changes: 6 additions & 0 deletions tests/avro/mock_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ def register(self, req, groups):
schema_id = self.registry.register(subject, avro_schema)
return (200, {'id': schema_id})

def check_registration(self, req, groups):
avro_schema = self._get_schema_from_body(req)
subject = groups[0]
schema_id = self.registry.check_registration(subject, avro_schema)
return (200, {'id': schema_id})

def get_version(self, req, groups):
avro_schema = self._get_schema_from_body(req)
if not avro_schema:
Expand Down
22 changes: 22 additions & 0 deletions tests/avro/mock_schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def __init__(self, max_schemas_per_subject=1000):
self.next_id = 1
self.schema_to_id = {}

self.auto_register_schemas = True

def _get_next_id(self, schema):
if schema in self.schema_to_id:
return self.schema_to_id[schema]
Expand Down Expand Up @@ -109,6 +111,26 @@ def register(self, subject, avro_schema):
self._cache_schema(avro_schema, schema_id, subject, version)
return schema_id

def check_registration(self, subject, avro_schema):
"""
Check if a schema has already been registered under the specified subject.
If so, returns the schema id. Otherwise, raises a ClientError.

avro_schema must be a parsed schema from the python avro library

Multiple instances of the same schema will result in inconsistencies.
"""
schemas_to_id = self.subject_to_schema_ids.get(subject, {})
schema_id = schemas_to_id.get(avro_schema, -1)
if schema_id != -1:
return schema_id

version = self._get_next_version(subject) - 1

# cache it
self._cache_schema(avro_schema, schema_id, subject, version)
return schema_id

def get_by_id(self, schema_id):
"""Retrieve a parsed avro schema by id or None if not found"""
return self.id_to_schema.get(schema_id, None)
Expand Down
6 changes: 6 additions & 0 deletions tests/avro/test_cached_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ def test_register(self):
self.assertTrue(schema_id > 0)
self.assertEqual(len(client.id_to_schema), 1)

def test_check_registration(self):
parsed = avro.loads(data_gen.BASIC_SCHEMA)
client = self.client
schema_id = client.register('test', parsed)
self.assertEqual(schema_id, client.check_registration('test', parsed))

def test_multi_subject_register(self):
parsed = avro.loads(data_gen.BASIC_SCHEMA)
client = self.client
Expand Down
1 change: 1 addition & 0 deletions tests/integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ def verify_schema_registry_client():
schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))

schema_id = sr.register(subject, schema)
assert schema_id == sr.check_registration(subject, schema)
assert schema == sr.get_by_id(schema_id)
latest_id, latest_schema, latest_version = sr.get_latest_schema(subject)
assert schema == latest_schema
Expand Down