forked from biesnecker/cleveland
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test.py
55 lines (42 loc) · 1.33 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from cleveland.actor import BaseActor
from cleveland.broadcaster import TimedBroadcaster
from cleveland.message import Message
import asyncio
class StringMessage(Message): pass
class PrintActor(BaseActor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.register_handler(StringMessage,
self._string_message_handler)
@asyncio.coroutine
def _string_message_handler(self, message):
print(message.payload)
@asyncio.coroutine
def say_hello():
a = BaseActor()
b = PrintActor()
a.start()
b.start()
for _ in range(10):
message = StringMessage('Hello world!')
yield from asyncio.sleep(0.25)
yield from a.tell(b, message)
yield from a.stop()
yield from b.stop()
asyncio.get_event_loop().run_until_complete(say_hello())
########################################
class GreetingBroadcaster(TimedBroadcaster):
@asyncio.coroutine
def _message(self):
return StringMessage('Hello dolly!')
@asyncio.coroutine
def broadcast_hello():
b = PrintActor()
b.start()
a = GreetingBroadcaster(targets=[b], interval=0.75)
a.start()
for _ in range(5):
yield from asyncio.sleep(1)
yield from a.stop()
yield from b.stop()
asyncio.get_event_loop().run_until_complete(broadcast_hello())