diff --git a/cluster/README.txt b/cluster/README.txt index 6190f20c5..98446e489 100644 --- a/cluster/README.txt +++ b/cluster/README.txt @@ -5,6 +5,8 @@ Requirements: Python This directory holds a simple implementation of a replicated state machine that can also feed clients information on cluster membership. -* aa_statemachine.py -- illustrates the state machine client API -* bb_network.py -- introduces a network of communicating processors -* cc_replicated.py -- a replicated state machine with consensus and a fixed membership +* statemachine.py -- illustrates the state machine client API +* network.py -- handles network communication +* client.py -- a simple client +* member_single.py -- a non-clustered member server, talking to the client +* member_replicated.py -- a replicated state machine with consensus and a fixed membership diff --git a/cluster/bb_network.py b/cluster/bb_network.py deleted file mode 100644 index 7a1551afa..000000000 --- a/cluster/bb_network.py +++ /dev/null @@ -1,78 +0,0 @@ -import logging -from multiprocessing import Process, Queue - -# Remove from final copy: -# - logging - -network = {} - -class Node(object): - - def __init__(self, address): - self.q = Queue() - self.address = address - self.logger = logging.getLogger('node.%s' % address) - network[self.address] = self.q - - def send(self, destinations, action, **kwargs): - self.logger.debug("sending %s with args %s to %s" % (action, kwargs, destinations)) - for dest in destinations: - network[dest].put((action, kwargs)) - - -class Member(Node, Process): - - def __init__(self, execute_fn, address): - Node.__init__(self, address) - Process.__init__(self, name=address) - self.execute_fn = execute_fn - - def run(self): - while True: - action, kwargs = self.q.get() - if not action: - return - self.logger.debug("received %r with args %r" % (action, kwargs)) - getattr(self, 'do_%s' % action)(**kwargs) - - def join(self): - self.q.put((None, None)) - Process.join(self) - - def start(self, initial_value=None): - self.state = initial_value - Process.start(self) - - def invoke(self, input): - self.state, output = self.execute_fn(self.state, input) - return output - - def do_INVOKE(self, input, caller): - self.send([caller], 'INVOKED', output=self.invoke(input)) - - -class Client(Node): - - def __init__(self, address, member_address): - Node.__init__(self, address) - self.member_address = member_address - - def invoke(self, input): - self.send([self.member_address], 'INVOKE', input=input, caller=self.address) - action, kwargs = self.q.get() - self.logger.debug("received %r with args %r" % (action, kwargs)) - return kwargs['output'] - - -def sequence_generator(state, input): - return state+input, range(state, state+input) - - -if __name__ == "__main__": - logging.basicConfig(format="%(asctime)s %(name)s proc=%(processName)s %(message)s", level=logging.DEBUG) - member = Member(sequence_generator, address='memb1') - client = Client('client', member.address) - member.start(initial_value=0) - print client.invoke(4) - print client.invoke(1) - member.join() diff --git a/cluster/client.py b/cluster/client.py new file mode 100644 index 000000000..9afd0b58b --- /dev/null +++ b/cluster/client.py @@ -0,0 +1,52 @@ +import sys +import logging +from network import Node + + +class Client(Node): + + def __init__(self, member_address): + super(Client, self).__init__() + self.member_address = member_address + + def invoke(self, input): + self.output = None + self.send([self.member_address], 'INVOKE', + input=input, caller=self.address) + self.run() + return self.output + + def do_INVOKED(self, output): + self.logger.debug("received output %r" % (output,)) + self.output = output + self.stop() + +if __name__ == "__main__": + logging.basicConfig( + format="%(asctime)s %(name)s %(message)s", level=logging.DEBUG) + client = Client(sys.argv[1]) + print client.invoke(4) + print client.invoke(1) + +# tests + +import unittest +import threading + + +class FakeMember(Node): + + def do_INVOKE(self, caller, input): + self.send([caller], 'INVOKED', output=input * 10) + self.stop() + + +class ClientTests(unittest.TestCase): + + def test_invoke(self): + member = FakeMember() + client = Client(member.address) + memberthd = threading.Thread(target=member.run) + memberthd.start() + self.assertEqual(client.invoke(5), 50) + memberthd.join() diff --git a/cluster/cc_replicated.py b/cluster/member_replicated.py similarity index 57% rename from cluster/cc_replicated.py rename to cluster/member_replicated.py index f0e7cc046..8cefdb8f2 100644 --- a/cluster/cc_replicated.py +++ b/cluster/member_replicated.py @@ -1,12 +1,16 @@ +from flufl.enum import Enum +import sys +import uuid import random import logging -from bb_network import Member, Client, sequence_generator +from member_single import Member, sequence_generator # Fix in final copy: # - include repeated classes # - merge ClusterMember and Member # - remove logging stuff + def log_acceptor_state(fn): def wrap(self, **kwargs): round = kwargs['round'] @@ -17,6 +21,7 @@ def wrap(self, **kwargs): return rv return wrap + def log_proposer_state(fn): def wrap(self, **kwargs): rv = fn(self, **kwargs) @@ -26,6 +31,7 @@ def wrap(self, **kwargs): return rv return wrap + def log_learner_state(fn): def wrap(self, **kwargs): round = kwargs['round'] @@ -46,24 +52,40 @@ def __getitem__(self, i): def __setitem__(self, i, v): if i >= len(self): - self.extend([None]*(i - len(self) + 1)) + self.extend([None] * (i - len(self) + 1)) list.__setitem__(self, i, v) +Modes = Enum('Modes', ('Initialized', 'Joining', 'Active')) + +def only_in_mode(*modes): + modes = set(modes) + def wrap(fn): + def replacement(self, *args, **kwargs): + if self.mode not in modes: + return + return fn(self, *args, **kwargs) + return replacement + return wrap + + class ClusterMember(Member): - cluster = [] + namespace = uuid.UUID('7e0d7720-fa98-4270-94ff-650a2c25f3f0') + def __init__(self, execute_fn): + Member.__init__(self, execute_fn) + self.unique_id = uuid.uuid3(self.namespace, self.address).int - def __init__(self, execute_fn, address): - Member.__init__(self, execute_fn, address) - self.cluster.append(address) + # member + self.mode = Modes.Initialized + self.peers = [self.address] # acceptor self.last_accepts = defaultlist() self.promises = defaultlist() # proposer - self.biggest_n = (0,0) + self.biggest_n = (0, 0) self.active_proposals = defaultlist() # learner @@ -72,26 +94,59 @@ def __init__(self, execute_fn, address): self.waiting_clients = defaultlist() self.values = defaultlist() + def start(self, initial_value=None, cluster_members=[]): + assert self.mode is Modes.Initialized + if initial_value is not None: + self.state = initial_value + self.mode = Modes.Active + else: + self.state = None + self.mode = Modes.Joining + self.send(cluster_members, 'ADD_MEMBER', new_member=self.address) + self.run() + @property def quorum(self): - return len(self.cluster)/2+1 + return len(self.peers) / 2 + 1 def next_n(self): - n = (self.biggest_n[0]+1, self.pid) + n = (self.biggest_n[0] + 1, self.unique_id) self.biggest_n = n - return n[0]*65536+n[1] - + return n[0] * 65536 + n[1] + + @only_in_mode(Modes.Active) + def do_ADD_MEMBER(self, new_member): + self.peers.append(new_member) + self.send([new_member], 'JOIN_CLUSTER', + state=self.state, + last_invoked=self.last_invoked, + peers=self.peers) + self.send(self.peers, 'UPDATE_PEERS', + peers=self.peers) + + @only_in_mode(Modes.Joining) + def do_JOIN_CLUSTER(self, state, last_invoked, peers): + self.mode = Modes.Active + self.state = state + self.last_invoked = last_invoked + self.peers = list(set(peers) | set(self.peers)) + + @only_in_mode(Modes.Active) + def do_UPDATE_PEERS(self, peers): + self.peers = list(set(peers) | set(self.peers)) + + @only_in_mode(Modes.Active) @log_proposer_state def do_INVOKE(self, input, caller): round = len(self.values) self.waiting_clients[round] = caller self.values.append(None) - n = self.next_n() - quorum = random.sample(self.cluster, self.quorum) + n = self.next_n() + quorum = random.sample(self.peers, self.quorum) self.logger.info("beginning round %d with value %r and quorum %r" % - (round, input, quorum)) + (round, input, quorum)) self.active_proposals[round] = dict( - n=n, value=input, quorum=quorum, promises={}, have_accepted=False) + n=n, value=input, quorum=quorum, promises={}, have_accepted=False) self.send(quorum, 'PREPARE', proposer=self.address, n=n, round=round) # if this request fails, make_request will re-invoke us @@ -103,8 +158,9 @@ def do_PREPARE(self, proposer, n, round): last_accept = self.last_accepts[round] prev_n = last_accept['n'] if last_accept else -1 prev_value = last_accept['value'] if last_accept else None - self.send([proposer], 'PROMISE', round=round, responder=self.address, - prev_n=prev_n, prev_value=prev_value) + self.send( + [proposer], 'PROMISE', round=round, responder=self.address, + prev_n=prev_n, prev_value=prev_value) @log_proposer_state def do_PROMISE(self, round, responder, prev_n, prev_value): @@ -114,7 +170,8 @@ def do_PROMISE(self, round, responder, prev_n, prev_value): promises = active_proposal['promises'] promises[responder] = dict(prev_n=prev_n, prev_value=prev_value) if len(promises) >= self.quorum: - self.logger.info("received %d responses in round %d: %r" % (len(promises), round, promises)) + self.logger.info("received %d responses in round %d: %r" % + (len(promises), round, promises)) # find the value of the largest-numbered proposal returned to us, defaulting # to our own value if none is given value = active_proposal['value'] @@ -122,8 +179,9 @@ def do_PROMISE(self, round, responder, prev_n, prev_value): for prom in promises.values(): if prom['prev_n'] > largest_n: value, largest_n = prom['prev_value'], prom['prev_n'] - quorum = random.sample(self.cluster, self.quorum) - self.send(quorum, 'PROPOSE', round=round, n=active_proposal['n'], value=value, proposer=self.address) + quorum = random.sample(self.peers, self.quorum) + self.send(quorum, 'PROPOSE', round=round, + n=active_proposal['n'], value=value, proposer=self.address) @log_acceptor_state def do_PROPOSE(self, round, n, value, proposer): @@ -133,7 +191,7 @@ def do_PROPOSE(self, round, n, value, proposer): last_accept = self.last_accepts[round] if not last_accept or last_accept['n'] < n: self.last_accepts[round] = {'n': n, 'value': value} - self.send(self.cluster, 'ACCEPT', round=round, n=n, value=value) + self.send(self.peers, 'ACCEPT', round=round, n=n, value=value) @log_learner_state def do_ACCEPT(self, round, n, value): @@ -143,16 +201,19 @@ def do_ACCEPT(self, round, n, value): count = accept_reqs_by_n[n] = accept_reqs_by_n.get(n, 0) + 1 if count < self.quorum: return - self.logger.info("received %d accepts in round %d for proposal n=%d/value=%r; learning" % (count, round, n, value)) + self.logger.info( + "received %d accepts in round %d for proposal n=%d/value=%r; learning" % + (count, round, n, value)) self.learn(round, value) def learn(self, round, value): if self.values[round] != value: - assert self.values[round] is None, "values[%d] is already %r" % (round, self.values[round]) + assert self.values[round] is None, "values[%d] is already %r" % ( + round, self.values[round]) self.values[round] = value # catch up on the log as far as we can go, sending INVOKED messages # where necessary - for i in xrange(self.last_invoked+1, len(self.values)): + for i in xrange(self.last_invoked + 1, len(self.values)): if self.values[i] is None: break output = self.invoke(self.values[i]) @@ -162,10 +223,13 @@ def learn(self, round, value): if __name__ == "__main__": - logging.basicConfig(format="%(asctime)s %(name)s proc=%(processName)s %(message)s", level=logging.DEBUG) - members = [ClusterMember(sequence_generator, address='memb%d' % i) for i in range(3)] - client = Client('client', members[0].address) - [member.start(initial_value=0) for member in members] - print client.invoke(4) - print client.invoke(1) - [member.join() for member in members] + logging.basicConfig( + format="%(asctime)s %(name)s %(message)s", level=logging.DEBUG) + member = ClusterMember(sequence_generator) + print member.address + cluster_members = sys.argv[1:] + if cluster_members: + member.start(cluster_members=cluster_members) + else: + print "starting new cluster" + member.start(initial_value=0) diff --git a/cluster/member_single.py b/cluster/member_single.py new file mode 100644 index 000000000..b13f53e31 --- /dev/null +++ b/cluster/member_single.py @@ -0,0 +1,62 @@ +import sys +import logging +from network import Node + +# Remove from final copy: +# - logging + + +class Member(Node): + + def __init__(self, execute_fn): + Node.__init__(self) + self.execute_fn = execute_fn + + def start(self, initial_value=None): + self.state = initial_value + self.run() + + def invoke(self, input): + self.state, output = self.execute_fn(self.state, input) + return output + + def do_INVOKE(self, input, caller): + self.send([caller], 'INVOKED', output=self.invoke(input)) + + +def sequence_generator(state, input): + return state + input, range(state, state + input) + + +if __name__ == "__main__": + logging.basicConfig( + format="%(asctime)s %(name)s %(message)s", level=logging.DEBUG) + member = Member(sequence_generator) + print member.address + member.start(initial_value=0) + +# tests + +import unittest +import threading + + +class FakeClient(Node): + + def do_INVOKED(self, output): + self.output = output + self.stop() + + +class MemberTests(unittest.TestCase): + + def test_invoke(self): + member = Member(sequence_generator) + client = FakeClient() + client.member = member + memberthd = threading.Thread(target=member.start, args=(0,)) + memberthd.daemon = 1 + memberthd.start() + client.send([member.address], 'INVOKE', input=5, caller=client.address) + client.run() + self.assertEqual(client.output, [0, 1, 2, 3, 4]) diff --git a/cluster/network.py b/cluster/network.py new file mode 100644 index 000000000..499a03ba5 --- /dev/null +++ b/cluster/network.py @@ -0,0 +1,110 @@ +import cPickle as pickle +import logging +import time +import heapq +import socket + + +def addr_to_tuple(addr): + parts = addr.split('-') + return parts[0], int(parts[1]) + + +def tuple_to_addr(addr): + if addr[0] == '0.0.0.0': + addr = socket.gethostbyname(socket.gethostname()), addr[1] + return '%s-%s' % addr + + +class Node(object): + + def __init__(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.bind(('', 0)) + self.address = tuple_to_addr(self.sock.getsockname()) + self.timers = [] + self.logger = logging.getLogger('node.%s' % (self.address,)) + + def run(self): + self.running = True + while self.running: + if self.timers: + next_timer = self.timers[0][0] + if next_timer < time.time(): + when, do, method = heapq.heappop(self.timers) + if do: + self.logger.debug("timeout %r" % method) + getattr(self, 'timeout_%s' % method)() + continue + else: + next_timer = time.time() + 1 + self.sock.settimeout(next_timer - time.time()) + try: + msg, address = self.sock.recvfrom(102400) + except socket.timeout: + continue + action, kwargs = pickle.loads(msg) + self.logger.debug("received %r with args %r" % (action, kwargs)) + getattr(self, 'do_%s' % action)(**kwargs) + + def stop(self): + self.running = False + + def set_timer(self, seconds, method): + timer = [time.time() + seconds, True, method] + heapq.heappush(self.timers, timer) + return timer + + def cancel_timer(self, timer): + timer[1] = False + + def send(self, destinations, action, **kwargs): + self.logger.debug("sending %s with args %s to %s" % + (action, kwargs, destinations)) + pkl = pickle.dumps((action, kwargs)) + for dest in destinations: + self.sock.sendto(pkl, addr_to_tuple(dest)) + +# tests + +import unittest +import threading + + +class TestNode(Node): + foo_called = False + bar_called = False + + def do_FOO(self, x, y): + self.foo_called = True + self.stop() + + def timeout_BAR(self): + self.bar_called = True + self.stop() + + +class NodeTests(unittest.TestCase): + + def test_comm(self): + sender = Node() + receiver = TestNode() + rxthread = threading.Thread(target=receiver.run) + rxthread.start() + sender.send([receiver.address], 'FOO', x=10, y=20) + rxthread.join() + self.failUnless(receiver.foo_called) + + def test_timeout(self): + node = TestNode() + node.set_timer(0.01, 'BAR') + node.run() + self.failUnless(node.bar_called) + + def test_cancel_timeout(self): + node = TestNode() + nonex = node.set_timer(0.01, 'NONEXISTENT') + node.set_timer(0.02, 'BAR') + node.cancel_timer(nonex) + node.run() + # this just needs to not crash diff --git a/cluster/requirements.txt b/cluster/requirements.txt new file mode 100644 index 000000000..d49742e03 --- /dev/null +++ b/cluster/requirements.txt @@ -0,0 +1 @@ +flufl.enum diff --git a/cluster/aa_statemachine.py b/cluster/statemachine.py similarity index 92% rename from cluster/aa_statemachine.py rename to cluster/statemachine.py index cc7c815c1..92df746e7 100644 --- a/cluster/aa_statemachine.py +++ b/cluster/statemachine.py @@ -21,7 +21,7 @@ def invoke(self, input): def sequence_generator(state, input): - return state+input, range(state, state+input) + return state + input, range(state, state + input) if __name__ == "__main__":