Skip to content

Commit bac9f62

Browse files
committed
PR Feedback, round 1
* Remove plugin registration and lookup * Remove interface in favor of callback
1 parent a5e8269 commit bac9f62

File tree

5 files changed

+53
-100
lines changed

5 files changed

+53
-100
lines changed

confluent_kafka/avro/__init__.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
33
44
"""
5-
import pkg_resources
6-
75
from confluent_kafka import Producer, Consumer
86
from confluent_kafka.avro.error import ClientError
97
from confluent_kafka.avro.load import load, loads # noqa
@@ -12,6 +10,7 @@
1210
KeySerializerError,
1311
ValueSerializerError)
1412
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
13+
from confluent_kafka.avro.serializer.name_strategies import topic_name_strategy
1514

1615

1716
class AvroProducer(Producer):
@@ -28,7 +27,8 @@ class AvroProducer(Producer):
2827
"""
2928

3029
def __init__(self, config, default_key_schema=None,
31-
default_value_schema=None, schema_registry=None):
30+
default_value_schema=None, schema_registry=None,
31+
value_subject_name_strategy=topic_name_strategy, key_subject_name_strategy=topic_name_strategy):
3232
schema_registry_url = config.pop("schema.registry.url", None)
3333
if schema_registry is None:
3434
if schema_registry_url is None:
@@ -38,27 +38,13 @@ def __init__(self, config, default_key_schema=None,
3838
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
3939

4040
self._serializer = MessageSerializer(schema_registry,
41-
self._load_name_strategy(config, "value.subject.name.strategy"),
42-
self._load_name_strategy(config, "key.subject.name.strategy"))
41+
key_subject_name_strategy=key_subject_name_strategy,
42+
value_subject_name_strategy=value_subject_name_strategy)
4343

4444
super(AvroProducer, self).__init__(config)
4545
self._key_schema = default_key_schema
4646
self._value_schema = default_value_schema
4747

48-
@staticmethod
49-
def _load_name_strategy(config, key):
50-
strategy_name = config.pop(key, "TopicName")
51-
subject_name_strategy_plugins = {
52-
entry_point.name: entry_point
53-
for entry_point
54-
in pkg_resources.iter_entry_points('subject.name.strategy')
55-
}
56-
subject_name_strategy = subject_name_strategy_plugins.get(strategy_name, None)
57-
if subject_name_strategy is None:
58-
raise ValueError("The subject name strategy plugin named {} could not be found.".format(
59-
strategy_name))
60-
return subject_name_strategy.load()()
61-
6248
def produce(self, **kwargs):
6349
"""
6450
Asynchronously sends message to Kafka by encoding with specified or default avro schema.

confluent_kafka/avro/serializer/message_serializer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from confluent_kafka.avro.serializer import (SerializerError,
3333
KeySerializerError,
3434
ValueSerializerError)
35-
from confluent_kafka.avro.serializer.name_strategies import TopicNameStrategy
35+
from confluent_kafka.avro.serializer.name_strategies import topic_name_strategy
3636

3737
log = logging.getLogger(__name__)
3838

@@ -70,7 +70,7 @@ class MessageSerializer(object):
7070
"""
7171

7272
def __init__(self, registry_client,
73-
key_subject_name_strategy=TopicNameStrategy(), value_subject_name_strategy=TopicNameStrategy()):
73+
key_subject_name_strategy=topic_name_strategy, value_subject_name_strategy=topic_name_strategy):
7474
self.registry_client = registry_client
7575
self.key_subject_name_strategy = key_subject_name_strategy
7676
self.value_subject_name_strategy = value_subject_name_strategy
@@ -96,7 +96,7 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
9696
serialize_err = KeySerializerError if is_key else ValueSerializerError
9797

9898
name_strategy = self.key_subject_name_strategy if is_key else self.value_subject_name_strategy
99-
subject = name_strategy.get_subject_name(topic, is_key, schema)
99+
subject = name_strategy(topic, is_key, schema)
100100
# register it
101101
schema_id = self.registry_client.register(subject, schema)
102102
if not schema_id:

confluent_kafka/avro/serializer/name_strategies.py

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,52 +20,32 @@
2020
the strategies provided via the Java client (https://github.com/confluentinc/schema-registry).
2121
2222
The built in strategies include:
23-
``TopicName``
23+
``topic_name_strategy``
2424
The topic name is used along with either "key" or "value".
2525
26-
``RecordName``
26+
``record_name_strategy``
2727
The fully-qualified name of the avro schema is used.
2828
29-
``TopicRecordName``
29+
``topic_record_name_strategy``
3030
The topic name is used along with the fully-qualified name of the avro schema.
3131
32-
Additional strategies may be provided at run time by registering a class with the ``subject.name.strategy`` entry
33-
point.
32+
Additional strategies may be provided by passing a callable accepting topic, is_key, and schema as arguments.
3433
3534
"""
3635

3736

38-
class SubjectNameStrategy(object):
39-
"""
40-
A {@link SubjectNameStrategy} is used by the Avro serializer to determine
41-
the subject name under which the event record schemas should be registered
42-
in the schema registry. The default is {@link TopicNameStrategy}.
43-
"""
44-
def get_subject_name(self, topic, is_key, schema):
45-
"""
46-
For a given topic and message, returns the subject name under which the
47-
schema should be registered in the schema registry.
48-
:param topic: The Kafka topic name to which the message is being published.
49-
:param is_key: True when encoding a message key, false for a message value.
50-
:param schema: The value to be published in the message.
51-
:return: The subject name under which the schema should be registered.
52-
"""
53-
raise NotImplemented("SubjectNameStrategy implementations must implement get_subject_name.")
54-
55-
56-
class TopicNameStrategy(SubjectNameStrategy):
37+
def topic_name_strategy(topic, is_key, schema):
5738
"""
5839
Default {@link SubjectNameStrategy}: for any messages published to
5940
`topic`, the schema of the message key is registered under
6041
the subject name `topic`-key, and the message value is registered
6142
under the subject name `topic`-value.
6243
"""
63-
def get_subject_name(self, topic, is_key, schema):
64-
suffix = "-key" if is_key else "-value"
65-
return topic + suffix
44+
suffix = "-key" if is_key else "-value"
45+
return topic + suffix
6646

6747

68-
class RecordNameStrategy(SubjectNameStrategy):
48+
def record_name_strategy(topic, is_key, schema):
6949
"""
7050
For any Avro record type that is published to Kafka, registers the schema
7151
in the registry under the fully-qualified record name (regardless of the
@@ -74,11 +54,10 @@ class RecordNameStrategy(SubjectNameStrategy):
7454
Instead, checks compatibility of any occurrences of the same record name
7555
across `all` topics.
7656
"""
77-
def get_subject_name(self, topic, is_key, schema):
78-
return schema.fullname
57+
return schema.fullname
7958

8059

81-
class TopicRecordNameStrategy(RecordNameStrategy):
60+
def topic_record_name_strategy(topic, is_key, schema):
8261
"""
8362
For any Avro record type that is published to Kafka topic `topic`,
8463
registers the schema in the registry under the subject name
@@ -89,5 +68,4 @@ class TopicRecordNameStrategy(RecordNameStrategy):
8968
incompatible versions of the same record name, since the compatibility
9069
check is scoped to a particular record name within a particular topic.
9170
"""
92-
def get_subject_name(self, topic, is_key, schema):
93-
return topic + "-" + RecordNameStrategy.get_subject_name(self, topic, is_key, schema)
71+
return topic + "-" + record_name_strategy(topic, is_key, schema)

setup.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@
2323
'confluent_kafka/src/Admin.c'])
2424

2525

26+
install_requirements_path = os.environ.get('INSTALL_REQUIREMENTS_PATH_OVERRIDE',
27+
os.path.dirname(__file__))
28+
29+
2630
def get_install_requirements(path):
27-
content = open(os.path.join(os.path.dirname(__file__), path)).read()
31+
content = open(os.path.join(install_requirements_path, path)).read()
2832
return [
2933
req
3034
for req in content.split("\n")
@@ -45,11 +49,4 @@ def get_install_requirements(path):
4549
extras_require={
4650
'avro': ['fastavro', 'requests', avro],
4751
'dev': get_install_requirements("test-requirements.txt")
48-
},
49-
entry_points={
50-
'subject.name.strategy': [
51-
'TopicName = confluent_kafka.avro.serializer.name_strategies:TopicNameStrategy [avro]',
52-
'RecordName = confluent_kafka.avro.serializer.name_strategies:RecordNameStrategy [avro]',
53-
'TopicRecordName = confluent_kafka.avro.serializer.name_strategies:TopicRecordNameStrategy [avro]'
54-
]
5552
})

tests/avro/test_subject_name_strategies.py

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,77 +26,69 @@
2626

2727
class TestMessageSerializer(unittest.TestCase):
2828
def test_topic_name(self):
29-
strategy = strategies.TopicNameStrategy()
29+
strategy = strategies.topic_name_strategy
3030
topic = "some_legal_topic_name"
3131
expected = topic + "-key"
32-
actual = strategy.get_subject_name(topic, True, None)
32+
actual = strategy(topic, True, None)
3333
assert expected == actual
3434
expected = topic + "-value"
35-
actual = strategy.get_subject_name(topic, False, None)
35+
actual = strategy(topic, False, None)
3636
assert expected == actual
3737

3838
def test_record_name(self):
39-
strategy = strategies.RecordNameStrategy()
39+
strategy = strategies.record_name_strategy
4040
schema = RecordSchema("MyRecordType", "my.namespace", fields=[], names=Names())
4141
expected = "my.namespace.MyRecordType"
42-
actual = strategy.get_subject_name(None, None, schema)
42+
actual = strategy(None, None, schema)
4343
assert expected == actual
4444

4545
def test_topic_record_name(self):
46-
strategy = strategies.TopicRecordNameStrategy()
46+
strategy = strategies.topic_record_name_strategy
4747
topic = "some_legal_topic_name"
4848
schema = RecordSchema("MyRecordType", "my.namespace", fields=[], names=Names())
4949
expected = "some_legal_topic_name-my.namespace.MyRecordType"
50-
actual = strategy.get_subject_name(topic, None, schema)
50+
actual = strategy(topic, None, schema)
5151
assert expected == actual
5252

5353
def test_default_subject_name_strategy(self):
5454
schema_registry = MockSchemaRegistryClient()
5555
producer = AvroProducer(config={}, schema_registry=schema_registry)
5656
serializer = producer._serializer
57-
assert isinstance(serializer.key_subject_name_strategy, strategies.TopicNameStrategy)
58-
assert isinstance(serializer.value_subject_name_strategy, strategies.TopicNameStrategy)
57+
assert serializer.key_subject_name_strategy is strategies.topic_name_strategy
58+
assert serializer.value_subject_name_strategy is strategies.topic_name_strategy
5959

6060
def test_explicit_topic_subject_name_strategy(self):
6161
schema_registry = MockSchemaRegistryClient()
62-
config = {
63-
'value.subject.name.strategy': 'TopicName',
64-
'key.subject.name.strategy': 'TopicName'
65-
}
66-
producer = AvroProducer(config=config, schema_registry=schema_registry)
62+
producer = AvroProducer(config={}, schema_registry=schema_registry,
63+
key_subject_name_strategy=strategies.topic_name_strategy,
64+
value_subject_name_strategy=strategies.topic_name_strategy)
6765
serializer = producer._serializer
68-
assert isinstance(serializer.key_subject_name_strategy, strategies.TopicNameStrategy)
69-
assert isinstance(serializer.value_subject_name_strategy, strategies.TopicNameStrategy)
66+
assert serializer.key_subject_name_strategy is strategies.topic_name_strategy
67+
assert serializer.value_subject_name_strategy is strategies.topic_name_strategy
7068

7169
def test_explicit_record_subject_name_strategy(self):
7270
schema_registry = MockSchemaRegistryClient()
73-
config = {
74-
'value.subject.name.strategy': 'RecordName',
75-
'key.subject.name.strategy': 'RecordName'
76-
}
77-
producer = AvroProducer(config=config, schema_registry=schema_registry)
71+
producer = AvroProducer(config={}, schema_registry=schema_registry,
72+
key_subject_name_strategy=strategies.record_name_strategy,
73+
value_subject_name_strategy=strategies.record_name_strategy)
7874
serializer = producer._serializer
79-
assert isinstance(serializer.key_subject_name_strategy, strategies.RecordNameStrategy)
80-
assert isinstance(serializer.value_subject_name_strategy, strategies.RecordNameStrategy)
75+
assert serializer.key_subject_name_strategy is strategies.record_name_strategy
76+
assert serializer.value_subject_name_strategy is strategies.record_name_strategy
8177

8278
def test_explicit_topic_record_subject_name_strategy(self):
8379
schema_registry = MockSchemaRegistryClient()
84-
config = {
85-
'value.subject.name.strategy': 'TopicRecordName',
86-
'key.subject.name.strategy': 'TopicRecordName'
87-
}
88-
producer = AvroProducer(config=config, schema_registry=schema_registry)
80+
producer = AvroProducer(config={}, schema_registry=schema_registry,
81+
key_subject_name_strategy=strategies.topic_record_name_strategy,
82+
value_subject_name_strategy=strategies.topic_record_name_strategy)
8983
serializer = producer._serializer
90-
assert isinstance(serializer.key_subject_name_strategy, strategies.TopicRecordNameStrategy)
91-
assert isinstance(serializer.value_subject_name_strategy, strategies.TopicRecordNameStrategy)
84+
assert serializer.key_subject_name_strategy is strategies.topic_record_name_strategy
85+
assert serializer.value_subject_name_strategy is strategies.topic_record_name_strategy
9286

9387
def test_differing_key_and_value_subject_name_strategies(self):
9488
schema_registry = MockSchemaRegistryClient()
95-
config = {
96-
'value.subject.name.strategy': 'RecordName',
97-
'key.subject.name.strategy': 'TopicRecordName'
98-
}
99-
producer = AvroProducer(config=config, schema_registry=schema_registry)
89+
producer = AvroProducer(config={}, schema_registry=schema_registry,
90+
key_subject_name_strategy=strategies.record_name_strategy,
91+
value_subject_name_strategy=strategies.topic_record_name_strategy)
10092
serializer = producer._serializer
101-
assert isinstance(serializer.key_subject_name_strategy, strategies.RecordNameStrategy)
102-
assert isinstance(serializer.value_subject_name_strategy, strategies.TopicRecordNameStrategy)
93+
assert serializer.key_subject_name_strategy is strategies.record_name_strategy
94+
assert serializer.value_subject_name_strategy is strategies.topic_record_name_strategy

0 commit comments

Comments
 (0)