Skip to content

Commit 7650646

Browse files
author
Sebastian Molenda
authored
Event engine - Effect dispatcher (#160)
* Event Dispatcher
1 parent 484df21 commit 7650646

File tree

9 files changed

+228
-69
lines changed

9 files changed

+228
-69
lines changed

pubnub/event_engine/__init__.py

Whitespace-only changes.

pubnub/event_engine/dispatcher.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from pubnub.event_engine import effects
2+
3+
4+
class Dispatcher:
5+
def __init__(self) -> None:
6+
self._managed_effects = {}
7+
self._effect_emitter = effects.EmitEffect()
8+
9+
def dispatch_effect(self, effect: effects.PNEffect):
10+
if isinstance(effect, effects.PNEmittableEffect):
11+
self._effect_emitter.emit(effect)
12+
13+
if isinstance(effect, effects.PNManageableEffect):
14+
managed_effect = effects.ManagedEffect(effect)
15+
managed_effect.run()
16+
self._managed_effects[effect.__class__.__name__] = managed_effect
17+
18+
if isinstance(effect, effects.PNCancelEffect):
19+
if effect.cancel_effect in self._managed_effects:
20+
self._managed_effects[effect.cancel_effect].stop()
21+
del self._managed_effects[effect.cancel_effect]

pubnub/event_engine/effects.py

Lines changed: 91 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,38 @@
1-
from typing import Union
1+
from typing import List, Union
22
from pubnub.exceptions import PubNubException
33
from pubnub.enums import PNStatusCategory
4+
from pubnub.pubnub import PubNub
45

56

67
class PNEffect:
78
pass
89

910

10-
class HandshakeEffect(PNEffect):
11-
def __init__(self, channels: Union[None, list[str]], groups: Union[None, list[str]]) -> None:
11+
class PNManageableEffect(PNEffect):
12+
pass
13+
14+
15+
class PNCancelEffect(PNEffect):
16+
cancel_effect: str
17+
18+
19+
class HandshakeEffect(PNManageableEffect):
20+
def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None) -> None:
1221
super().__init__()
1322
self.channels = channels
1423
self.groups = groups
1524

1625

17-
class CancelHandshakeEffect(PNEffect):
18-
pass
26+
class CancelHandshakeEffect(PNCancelEffect):
27+
cancel_effect = HandshakeEffect.__name__
1928

2029

21-
class ReceiveMessagesEffect(PNEffect):
30+
class ReceiveMessagesEffect(PNManageableEffect):
2231
def __init__(self,
23-
channels: Union[None, list[str]],
24-
groups: Union[None, list[str]],
25-
timetoken: Union[None, str],
26-
region: Union[None, int]
32+
channels: Union[None, List[str]] = None,
33+
groups: Union[None, List[str]] = None,
34+
timetoken: Union[None, str] = None,
35+
region: Union[None, int] = None
2736
) -> None:
2837
super().__init__()
2938
self.channels = channels
@@ -32,51 +41,95 @@ def __init__(self,
3241
self.region = region
3342

3443

35-
class CancelReceiveMessagesEffect(PNEffect):
36-
pass
37-
38-
39-
class EmitMessagesEffect(PNEffect):
40-
def __init__(self, messages: Union[None, list[str]]) -> None:
41-
super().__init__()
42-
self.messages = messages
43-
44-
45-
class EmitStatusEffect(PNEffect):
46-
def __init__(self, status: Union[None, PNStatusCategory]) -> None:
47-
super().__init__()
48-
self.status = status
44+
class CancelReceiveMessagesEffect(PNCancelEffect):
45+
cancel_effect = ReceiveMessagesEffect.__name__
4946

5047

51-
class HandshakeReconnectEffect(PNEffect):
48+
class HandshakeReconnectEffect(PNManageableEffect):
5249
def __init__(self,
53-
channels: Union[None, list[str]],
54-
groups: Union[None, list[str]],
55-
attempts: Union[None, int],
56-
reason: Union[None, PubNubException]
50+
channels: Union[None, List[str]] = None,
51+
groups: Union[None, List[str]] = None,
52+
attempts: Union[None, int] = None,
53+
reason: Union[None, PubNubException] = None
5754
) -> None:
5855
self.channels = channels
5956
self.groups = groups
6057
self.attempts = attempts
6158
self.reason = reason
6259

6360

64-
class CancelHandshakeEffect(PNEffect):
65-
pass
61+
class CancelHandshakeReconnectEffect(PNCancelEffect):
62+
cancel_effect = HandshakeReconnectEffect.__name__
6663

6764

68-
class ReceiveReconnectEffect(PNEffect):
65+
class ReceiveReconnectEffect(PNManageableEffect):
6966
def __init__(self,
70-
channels: Union[None, list[str]],
71-
groups: Union[None, list[str]],
72-
timetoken: Union[None, str],
73-
region: Union[None, int],
74-
attempts: Union[None, int],
75-
reason: Union[None, PubNubException]
67+
channels: Union[None, List[str]] = None,
68+
groups: Union[None, List[str]] = None,
69+
timetoken: Union[None, str] = None,
70+
region: Union[None, int] = None,
71+
attempts: Union[None, int] = None,
72+
reason: Union[None, PubNubException] = None
7673
) -> None:
7774
self.channels = channels
7875
self.groups = groups
7976
self.timetoken = timetoken
8077
self.region = region
8178
self.attempts = attempts
8279
self.reason = reason
80+
81+
82+
class CancelReceiveReconnectEffect(PNCancelEffect):
83+
cancel_effect = ReceiveReconnectEffect.__name__
84+
85+
86+
class PNEmittableEffect(PNEffect):
87+
pass
88+
89+
90+
class EmitMessagesEffect(PNEmittableEffect):
91+
def __init__(self, messages: Union[None, List[str]]) -> None:
92+
super().__init__()
93+
self.messages = messages
94+
95+
96+
class EmitStatusEffect(PNEmittableEffect):
97+
def __init__(self, status: Union[None, PNStatusCategory]) -> None:
98+
super().__init__()
99+
self.status = status
100+
101+
102+
class ManagedEffect:
103+
pubnub: PubNub
104+
effect: Union[PNManageableEffect, PNCancelEffect]
105+
106+
def set_pn(pubnub: PubNub):
107+
pubnub = pubnub
108+
109+
def __init__(self, effect: Union[PNManageableEffect, PNCancelEffect]) -> None:
110+
self.effect = effect
111+
112+
def run(self):
113+
pass
114+
115+
def stop(self):
116+
pass
117+
118+
119+
class EmitEffect:
120+
pubnub: PubNub
121+
122+
def set_pn(pubnub: PubNub):
123+
pubnub = pubnub
124+
125+
def emit(self, effect: PNEmittableEffect):
126+
if isinstance(effect, EmitMessagesEffect):
127+
self.emit_message(effect)
128+
if isinstance(effect, EmitStatusEffect):
129+
self.emit_status(effect)
130+
131+
def emit_message(self, effect: EmitMessagesEffect):
132+
pass
133+
134+
def emit_status(self, effect: EmitStatusEffect):
135+
pass

pubnub/event_engine/events.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from pubnub.exceptions import PubNubException
2+
from typing import List
23

34

45
class PNEvent:
@@ -18,18 +19,18 @@ def __init__(self, timetoken: str, region: int) -> None:
1819

1920

2021
class PNChannelGroupsEvent(PNEvent):
21-
def __init__(self, channels: list[str], groups: list[str]) -> None:
22+
def __init__(self, channels: List[str], groups: List[str]) -> None:
2223
self.channels = channels
2324
self.groups = groups
2425

2526

2627
class SubscriptionChangedEvent(PNChannelGroupsEvent):
27-
def __init__(self, channels: list[str], groups: list[str]) -> None:
28+
def __init__(self, channels: List[str], groups: List[str]) -> None:
2829
PNChannelGroupsEvent.__init__(self, channels, groups)
2930

3031

3132
class SubscriptionRestoredEvent(PNCursorEvent, PNChannelGroupsEvent):
32-
def __init__(self, timetoken: str, region: int, channels: list[str], groups: list[str]) -> None:
33+
def __init__(self, timetoken: str, region: int, channels: List[str], groups: List[str]) -> None:
3334
PNCursorEvent.__init__(self, timetoken, region)
3435
PNChannelGroupsEvent.__init__(self, channels, groups)
3536

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1-
import events
2-
import states
1+
from pubnub.event_engine import effects, events, states
2+
from pubnub.event_engine.dispatcher import Dispatcher
3+
from typing import List
34

45

56
class StateMachine:
67
_current_state: states.PNState
7-
_context = states.PNContext()
8+
_context: states.PNContext
9+
_effect_list: List[effects.PNEffect]
810

911
def __init__(self, initial_state: states.PNState) -> None:
12+
self._context = states.PNContext()
1013
self._current_state = initial_state(self._context)
11-
self._effect_queue = []
14+
self._listeners = {}
15+
self._effect_list = []
16+
self._dispatcher = Dispatcher()
1217

1318
def get_state_name(self):
1419
return self._current_state.__class__.__name__
@@ -17,28 +22,30 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition:
1722
if event.get_name() in self._current_state._transitions:
1823
effect = self._current_state.on_exit()
1924
if effect:
20-
self._effect_queue.append(effect)
25+
self._effect_list.append(effect)
2126

2227
transition: states.PNTransition = self._current_state.on(event, self._context)
2328

2429
self._current_state = transition.state(self._current_state.get_context())
2530
self._context = transition.context
2631
if transition.effect:
27-
self._effect_queue.append(transition.effect)
32+
self._effect_list.append(transition.effect)
2833

2934
effect = self._current_state.on_enter(self._context)
3035
if effect:
31-
self._effect_queue.append(effect)
36+
self._effect_list.append(effect)
3237

3338
if transition.state:
3439
self._current_state = transition.state(self._context)
3540

3641
else:
3742
# we're ignoring events unhandled
3843
print('unhandled event??')
39-
pass
4044

41-
return self._effect_queue
45+
for effect in self._effect_list:
46+
self._dispatcher.dispatch_effect(effect)
47+
48+
return self._effect_list
4249

4350

4451
if __name__ == "__main__":
@@ -48,6 +55,9 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition:
4855
channels=['fail'], groups=[]
4956
))
5057

58+
machine.add_listener(effects.PNEffect, lambda x: print(f'Catch All Logger: {effect.__dict__}'))
59+
60+
machine.add_listener(effects.EmitMessagesEffect, )
5161
effect = machine.trigger(events.DisconnectEvent())
5262
print(f'SubscriptionChangedEvent triggered with channels=[`fail`]. Current state: {machine.get_state_name()}')
53-
print(f'effect queue: {machine._effect_queue}')
63+
print(f'effect queue: {machine._effect_list}')

pubnub/event_engine/states.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
import events
2-
import effects
3-
4-
from effects import PNEffect
5-
from typing import Union
6-
71
from pubnub.enums import PNStatusCategory
2+
from pubnub.event_engine import effects, events
3+
from pubnub.event_engine.effects import PNEffect
84
from pubnub.exceptions import PubNubException
5+
from typing import List, Union
96

107

118
class PNContext(dict):
@@ -25,7 +22,7 @@ class PNState:
2522

2623
def __init__(self, context: PNContext) -> None:
2724
self._context = context
28-
self._transitions = {}
25+
self._transitions = dict
2926

3027
def on(self, event: events.PNEvent, context: PNContext):
3128
return self._transitions[event.get_name()](event, context)
@@ -43,12 +40,12 @@ def get_context(self) -> PNContext:
4340
class PNTransition:
4441
context: PNContext
4542
state: PNState
46-
effect: Union[None, list[PNEffect]]
43+
effect: Union[None, List[PNEffect]]
4744

4845
def __init__(self,
4946
state: PNState,
5047
context: Union[None, PNContext] = None,
51-
effect: Union[None, list[PNEffect]] = None,
48+
effect: Union[None, List[PNEffect]] = None,
5249
) -> None:
5350
self.context = context
5451
self.state = state
@@ -182,7 +179,7 @@ def on_enter(self, context: Union[None, PNContext]):
182179

183180
def on_exit(self):
184181
super().on_exit()
185-
return effects.CancelHandshakeEffect()
182+
return effects.CancelHandshakeReconnectEffect()
186183

187184
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
188185
self._context.update(context)
@@ -411,7 +408,7 @@ def on_enter(self, context: Union[None, PNContext]):
411408

412409
def on_exit(self):
413410
super().on_exit()
414-
return effects.CancelReconnectEffect()
411+
return effects.CancelReceiveReconnectEffect()
415412

416413
def reconnect_failure(self, event: events.ReceiveReconnectFailureEvent, context: PNContext) -> PNTransition:
417414
self._context.update(context)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from unittest.mock import patch
2+
from pubnub.event_engine import effects
3+
from pubnub.event_engine.dispatcher import Dispatcher
4+
5+
6+
def test_dispatch_emit_messages_effect():
7+
with patch.object(effects.EmitEffect, 'emit_message') as mocked_emit_message:
8+
dispatcher = Dispatcher()
9+
dispatcher.dispatch_effect(effects.EmitMessagesEffect(['chan']))
10+
mocked_emit_message.assert_called()
11+
12+
13+
def test_dispatch_emit_status_effect():
14+
with patch.object(effects.EmitEffect, 'emit_status') as mocked_emit_status:
15+
dispatcher = Dispatcher()
16+
dispatcher.dispatch_effect(effects.EmitStatusEffect(['chan']))
17+
mocked_emit_status.assert_called()

0 commit comments

Comments
 (0)