Skip to content

Mirror subject.name.strategy from Java client #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ dl-*
.pytest_cache
staging
tests/docker/conf/tls/*
librdkafka-tmp
9 changes: 6 additions & 3 deletions confluent_kafka/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas

"""

from confluent_kafka import Producer, Consumer
from confluent_kafka.avro.error import ClientError
from confluent_kafka.avro.load import load, loads # noqa
Expand All @@ -11,6 +10,7 @@
KeySerializerError,
ValueSerializerError)
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka.avro.serializer.name_strategies import topic_name_strategy


class AvroProducer(Producer):
Expand All @@ -27,7 +27,8 @@ class AvroProducer(Producer):
"""

def __init__(self, config, default_key_schema=None,
default_value_schema=None, schema_registry=None):
default_value_schema=None, schema_registry=None,
value_subject_name_strategy=topic_name_strategy, key_subject_name_strategy=topic_name_strategy):

sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}
Expand All @@ -46,7 +47,9 @@ def __init__(self, config, default_key_schema=None,
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroProducer, self).__init__(ap_conf)
self._serializer = MessageSerializer(schema_registry)
self._serializer = MessageSerializer(schema_registry,
key_subject_name_strategy=key_subject_name_strategy,
value_subject_name_strategy=value_subject_name_strategy)
self._key_schema = default_key_schema
self._value_schema = default_value_schema

Expand Down
13 changes: 8 additions & 5 deletions confluent_kafka/avro/serializer/message_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from confluent_kafka.avro.serializer import (SerializerError,
KeySerializerError,
ValueSerializerError)
from confluent_kafka.avro.serializer.name_strategies import topic_name_strategy

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,8 +69,11 @@ class MessageSerializer(object):
All decode_* methods expect a buffer received from kafka.
"""

def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=None):
def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=None,
key_subject_name_strategy=topic_name_strategy, value_subject_name_strategy=topic_name_strategy):
self.registry_client = registry_client
self.key_subject_name_strategy = key_subject_name_strategy
self.value_subject_name_strategy = value_subject_name_strategy
self.id_to_decoder_func = {}
self.id_to_writers = {}
self.reader_key_schema = reader_key_schema
Expand All @@ -88,7 +92,7 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
Given a parsed avro schema, encode a record for the given topic. The
record is expected to be a dictionary.

The schema is registered with the subject of 'topic-value'
The schema is registered with the subject name as returned by the provided subject_name_strategy
:param str topic: Topic name
:param schema schema: Avro Schema
:param dict record: An object to serialize
Expand All @@ -98,9 +102,8 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
"""
serialize_err = KeySerializerError if is_key else ValueSerializerError

subject_suffix = ('-key' if is_key else '-value')
# get the latest schema for the subject
subject = topic + subject_suffix
name_strategy = self.key_subject_name_strategy if is_key else self.value_subject_name_strategy
subject = name_strategy(topic, is_key, schema)
# register it
schema_id = self.registry_client.register(subject, schema)
if not schema_id:
Expand Down
74 changes: 74 additions & 0 deletions confluent_kafka/avro/serializer/name_strategies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A set of common Subject naming strategies for use with the Confluent Schema Registry. The built-in strategies mirror
the strategies provided via the Java client (https://github.com/confluentinc/schema-registry).

The built in strategies include:
``topic_name_strategy``
The topic name is used along with either "key" or "value".

``record_name_strategy``
The fully-qualified name of the avro schema is used.

``topic_record_name_strategy``
The topic name is used along with the fully-qualified name of the avro schema.

Additional strategies may be provided by passing a callable accepting topic, is_key, and schema as arguments.

"""


def topic_name_strategy(topic, is_key, schema):
"""
Default {@link SubjectNameStrategy}: for any messages published to
`topic`, the schema of the message key is registered under
the subject name `topic`-key, and the message value is registered
under the subject name `topic`-value.
"""
suffix = "-key" if is_key else "-value"
return topic + suffix


def record_name_strategy(topic, is_key, schema):
"""
For any Avro record type that is published to Kafka, registers the schema
in the registry under the fully-qualified record name (regardless of the
topic). This strategy allows a topic to contain a mixture of different
record types, since no intra-topic compatibility checking is performed.
Instead, checks compatibility of any occurrences of the same record name
across `all` topics.
"""
return schema.fullname if hasattr(schema, 'fullname') \
else schema.name if hasattr(schema, 'name') \
else schema.type if schema is not None \
else 'null'


def topic_record_name_strategy(topic, is_key, schema):
"""
For any Avro record type that is published to Kafka topic `topic`,
registers the schema in the registry under the subject name
`topic`-`recordName`, where `recordName` is the
fully-qualified Avro record name. This strategy allows a topic to contain
a mixture of different record types, since no intra-topic compatibility
checking is performed. Moreover, different topics may contain mutually
incompatible versions of the same record name, since the compatibility
check is scoped to a particular record name within a particular topic.
"""
return topic + "-" + record_name_strategy(topic, is_key, schema)
2 changes: 1 addition & 1 deletion tests/avro/mock_schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class MockSchemaRegistryClient(object):
"""
A client that acts as a schema registry locally.

Compatibiity related methods are not implemented at this time.
Compatibility related methods are not implemented at this time.
"""

def __init__(self, max_schemas_per_subject=1000):
Expand Down
124 changes: 124 additions & 0 deletions tests/avro/test_subject_name_strategies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

from avro.schema import Names, RecordSchema, PrimitiveSchema

import confluent_kafka.avro.serializer.name_strategies as strategies
from confluent_kafka.avro import AvroProducer
from tests.avro.mock_schema_registry_client import MockSchemaRegistryClient


class TestMessageSerializer(unittest.TestCase):
def test_topic_name(self):
strategy = strategies.topic_name_strategy
topic = "some_legal_topic_name"
expected = topic + "-key"
actual = strategy(topic, True, None)
assert expected == actual
expected = topic + "-value"
actual = strategy(topic, False, None)
assert expected == actual

def test_record_name(self):
strategy = strategies.record_name_strategy
schema = RecordSchema("MyRecordType", "my.namespace", fields=[], names=Names())
expected = "my.namespace.MyRecordType"
actual = strategy(None, None, schema)
assert expected == actual

def test_record_name_primitive_schema(self):
strategy = strategies.record_name_strategy
schema = PrimitiveSchema(type='string')
expected = 'string'
actual = strategy(None, None, schema)
assert expected == actual

def test_record_name_none_schema(self):
strategy = strategies.record_name_strategy
schema = None
expected = 'null'
actual = strategy(None, None, schema)
assert expected == actual

def test_topic_record_name(self):
strategy = strategies.topic_record_name_strategy
topic = "some_legal_topic_name"
schema = RecordSchema("MyRecordType", "my.namespace", fields=[], names=Names())
expected = "some_legal_topic_name-my.namespace.MyRecordType"
actual = strategy(topic, None, schema)
assert expected == actual

def test_topic_record_name_primitive_schema(self):
strategy = strategies.topic_record_name_strategy
topic = "some_legal_topic_name"
schema = PrimitiveSchema(type='string')
expected = 'some_legal_topic_name-string'
actual = strategy(topic, None, schema)
assert expected == actual

def test_topic_record_name_none_schema(self):
strategy = strategies.topic_record_name_strategy
topic = "some_legal_topic_name"
schema = None
expected = 'some_legal_topic_name-null'
actual = strategy(topic, None, schema)
assert expected == actual

def test_default_subject_name_strategy(self):
schema_registry = MockSchemaRegistryClient()
producer = AvroProducer(config={}, schema_registry=schema_registry)
serializer = producer._serializer
assert serializer.key_subject_name_strategy is strategies.topic_name_strategy
assert serializer.value_subject_name_strategy is strategies.topic_name_strategy

def test_explicit_topic_subject_name_strategy(self):
schema_registry = MockSchemaRegistryClient()
producer = AvroProducer(config={}, schema_registry=schema_registry,
key_subject_name_strategy=strategies.topic_name_strategy,
value_subject_name_strategy=strategies.topic_name_strategy)
serializer = producer._serializer
assert serializer.key_subject_name_strategy is strategies.topic_name_strategy
assert serializer.value_subject_name_strategy is strategies.topic_name_strategy

def test_explicit_record_subject_name_strategy(self):
schema_registry = MockSchemaRegistryClient()
producer = AvroProducer(config={}, schema_registry=schema_registry,
key_subject_name_strategy=strategies.record_name_strategy,
value_subject_name_strategy=strategies.record_name_strategy)
serializer = producer._serializer
assert serializer.key_subject_name_strategy is strategies.record_name_strategy
assert serializer.value_subject_name_strategy is strategies.record_name_strategy

def test_explicit_topic_record_subject_name_strategy(self):
schema_registry = MockSchemaRegistryClient()
producer = AvroProducer(config={}, schema_registry=schema_registry,
key_subject_name_strategy=strategies.topic_record_name_strategy,
value_subject_name_strategy=strategies.topic_record_name_strategy)
serializer = producer._serializer
assert serializer.key_subject_name_strategy is strategies.topic_record_name_strategy
assert serializer.value_subject_name_strategy is strategies.topic_record_name_strategy

def test_differing_key_and_value_subject_name_strategies(self):
schema_registry = MockSchemaRegistryClient()
producer = AvroProducer(config={}, schema_registry=schema_registry,
key_subject_name_strategy=strategies.record_name_strategy,
value_subject_name_strategy=strategies.topic_record_name_strategy)
serializer = producer._serializer
assert serializer.key_subject_name_strategy is strategies.record_name_strategy
assert serializer.value_subject_name_strategy is strategies.topic_record_name_strategy
Loading