Skip to content

Commit

Permalink
Add mailbox module
Browse files Browse the repository at this point in the history
  • Loading branch information
i2y committed Jul 20, 2015
1 parent 2d8fe27 commit a634e18
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 191 deletions.
20 changes: 1 addition & 19 deletions examples/actor_example.mochi
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,11 @@ def show_once():
message:
print(message)

record Message(tag, value)
print(Message)
print(__name__)
def show_loop():
receive:
PList(&rest):
print('aaa')
show_loop()
Message(tag, value):
print('Message!', tag, value)
show_loop()
[tag, value]:
print(tag, value)
show_loop()
Expand Down Expand Up @@ -46,18 +40,6 @@ sleep(1)
# -> foo 1000
# -> bar 2000

remote_actor = RemoteActor('tcp://localhost:9999/test')
remote_actor ! Message('mefafsg', ['value'])
remote_actor ! ['remote!', [[3000]]]
remote_actor ! ['remo', Message(3000, 1000)]

remote_actor ! {'x': [1, 2, 3]}
remote_actor ! s(1, [2, 1, 2, 3])
remote_actor ! b(1, [2, 1, 2, 3])
remote_actor ! l(1, [2, 1, 2, 3])
wait_all()

hub = ActorHub('tcp://*:9999')
hub.register('test', actor2)
hub.run()

wait_all()
30 changes: 30 additions & 0 deletions examples/consumer.mochi
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from mochi.actor.mailbox import KombuMailbox, ZmqInbox

record Message(tag, value)

def consumer():
receive:
Message(tag, value):
print(tag, ':', value)
# ack()
consumer()
"special":
print("!!!!!!!")
# ack()
consumer()
"exit":
# ack()
print("exit!")
other:
print(other)
# ack()
consumer()

mailbox = KombuMailbox('sqs://<access_key_id>@<secret_access_key>:80//',
'<queue_name>',
dict(region='<region>'))
# no_ack=False)
spawn_with_mailbox(consumer, mailbox)
mailbox = ZmqInbox('tcp://*:9999')
spawn_with_mailbox(consumer, mailbox)
wait_all()
19 changes: 19 additions & 0 deletions examples/producer.mochi
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from mochi.actor.mailbox import KombuMailbox, ZmqOutbox

record Message(tag, value)

mailbox = KombuMailbox('sqs://<access_key_id>@<secret_access_key>:80//',
'<queue_name>',
dict(region='<region>'))

mailbox ! Message('sqs', 'AWS SQS')
mailbox ! [2, 3]
mailbox ! 'special'
mailbox ! 'exit'

mailbox = ZmqOutbox('tcp://localhost:9999')

mailbox ! Message('zmq', 'ZeroMQ')
mailbox ! [2, 3]
mailbox ! 'special'
mailbox ! 'exit'
215 changes: 48 additions & 167 deletions mochi/actor/actor.py
Original file line number Diff line number Diff line change
@@ -1,169 +1,24 @@
import sys
from collections import Mapping, Set
from eventlet.queue import Queue
from abc import ABCMeta

import eventlet
from eventlet.green import zmq
from urllib.parse import urlparse
from kazoo.client import KazooClient, KazooRetry
from msgpack import packb, unpackb, ExtType
from pyrsistent import (PVector, PList, PBag,
pvector, pmap, pset, plist, pbag)
from .mailbox import Mailbox, AckableMailbox, LocalMailbox


_actor_map = {}

_actor_pool = eventlet.GreenPool()

_native_builtin_types = (int, float, str, bool)

TYPE_PSET = 1
TYPE_PLIST = 2
TYPE_PBAG = 3


def decode(obj):
if isinstance(obj, ExtType):
if obj.code == TYPE_PSET:
unpacked_data = unpackb(obj.data,
use_list=False,
encoding='utf-8')
return pset(decode(item) for item in unpacked_data)
if obj.code == TYPE_PLIST:
unpacked_data = unpackb(obj.data,
use_list=False,
encoding='utf-8')
return plist(decode(item) for item in unpacked_data)
if obj.code == TYPE_PBAG:
unpacked_data = unpackb(obj.data,
use_list=False,
encoding='utf-8')
return pbag(decode(item) for item in unpacked_data)
module_name, class_name, *data = unpackb(obj.data,
use_list=False,
encoding='utf-8')
cls = getattr(sys.modules[module_name],
class_name)
return cls(*(decode(item) for item in data))
if isinstance(obj, tuple):
return pvector(decode(item) for item in obj)
if isinstance(obj, dict):
new_dict = dict()
for key in obj.keys():
new_dict[decode(key)] = decode(obj[key])
return pmap(new_dict)
return obj


def encode(obj):
if type(obj) in (list, tuple) or isinstance(obj, PVector):
return [encode(item) for item in obj]
if isinstance(obj, Mapping):
encoded_obj = {}
for key in obj.keys():
encoded_obj[encode(key)] = encode(obj[key])
return encoded_obj
if isinstance(obj, _native_builtin_types):
return obj
if isinstance(obj, Set):
return ExtType(TYPE_PSET, packb([encode(item) for item in obj], use_bin_type=True))
if isinstance(obj, PList):
return ExtType(TYPE_PLIST, packb([encode(item) for item in obj], use_bin_type=True))
if isinstance(obj, PBag):
return ExtType(TYPE_PBAG, packb([encode(item) for item in obj], use_bin_type=True))
# assume record
cls = obj.__class__
return ExtType(0, packb([cls.__module__, cls.__name__] + [encode(item) for item in obj],
use_bin_type=True))


class ActorAddressBook(object):
def __init__(self, zk_hosts, timeout=60.0):
self.retry = KazooRetry(max_tries=10)
self.zk = KazooClient(hosts=zk_hosts, timeout=timeout)
self.zk.start()

def lookup(self, path):
return self.retry(self._lookup, path)

def _lookup(self, path):
actor_url, stat = self.zk.get(path)
return RemoteActor(actor_url.decode('utf-8'))

def register(self, path, actor_url):
return self.retry(self._register, path, actor_url)

def _register(self, path, actor_url):
self.zk.ensure_path(path)
self.zk.set(path, actor_url.encode('utf-8'))

def delete(self, path):
self.zk.delete(path, recursive=True)

def __del__(self):
self.zk.stop()


class ActorHub(object):
def __init__(self, url='tcp://*:9999'):
self._path_actor_mapping = {}
self._url = url
self._context = zmq.Context()
self._recv_sock = self._context.socket(zmq.PAIR)
self._recv_sock.bind(url)

def register(self, path, actor):
if isinstance(actor, ActorBase):
self._path_actor_mapping[path] = actor
else:
raise TypeError('can only register an actor')

def unregister(self, path):
del self._path_actor_mapping[path]

def _run(self):
while True:
pyobj = decode(unpackb(self._recv_sock.recv(),
encoding='utf-8',
use_list=False))
path, msg = pyobj
if path in self._path_actor_mapping:
self._path_actor_mapping[path].send(msg)
else:
# ignore
pass

def run(self):
_actor_pool.spawn(self._run)

def __del__(self):
self._recv_sock.close()


class ActorBase(object):
pass


class RemoteActor(ActorBase):

def __init__(self, url):
parsed_url = urlparse(url)
self._base_url = parsed_url.scheme + "://" + parsed_url.netloc
self._path = parsed_url.path[1:]
self._context = zmq.Context()
self._send_sock = self._context.socket(zmq.PAIR)
self._send_sock.connect(self._base_url)

def send(self, msg):
self._send_sock.send(packb(encode((self._path, msg)),
encoding='utf-8',
use_bin_type=True))

def __del__(self):
self._send_sock.close()
class ActorBase(metaclass=ABCMeta):
pass


class Actor(ActorBase):
def __init__(self, callback):
self._inbox = Queue()
def __init__(self, callback, mailbox=LocalMailbox()):
assert isinstance(mailbox, Mailbox)
self._ack = isinstance(mailbox, AckableMailbox)
self._inbox = mailbox
self._outbox = mailbox
self._callback = callback
self._greenlet = None

Expand Down Expand Up @@ -204,27 +59,47 @@ def wait(self):
return self._greenlet.wait()

def send(self, message):
self._inbox.put(message)
if self._outbox is not None:
self._outbox.put(message)

def receive(self):
return self._inbox.get()
if self._inbox is not None:
return self._inbox.get()

def ack_last_msg(self):
if self._ack:
self._inbox.ack()

default_inbox = Queue()
default_mailbox = LocalMailbox()


def send(message, actor=None):
if isinstance(actor, ActorBase):
actor.send(message)
def send(message, target=None):
if isinstance(target, ActorBase):
target.send(message)
elif isinstance(target, Mailbox):
target.put(message)
else:
default_inbox.put(message)
default_mailbox.put(message)


def recv(actor=None):
if isinstance(actor, ActorBase):
return actor.receive()
def recv(target=None):
if isinstance(target, ActorBase):
return target.receive()
elif isinstance(target, Mailbox):
return target.get()
else:
return default_inbox.get()
return default_mailbox.get()


def ack_last_msg(target=None):
if isinstance(target, Actor):
target.ack_last_msg()
elif isinstance(target, AckableMailbox):
target.ack()


def ack():
self().ack_last_msg()


def link(actor, func, *args, **kwargs):
Expand Down Expand Up @@ -254,6 +129,12 @@ def spawn(func, *args, **kwargs):
return actor


def spawn_with_mailbox(func, mailbox, *args, **kwargs):
actor = Actor(func, mailbox)
actor.spawn(*args, **kwargs)
return actor


def sleep(seconds):
eventlet.sleep(seconds)

Expand Down
Loading

0 comments on commit a634e18

Please sign in to comment.