Skip to content
This repository has been archived by the owner on Nov 23, 2020. It is now read-only.

Commit

Permalink
allow for clean removal of callbacks #release-note
Browse files Browse the repository at this point in the history
  • Loading branch information
lsbardel committed Nov 1, 2016
1 parent b878617 commit d7b9ff1
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions pulsar/apps/data/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .store import PubSubClient


regex_callbacks = namedtuple('regex_callbacks', 'regex callbacks')
regex_callbacks = namedtuple('regex_callbacks', 'code regex callbacks')


LOGGER = logging.getLogger('pulsar.channels')
Expand All @@ -38,16 +38,23 @@ def backoff(value):
return min(value + 0.25, 16)


class CallbackError(Exception):
"""Exception which allow for a clean callback removal
"""


class Json:

def encode(self, msg):
return json.dumps(msg)

def decode(self, msg):
if isinstance(msg, bytes):
msg = msg.decode('utf-8')
try:
return json.loads(msg.decode('utf-8'))
except Exception as exc:
raise ProtocolError('Invalid JSON') from exc
return json.loads(msg)
except Exception:
raise ProtocolError('Invalid JSON') from None


class Connector:
Expand Down Expand Up @@ -289,14 +296,17 @@ def __iter__(self):
def __call__(self, message):
event = message.pop('event', '')
data = message.get('data')
for regex, callbacks in self.callbacks.values():
match = regex.match(event)
for entry in tuple(self.callbacks.values()):
match = entry.regex.match(event)
if match:
match = match.group()
for callback in callbacks:
for callback in tuple(entry.callbacks):
try:
callback(self, match, data)
except CallbackError:
self._remove_callback(entry, callback)
except Exception:
self._remove_callback(entry, callback)
self.channels.logger.exception(
'callback exception: channel "%s" event "%s"',
self.name, event)
Expand All @@ -321,8 +331,8 @@ def register(self, event, callback):
regex = redis_to_py_pattern(event)
entry = self.callbacks.get(regex)
if not entry:
entry = regex_callbacks(re.compile(regex), [])
self.callbacks[regex] = entry
entry = regex_callbacks(regex, re.compile(regex), [])
self.callbacks[entry.code] = entry

if callback not in entry.callbacks:
entry.callbacks.append(callback)
Expand All @@ -333,7 +343,10 @@ def unregister(self, event, callback):
regex = redis_to_py_pattern(event)
entry = self.callbacks.get(regex)
if entry:
if callback in entry.callbacks:
entry.callbacks.remove(callback)
if not entry.callbacks:
self.callbacks.pop(regex)
self._remove_callback(entry, callback)

def _remove_callback(self, entry, callback):
if callback in entry.callbacks:
entry.callbacks.remove(callback)
if not entry.callbacks:
self.callbacks.pop(entry.code)

0 comments on commit d7b9ff1

Please sign in to comment.