Skip to content

Commit 40fe906

Browse files
committed
Place SerializationContext first in Serializer/SubjectNameStrategy APIs
1 parent b343c26 commit 40fe906

File tree

12 files changed

+106
-90
lines changed

12 files changed

+106
-90
lines changed

confluent_kafka/deserializing_consumer.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class DeserializingConsumer(_ConsumerImpl):
3030
capabilities.
3131
3232
Note:
33+
3334
The DeserializingConsumer is an experimental API and subject to change.
3435
3536
.. versionadded:: 1.4.0
@@ -38,6 +39,7 @@ class DeserializingConsumer(_ConsumerImpl):
3839
DeserializingConsumer on how to convert the message payload bytes to objects.
3940
4041
Note:
42+
4143
All configured callbacks are served from the application queue upon
4244
calling :py:func:`DeserializingConsumer.poll`
4345
@@ -52,11 +54,11 @@ class DeserializingConsumer(_ConsumerImpl):
5254
| group.id* | str | All clients sharing the same group.id belong to the |
5355
| | | same group. |
5456
+--------------------+-----------------+-----------------------------------------------------+
55-
| | | Callable(bytes, SerializationContext) -> obj |
57+
| | | Callable(SerializationContext, bytes) -> obj |
5658
| key.deserializer | callable | |
5759
| | | Serializer used for message keys. |
5860
+--------------------+-----------------+-----------------------------------------------------+
59-
| | | Callable(bytes, SerializationContext) -> obj |
61+
| | | Callable(SerializationContext, bytes) -> obj |
6062
| value.deserializer | callable | |
6163
| | | Serializer used for message values. |
6264
+--------------------+-----------------+-----------------------------------------------------+
@@ -72,15 +74,16 @@ class DeserializingConsumer(_ConsumerImpl):
7274
| | | |
7375
| | | Callback for statistics. This callback is |
7476
| stats_cb | callable | added to the application queue every |
75-
| | |``statistics.interval.ms`` (configured separately). |
77+
| | | ``statistics.interval.ms`` (configured separately). |
7678
| | | The function argument is a JSON formatted str |
7779
| | | containing statistics data. |
7880
+--------------------+-----------------+-----------------------------------------------------+
7981
| | | Callable(ThrottleEvent) |
8082
| throttle_cb | callable | |
8183
| | | Callback for throttled request reporting. |
8284
+--------------------+-----------------+-----------------------------------------------------+
83-
.. _See Client CONFIGURATION.md for a complete list of configuration properties.
85+
86+
.. _See Client CONFIGURATION.md for a complete list of configuration properties:
8487
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
8588
8689
Args:
@@ -127,7 +130,7 @@ def poll(self, timeout=-1):
127130
value = msg.value()
128131
if self._value_deserializer is not None:
129132
try:
130-
value = self._value_deserializer(value, ctx)
133+
value = self._value_deserializer(ctx, value)
131134
except SerializationError as se:
132135
raise ConsumeError(KafkaError._VALUE_DESERIALIZATION,
133136
reason=se.message,
@@ -136,7 +139,7 @@ def poll(self, timeout=-1):
136139
if self._key_deserializer is not None:
137140
try:
138141
ctx.field = MessageField.KEY
139-
key = self._key_deserializer(key, ctx)
142+
key = self._key_deserializer(ctx, key)
140143
except SerializationError as se:
141144
raise ConsumeError(KafkaError._KEY_DESERIALIZATION,
142145
reason=se.message,

confluent_kafka/error.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,8 @@ def __init__(self, error, reason=None, message=None):
4242
self.message = message
4343

4444
def __repr__(self):
45-
return '{klass}(error={error})'.format(
46-
klass=self.__class__.__name__,
47-
error=self.reason
48-
)
45+
return str(self)
4946

5047
def __str__(self):
51-
return self.reason
48+
return "{} (Error code {})".format(self.reason,
49+
self.error_code)

confluent_kafka/schema_registry/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"record_subject_name_strategy"]
3434

3535

36-
def topic_subject_name_strategy(record_name, ctx):
36+
def topic_subject_name_strategy(ctx, record_name):
3737
"""
3838
Constructs a subject name in the form of {topic}-key|value.
3939
@@ -47,7 +47,7 @@ def topic_subject_name_strategy(record_name, ctx):
4747
return ctx.topic + "-" + ctx.field
4848

4949

50-
def topic_record_subject_name_strategy(record_name, ctx):
50+
def topic_record_subject_name_strategy(ctx, record_name):
5151
"""
5252
Constructs a subject name in the form of {topic}-{record_name}.
5353
@@ -61,7 +61,7 @@ def topic_record_subject_name_strategy(record_name, ctx):
6161
return ctx.topic + "-" + record_name
6262

6363

64-
def record_subject_name_strategy(record_name, ctx):
64+
def record_subject_name_strategy(ctx, record_name):
6565
"""
6666
Constructs a subject name in the form of {record_name}.
6767
@@ -75,7 +75,7 @@ def record_subject_name_strategy(record_name, ctx):
7575
return record_name
7676

7777

78-
def reference_subject_name_strategy(schema_ref, ctx):
78+
def reference_subject_name_strategy(ctx, schema_ref):
7979
"""
8080
Constructs a subject reference name in the form of {reference name}.
8181

confluent_kafka/schema_registry/avro.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class AvroSerializer(Serializer):
125125
126126
schema_str (str): Avro Schema declaration.
127127
128-
to_dict (callable, optional): Callable(object, SerializationContext) -> dict.
128+
to_dict (callable, optional): Callable(SerializationContext, object) -> dict.
129129
Converts object to a dict.
130130
131131
conf (dict): AvroSerializer configuration.
@@ -189,16 +189,18 @@ def __init__(self, schema_registry_client, schema_str,
189189
self._schema_name = schema_name
190190
self._parsed_schema = parsed_schema
191191

192-
def __call__(self, obj, ctx):
192+
def __call__(self, ctx, obj):
193193
"""
194194
Serializes an object to the Confluent Schema Registry's Avro binary
195195
format.
196196
197197
Args:
198-
obj (object): object instance to serializes.
199198
ctx (SerializationContext): Metadata pertaining to the serialization
200199
operation.
201200
201+
obj (object): object instance to serializes.
202+
203+
202204
Note:
203205
None objects are represented as Kafka Null.
204206
@@ -212,7 +214,7 @@ def __call__(self, obj, ctx):
212214
if obj is None:
213215
return None
214216

215-
subject = self._subject_name_func(self._schema_name, ctx)
217+
subject = self._subject_name_func(ctx, self._schema_name)
216218

217219
# Check to ensure this schema has been registered under subject_name.
218220
if self._auto_register and subject not in self._known_subjects:
@@ -229,7 +231,7 @@ def __call__(self, obj, ctx):
229231
self._known_subjects.add(subject)
230232

231233
if self.to_dict is not None:
232-
value = self.to_dict(obj, ctx)
234+
value = self.to_dict(ctx, obj)
233235
else:
234236
value = obj
235237

@@ -261,7 +263,7 @@ class AvroDeserializer(Deserializer):
261263
262264
schema_str (str): Avro reader schema declaration.
263265
264-
from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
266+
from_dict (callable, optional): Callable(SerializationContext, dict) -> object.
265267
Converts dict to an instance of some object.
266268
267269
.. _Schema declaration:
@@ -281,20 +283,20 @@ def __init__(self, schema_registry_client, schema_str, from_dict=None):
281283

282284
if from_dict is not None and not callable(from_dict):
283285
raise ValueError("from_dict must be callable with the signature"
284-
" from_dict(dict, SerializationContext) -> object")
286+
" from_dict(SerializationContext, dict) -> object")
285287
self.from_dict = from_dict
286288

287-
def __call__(self, value, ctx):
289+
def __call__(self, ctx, value):
288290
"""
289291
Decodes a Confluent Schema Registry formatted Avro bytes
290292
to an object.
291293
292294
Arguments:
293-
value (bytes): bytes
294-
295295
ctx (SerializationContext): Metadata pertaining to the serialization
296296
operation.
297297
298+
value (bytes): bytes
299+
298300
Raises:
299301
SerializerError if an error occurs ready data.
300302
@@ -332,6 +334,6 @@ def __call__(self, value, ctx):
332334
self._reader_schema)
333335

334336
if self.from_dict is not None:
335-
return self.from_dict(obj_dict, ctx)
337+
return self.from_dict(ctx, obj_dict)
336338

337339
return obj_dict

confluent_kafka/schema_registry/error.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def __init__(self, http_status_code, error_code, error_message):
4242
self.error_message = error_message
4343

4444
def __repr__(self):
45-
return self.__str__()
45+
return str(self)
4646

4747
def __str__(self):
4848
return "{} (HTTP status code {}, SR code {})".format(self.error_message,

0 commit comments

Comments
 (0)