Skip to content

Commit

Permalink
Add support for arbitrary deserializers
Browse files Browse the repository at this point in the history
  • Loading branch information
csaroff committed Sep 30, 2022
1 parent 99e082f commit 937b256
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
36 changes: 32 additions & 4 deletions rele/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,22 @@ def callback_func(self, data, **kwargs):
"""

def __init__(
self, func, topic, prefix="", suffix="", filter_by=None, backend_filter_by=None
self,
func,
topic,
prefix="",
suffix="",
filter_by=None,
backend_filter_by=None,
deserialize=lambda x: json.loads(x.data.decode("utf-8")),
):
self._func = func
self.topic = topic
self._prefix = prefix
self._suffix = suffix
self._filters = self._init_filters(filter_by)
self.backend_filter_by = backend_filter_by
self._deserialize = self._init_deserialize(deserialize)

def _init_filters(self, filter_by):
if filter_by and not (
Expand All @@ -61,6 +69,11 @@ def _init_filters(self, filter_by):

return None

def _init_deserialize(self, deserialize):
if not callable(deserialize):
raise ValueError("deserialize must be a callable or None.")
return deserialize

@property
def name(self):
name_parts = [self._prefix, self.topic, self._suffix]
Expand Down Expand Up @@ -109,8 +122,8 @@ def __call__(self, message):
start_time = time.time()

try:
data = json.loads(message.data.decode("utf-8"))
except json.JSONDecodeError as e:
data = self._subscription._deserialize(message)
except Exception as e:
message.ack()
run_middleware_hook(
"post_process_message_failure",
Expand Down Expand Up @@ -145,7 +158,14 @@ def __call__(self, message):
run_middleware_hook("post_process_message")


def sub(topic, prefix=None, suffix=None, filter_by=None, backend_filter_by=None):
def sub(
topic,
prefix=None,
suffix=None,
filter_by=None,
backend_filter_by=None,
deserialize=lambda x: json.loads(x.data.decode("utf-8")),
):
"""Decorator function that makes declaring a PubSub Subscription simple.
The Subscriber returned will automatically create and name
Expand Down Expand Up @@ -181,6 +201,11 @@ def purpose_2(data, **kwargs):
def sub_process_landscape_photos(data, **kwargs):
pass
@sub(topic='string-messages', deserialize=lambda x: x.data.decode('utf-8'))
def sub_process_non_json(data, **kwargs):
pass
:param topic: string The topic that is being subscribed to.
:param prefix: string An optional prefix to the subscription name.
Useful to namespace your subscription with your project name
Expand All @@ -190,6 +215,8 @@ def sub_process_landscape_photos(data, **kwargs):
:param filter_by: Union[function, list] An optional function or tuple of
functions that filters the messages to be processed by
the sub regarding their attributes.
:param deserialize: function An optional deserialization function that
replaces the default json deserialization.
:return: :class:`~rele.subscription.Subscription`
"""

Expand All @@ -214,6 +241,7 @@ def decorator(func):
suffix=suffix,
filter_by=filter_by,
backend_filter_by=backend_filter_by,
deserialize=deserialize,
)

return decorator
15 changes: 15 additions & 0 deletions tests/test_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ def sub_process_landscape_gif_photos(data, **kwargs):
return f'Received a {kwargs.get("format")} photo of type {kwargs.get("type")}'


def deserialize_handler(message):
return message.data.decode("utf-8")


@sub(topic="string-data", deserialize=deserialize_handler)
def sub_string_deserializer(data, **kwargs):
return data


class TestSubscription:
def test_subs_return_subscription_objects(self):
assert isinstance(sub_stub, Subscription)
Expand Down Expand Up @@ -294,6 +303,12 @@ def crashy_sub_stub(data, **kwargs):
}
assert failed_log.subscription_message == message_wrapper

def test_deserialize_handler(self, message_wrapper_invalid_json):
callback = Callback(sub_string_deserializer)
res = callback(message_wrapper_invalid_json)

assert res == "foobar"

def test_log_acks_called_message_when_not_json_serializable(
self, caplog, message_wrapper_invalid_json, published_at
):
Expand Down

0 comments on commit 937b256

Please sign in to comment.