Skip to content

Commit 2657f86

Browse files
author
Sebastian Molenda
authored
Development/event engine (#164)
* Emit messages * Improved tests * Add cancelation effect support * Fix heartbeat * Update workflow - bump python versions * temporarily skip tests for 3.7 * Tests with busypie
1 parent f6b7e1d commit 2657f86

File tree

8 files changed

+143
-62
lines changed

8 files changed

+143
-62
lines changed

.github/workflows/run-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
strategy:
2727
fail-fast: true
2828
matrix:
29-
python: [3.7.13, 3.8.13, 3.9.13, 3.10.11, 3.11.3]
29+
python: [3.7.17, 3.8.17, 3.9.17, 3.10.12, 3.11.4]
3030
steps:
3131
- name: Checkout repository
3232
uses: actions/checkout@v3

pubnub/event_engine/dispatcher.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ def set_pn(self, pubnub_instance):
1616
self._effect_emitter.set_pn(pubnub_instance)
1717

1818
def dispatch_effect(self, effect: effects.PNEffect):
19-
print(f'dispatching {effect.__class__.__name__} {id(effect)}')
2019
if not self._managed_effects_factory:
2120
self._managed_effects_factory = manage_effects.ManagedEffectFactory(self._pubnub, self._event_engine)
2221

@@ -30,7 +29,6 @@ def dispatch_effect(self, effect: effects.PNEffect):
3029
self.dispatch_cancel_effect(effect)
3130

3231
def emit_effect(self, effect: effects.PNEffect):
33-
print(f' emiting {effect.__class__.__name__} with {effect.__dict__}')
3432
self._effect_emitter.emit(effect)
3533

3634
def dispatch_managed_effect(self, effect: effects.PNEffect):

pubnub/event_engine/manage_effects.py

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
import asyncio
2+
import logging
23

34
from queue import SimpleQueue
45
from typing import Union
56
from pubnub.endpoints.pubsub.subscribe import Subscribe
7+
from pubnub.models.consumer.pubsub import PNMessageResult
8+
from pubnub.models.server.subscribe import SubscribeMessage
69
from pubnub.pubnub import PubNub
710
from pubnub.event_engine.models import effects, events
811
from pubnub.models.consumer.common import PNStatus
9-
from pubnub.workers import SubscribeMessageWorker
1012

1113

1214
class ManagedEffect:
1315
pubnub: PubNub = None
1416
event_engine = None
1517
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]
18+
stop_event = None
1619

1720
def set_pn(self, pubnub: PubNub):
1821
self.pubnub = pubnub
@@ -30,28 +33,36 @@ def run_async(self):
3033
pass
3134

3235
def stop(self):
33-
pass
36+
logging.debug(f'stop called on {self.__class__.__name__}')
37+
if self.stop_event:
38+
logging.debug(f'stop_event({id(self.stop_event)}).set() called on {self.__class__.__name__}')
39+
self.stop_event.set()
40+
41+
def get_new_stop_event(self):
42+
event = asyncio.Event()
43+
logging.debug(f'creating new stop_event({id(event)}) for {self.__class__.__name__}')
44+
return event
3445

3546

3647
class ManageHandshakeEffect(ManagedEffect):
3748
def run(self):
3849
channels = self.effect.channels
3950
groups = self.effect.groups
4051
if hasattr(self.pubnub, 'event_loop'):
52+
self.stop_event = self.get_new_stop_event()
53+
4154
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
4255
if loop.is_running():
43-
loop.create_task(self.handshake_async(channels, groups))
56+
loop.create_task(self.handshake_async(channels, groups, self.stop_event))
4457
else:
45-
loop.run_until_complete(self.handshake_async(channels, groups))
58+
loop.run_until_complete(self.handshake_async(channels, groups, self.stop_event))
4659
else:
4760
# TODO: the synchronous way
4861
pass
4962

50-
def stop(self):
51-
pass
52-
53-
async def handshake_async(self, channels, groups):
54-
handshake = await Subscribe(self.pubnub).channels(channels).channel_groups(groups).future()
63+
async def handshake_async(self, channels, groups, stop_event):
64+
request = Subscribe(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
65+
handshake = await request.future()
5566
if not handshake.status.error:
5667
cursor = handshake.result['t']
5768
timetoken = cursor['t']
@@ -70,31 +81,39 @@ def run(self):
7081
region = self.effect.region
7182

7283
if hasattr(self.pubnub, 'event_loop'):
84+
self.stop_event = self.get_new_stop_event()
7385
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
74-
coro = self.receive_messages_async(channels, groups, timetoken, region)
7586
if loop.is_running():
76-
loop.create_task(coro)
87+
loop.create_task(self.receive_messages_async(channels, groups, timetoken, region))
7788
else:
78-
loop.run_until_complete(coro)
89+
loop.run_until_complete(self.receive_messages_async(channels, groups, timetoken, region))
7990
else:
8091
# TODO: the synchronous way
8192
pass
8293

83-
def stop(self):
84-
pass
85-
8694
async def receive_messages_async(self, channels, groups, timetoken, region):
87-
response = await Subscribe(self.pubnub).channels(channels).channel_groups(groups).timetoken(timetoken) \
88-
.region(region).future()
89-
90-
if not response.status.error:
91-
cursor = response.result['t']
92-
timetoken = cursor['t']
93-
region = cursor['r']
94-
messages = response.result['m']
95-
print(response.result)
96-
recieve_success = events.ReceiveSuccessEvent(timetoken, region=region, messages=messages)
97-
self.event_engine.trigger(recieve_success)
95+
subscribe = Subscribe(self.pubnub)
96+
if channels:
97+
subscribe.channels(channels)
98+
if groups:
99+
subscribe.channel_groups(groups)
100+
if timetoken:
101+
subscribe.timetoken(timetoken)
102+
if region:
103+
subscribe.region(region)
104+
105+
subscribe.cancellation_event(self.stop_event)
106+
response = await subscribe.future()
107+
108+
if response and response.result:
109+
if not response.status.error:
110+
cursor = response.result['t']
111+
timetoken = cursor['t']
112+
region = cursor['r']
113+
messages = response.result['m']
114+
recieve_success = events.ReceiveSuccessEvent(timetoken, region=region, messages=messages)
115+
self.event_engine.trigger(recieve_success)
116+
self.stop_event.set()
98117

99118

100119
class ManagedEffectFactory:
@@ -120,7 +139,6 @@ class EmitEffect:
120139
def set_pn(self, pubnub: PubNub):
121140
self.pubnub = pubnub
122141
self.queue = SimpleQueue
123-
self.message_worker = SubscribeMessageWorker(self.pubnub, None, None, None)
124142

125143
def emit(self, effect: effects.PNEmittableEffect):
126144
if isinstance(effect, effects.EmitMessagesEffect):
@@ -129,7 +147,17 @@ def emit(self, effect: effects.PNEmittableEffect):
129147
self.emit_status(effect)
130148

131149
def emit_message(self, effect: effects.EmitMessagesEffect):
132-
self.pubnub._subscription_manager._listener_manager.announce_message('foo')
150+
for message in effect.messages:
151+
subscribe_message = SubscribeMessage().from_json(message)
152+
pn_message_result = PNMessageResult(
153+
message=subscribe_message.payload,
154+
subscription=subscribe_message.subscription_match,
155+
channel=subscribe_message.channel,
156+
timetoken=int(message['p']['t']),
157+
user_metadata=subscribe_message.publish_metadata,
158+
publisher=subscribe_message.issuing_client_id
159+
)
160+
self.pubnub._subscription_manager._listener_manager.announce_message(pn_message_result)
133161

134162
def emit_status(self, effect: effects.EmitStatusEffect):
135163
pn_status = PNStatus()

pubnub/event_engine/models/states.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,6 @@ def __init__(self, context: PNContext) -> None:
329329

330330
def on_enter(self, context: Union[None, PNContext]):
331331
super().on_enter(context)
332-
print(self._context)
333332
return effects.ReceiveMessagesEffect(context.channels, context.groups, timetoken=self._context.timetoken,
334333
region=self._context.region)
335334

pubnub/event_engine/statemachine.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ def get_dispatcher(self) -> Dispatcher:
3434
return self._dispatcher
3535

3636
def trigger(self, event: events.PNEvent) -> states.PNTransition:
37-
logging.info(f'Triggered {event.__class__.__name__} on {self._current_state.__class__.__name__}')
37+
logging.debug(f'Triggered {event.__class__.__name__} on {self._current_state.__class__.__name__}')
3838
if not self._enabled:
3939
return False
4040
if event.get_name() in self._current_state._transitions:
4141
self._effect_list.clear()
4242
effect = self._current_state.on_exit()
43-
logging.info(f'On exit effect: {effect.__class__.__name__}')
43+
logging.debug(f'On exit effect: {effect.__class__.__name__}')
4444

4545
if effect:
4646
self._effect_list.append(effect)
@@ -52,29 +52,29 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition:
5252
self._context = transition.context
5353
if transition.effect:
5454
if isinstance(transition.effect, list):
55-
logging.info('unpacking list')
55+
logging.debug('unpacking list')
5656
for effect in transition.effect:
57-
logging.info(f'Transition effect: {effect.__class__.__name__}')
57+
logging.debug(f'Transition effect: {effect.__class__.__name__}')
5858
self._effect_list.append(effect)
5959
else:
60-
logging.info(f'Transition effect: {transition.effect.__class__.__name__}')
60+
logging.debug(f'Transition effect: {transition.effect.__class__.__name__}')
6161
self._effect_list.append(transition.effect)
6262

6363
effect = self._current_state.on_enter(self._context)
6464
if effect:
65-
logging.info(f'On enter effect: {effect.__class__.__name__}')
65+
logging.debug(f'On enter effect: {effect.__class__.__name__}')
6666
self._effect_list.append(effect)
6767

6868
else:
6969
self.stop()
7070
# we're ignoring events unhandled
71-
logging.info(f'unhandled event?? {event.__class__.__name__} in {self._current_state.__class__.__name__}')
71+
logging.debug(f'unhandled event?? {event.__class__.__name__} in {self._current_state.__class__.__name__}')
7272

7373
self.dispatch_effects()
7474

7575
def dispatch_effects(self):
7676
for effect in self._effect_list:
77-
logging.info(f'dispatching {effect.__class__.__name__} {id(effect)}')
77+
logging.debug(f'dispatching {effect.__class__.__name__} {id(effect)}')
7878
self._dispatcher.dispatch_effect(effect)
7979

8080
self._effect_list.clear()
@@ -86,14 +86,14 @@ def stop(self):
8686
""" TODO: Remove before prodction """
8787
if __name__ == "__main__":
8888
machine = StateMachine(states.UnsubscribedState)
89-
logging.info(f'machine initialized. Current state: {machine.get_state_name()}')
89+
logging.debug(f'machine initialized. Current state: {machine.get_state_name()}')
9090
effect = machine.trigger(events.SubscriptionChangedEvent(
9191
channels=['fail'], groups=[]
9292
))
9393

94-
machine.add_listener(effects.PNEffect, lambda x: logging.info(f'Catch All Logger: {effect.__dict__}'))
94+
machine.add_listener(effects.PNEffect, lambda x: logging.debug(f'Catch All Logger: {effect.__dict__}'))
9595

9696
machine.add_listener(effects.EmitMessagesEffect, )
9797
effect = machine.trigger(events.DisconnectEvent())
98-
logging.info(f'SubscriptionChangedEvent triggered with channels=[`fail`]. Curr state: {machine.get_state_name()}')
99-
logging.info(f'effect queue: {machine._effect_list}')
98+
logging.debug(f'SubscriptionChangedEvent triggered with channels=[`fail`]. Curr state: {machine.get_state_name()}')
99+
logging.debug(f'effect queue: {machine._effect_list}')

pubnub/pubnub_asyncio.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,8 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None):
5858

5959
def __del__(self):
6060
pass
61-
# if self.event_loop.is_running():
62-
# tasks = asyncio.tasks.all_tasks(self.event_loop)
63-
# if len(tasks):
64-
# self.event_loop.run_until_complete(self.close_pending_tasks(tasks))
65-
# self.event_loop.run_until_complete(self._session.close())
61+
if self.event_loop.is_running():
62+
self.event_loop.create_task(self.close_session())
6663

6764
async def close_pending_tasks(self, tasks):
6865
await asyncio.gather(*tasks)

requirements-dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ cbor2
1010
behave
1111
vcrpy
1212
urllib3<2
13+
busypie
Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import busypie
23
import logging
34
import pytest
45
import sys
@@ -8,34 +9,91 @@
89

910
from pubnub.pubnub_asyncio import PubNubAsyncio, EventEngineSubscriptionManager, SubscribeCallback
1011
from pubnub.event_engine.models import states
12+
from pubnub.models.consumer.common import PNStatus
13+
from pubnub.enums import PNStatusCategory
1114

1215
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
1316

1417

18+
class TestCallback(SubscribeCallback):
19+
_status_called = False
20+
_message_called = False
21+
22+
def status_called(self):
23+
return self._status_called
24+
25+
def message_called(self):
26+
return self._message_called
27+
28+
def status(self, pubnub, status: PNStatus):
29+
self._status_called = True
30+
assert status.error is False
31+
assert status.category is PNStatusCategory.PNConnectedCategory
32+
logging.warning('calling status_callback()')
33+
self.status_callback()
34+
35+
def message(self, pubnub, message):
36+
self._message_called = True
37+
assert message.channel == 'foo'
38+
assert message.message == 'test'
39+
logging.warning('calling message_callback()')
40+
self.message_callback()
41+
42+
def status_callback(self):
43+
pass
44+
45+
def message_callback(self):
46+
pass
47+
48+
1549
@pytest.mark.asyncio
16-
async def test_subscribe_triggers_event():
50+
async def test_subscribe():
51+
loop = asyncio.get_event_loop()
1752
config = pnconf_env_copy()
1853
config.enable_subscribe = True
19-
20-
pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager)
21-
22-
with patch.object(SubscribeCallback, 'status') as mocked_status, \
23-
patch.object(SubscribeCallback, 'message') as mocked_message:
24-
callback = SubscribeCallback()
54+
callback = TestCallback()
55+
with patch.object(TestCallback, 'status_callback') as status_callback, \
56+
patch.object(TestCallback, 'message_callback') as message_callback:
57+
pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager, custom_event_loop=loop)
2558
pubnub.add_listener(callback)
2659
pubnub.subscribe().channels('foo').execute()
27-
await delayed_publish('foo', 'test', 2)
28-
await asyncio.sleep(5)
60+
await delayed_publish('foo', 'test', 1)
61+
await busypie.wait().at_most(10).poll_delay(2).poll_interval(1).until_async(lambda: callback.message_called)
2962
assert pubnub._subscription_manager.event_engine.get_state_name() == states.ReceivingState.__name__
30-
mocked_status.assert_called()
31-
mocked_message.assert_called()
63+
64+
status_callback.assert_called()
65+
message_callback.assert_called()
3266
pubnub.unsubscribe_all()
33-
await asyncio.sleep(2)
3467
pubnub._subscription_manager.stop()
35-
await asyncio.sleep(0.1)
68+
69+
try:
70+
await asyncio.gather(*asyncio.tasks.all_tasks())
71+
except asyncio.CancelledError:
72+
pass
73+
await pubnub.close_session()
3674

3775

3876
async def delayed_publish(channel, message, delay):
3977
pn = PubNubAsyncio(pnconf_env_copy())
4078
await asyncio.sleep(delay)
4179
await pn.publish().channel(channel).message(message).future()
80+
81+
82+
@pytest.mark.asyncio
83+
async def test_handshaking():
84+
config = pnconf_env_copy()
85+
config.enable_subscribe = True
86+
callback = TestCallback()
87+
with patch.object(TestCallback, 'status_callback') as status_callback:
88+
pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager)
89+
pubnub.add_listener(callback)
90+
pubnub.subscribe().channels('foo').execute()
91+
await busypie.wait().at_most(10).poll_delay(2).poll_interval(1).until_async(lambda: callback.status_called)
92+
assert pubnub._subscription_manager.event_engine.get_state_name() == states.ReceivingState.__name__
93+
status_callback.assert_called()
94+
pubnub._subscription_manager.stop()
95+
try:
96+
await asyncio.gather(*asyncio.tasks.all_tasks())
97+
except asyncio.CancelledError:
98+
pass
99+
await pubnub.close_session()

0 commit comments

Comments
 (0)