Skip to content

Commit

Permalink
Merge pull request aosabook#26 from djmitche/refactor
Browse files Browse the repository at this point in the history
Refactor into bite-sized pieces
  • Loading branch information
MichaelDiBernardo committed Feb 19, 2014
2 parents 083a2d6 + b2d8c86 commit cc4b7e7
Show file tree
Hide file tree
Showing 18 changed files with 807 additions and 581 deletions.
29 changes: 29 additions & 0 deletions cluster/acceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from collections import defaultdict
from protocol import Ballot
from member import Component


class Acceptor(Component):

def __init__(self, member):
super(Acceptor, self).__init__(member)
self.ballot_num = Ballot(-1, -1)
self.accepted = defaultdict() # { (b,s) : p }

def do_PREPARE(self, scout_id, ballot_num): # p1a
if ballot_num > self.ballot_num:
self.ballot_num = ballot_num
self.send([scout_id.address], 'PROMISE', # p1b
scout_id=scout_id,
acceptor=self.address,
ballot_num=self.ballot_num,
accepted=self.accepted)

def do_ACCEPT(self, commander_id, ballot_num, slot, proposal): # p2a
if ballot_num >= self.ballot_num:
self.ballot_num = ballot_num
self.accepted[(ballot_num, slot)] = proposal
self.send([commander_id.address], 'ACCEPTED', # p2b
commander_id=commander_id,
acceptor=self.address,
ballot_num=self.ballot_num)
25 changes: 25 additions & 0 deletions cluster/bootstrap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import protocol
from protocol import ViewChange
from member import Component


class Bootstrap(Component):

def __init__(self, member, peers, bootstrapped_cb):
super(Bootstrap, self).__init__(member)
self.peers = peers
self.bootstrapped_cb = bootstrapped_cb

def start(self):
self.join()

def join(self):
"Try to join the cluster"
self.peers = self.peers[1:] + self.peers[:1] # rotate through peers
self.send([self.peers[0]], 'JOIN', requester=self.address)
self.set_timer(protocol.JOIN_RETRANSMIT, self.join)

def do_WELCOME(self, state, slot_num, decisions, viewid, peers):
self.bootstrapped_cb(state, slot_num, decisions, viewid, peers)
self.event('view_change', viewchange=ViewChange(viewid, peers)) # TODO: pass viewid, peers separately
self.stop()
82 changes: 31 additions & 51 deletions cluster/client.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,50 @@
import sys
import logging
from deterministic_network import Node
from member import Member
from member import Component


class Client(Node):
# TODO: eliminate - this library doesn't have distinct client nodes
class Client(Member):

def __init__(self, members):
super(Client, self).__init__()
self.cid = 1000000
def __init__(self, node):
super(Client, self).__init__(node)
self.current_request = None

def start(self):
def re_invoke(n):
self.invoke(n, lambda output: re_invoke(n+1))
self.set_timer(1, lambda: re_invoke(1))
self.node.set_timer(1, lambda: re_invoke(1))

def invoke(self, input, callback):
def invoke(self, n, callback):
assert self.current_request is None
def done(output):
self.current_request = None
callback(output)
self.current_request = Request(self, n, done)
self.current_request.start()


class Request(Component):

client_ids = xrange(1000000, sys.maxint).__iter__()
RETRANSMIT_TIME = 0.1

def __init__(self, member, n, callback):
super(Request, self).__init__(member)
self.cid = self.client_ids.next()
self.n = n
self.output = None
self.current_request = (self.cid, input, callback)
self.cid += 1
self.send_invoke()
return self.output
self.callback = callback

def send_invoke(self):
cid, input, callback = self.current_request
nodes = [k for k in self.core.nodes.keys() if k.startswith('Node-')]
self.send([self.core.rnd.choice(nodes)], 'INVOKE',
caller=self.address, cid=cid, input=input)
self.invoke_timer = self.set_timer(3, self.send_invoke)
def start(self):
self.send([self.member.node.network.rnd.choice(self.member.node.network.nodes.keys())], 'INVOKE',
caller=self.address, cid=self.cid, input=self.n)
self.invoke_timer = self.set_timer(self.RETRANSMIT_TIME, self.start)

def do_INVOKED(self, cid, output):
if not self.current_request or cid != self.current_request[0]:
if cid != self.cid:
return
self.logger.debug("received output %r" % (output,))
callback = self.current_request[2]
self.current_request = None
self.cancel_timer(self.invoke_timer)
callback(output)

if __name__ == "__main__":
logging.basicConfig(
format="%(asctime)s %(name)s %(message)s", level=logging.DEBUG)
client = Client(sys.argv[1])
print client.invoke(4)
print client.invoke(1)

# tests

import unittest
import threading


class FakeMember(Node):

def do_INVOKE(self, caller, cid, input):
self.send([caller], 'INVOKED', cid=cid, output=input * 10)
self.callback(output)
self.stop()


class ClientTests(unittest.TestCase):

def test_invoke(self):
member = FakeMember()
client = Client(member.address)
memberthd = threading.Thread(target=member.run)
memberthd.start()
self.assertEqual(client.invoke(5), 50)
memberthd.join()
41 changes: 41 additions & 0 deletions cluster/commander.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from protocol import CommanderId
from member import Component


class Commander(Component):

def __init__(self, member, leader, ballot_num, slot, proposal, peers):
super(Commander, self).__init__(member)
self.leader = leader
self.ballot_num = ballot_num
self.slot = slot
self.proposal = proposal
self.commander_id = CommanderId(self.address, slot, proposal)
self.accepted = set([])
self.peers = peers
self.quorum = len(peers) / 2 + 1

def start(self):
self.send(self.peers, 'ACCEPT', # p2a
commander_id=self.commander_id,
ballot_num=self.ballot_num,
slot=self.slot,
proposal=self.proposal)

def finished(self, ballot_num, preempted):
self.leader.commander_finished(self.commander_id, ballot_num, preempted)
self.stop()

def do_ACCEPTED(self, commander_id, acceptor, ballot_num): # p2b
if commander_id != self.commander_id:
return
if ballot_num == self.ballot_num:
self.accepted.add(acceptor)
if len(self.accepted) < self.quorum:
return
self.send(self.peers, 'DECISION',
slot=self.slot,
proposal=self.proposal)
self.finished(ballot_num, False)
else:
self.finished(ballot_num, True)
67 changes: 37 additions & 30 deletions cluster/deterministic_network.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import uuid
import time
import logging
import heapq
Expand All @@ -8,38 +7,48 @@ class Node(object):

unique_ids = xrange(1000).__iter__()

def __init__(self):
def __init__(self, network):
self.network = network
self.unique_id = self.unique_ids.next()

def set_up_node(self, address, core):
self.core = core
self.address = address
self.core.nodes[self.address] = self
self.logger = logging.getLogger('node.%s' % (self.address,))
self.address = 'N%d' % self.unique_id
self.components = []
self.logger = logging.getLogger(self.address)
self.logger.info('starting')

def stop(self):
self.logger.error('STOPPING')
if self.address in self.core.nodes:
del self.core.nodes[self.address]

def start(self):
pass
def kill(self):
self.logger.error('node dying')
if self.address in self.network.nodes:
del self.network.nodes[self.address]

def set_timer(self, seconds, callable):
# TODO: refactor so this won't call a stopped node
return self.core.set_timer(seconds, self.address, callable)
return self.network.set_timer(seconds, self.address, callable)

def cancel_timer(self, timer):
self.core.cancel_timer(timer)
self.network.cancel_timer(timer)

def send(self, destinations, action, **kwargs):
self.logger.debug("sending %s with args %s to %s" %
(action, kwargs, destinations))
self.core.send(destinations, action, **kwargs)
self.network.send(destinations, action, **kwargs)

def register(self, component):
self.components.append(component)

def unregister(self, component):
self.components.remove(component)

def receive(self, action, kwargs):
import sys
for comp in self.components[:]:
try:
fn = getattr(comp, 'do_%s' % action)
except AttributeError:
continue
comp.logger.debug("received %r with args %r" % (action, kwargs))
fn(**kwargs)

class Core(object):

class Network(object):

PROP_DELAY = 0.03
PROP_JITTER = 0.02
Expand All @@ -50,11 +59,14 @@ def __init__(self, seed, pause=False):
self.pause = pause
self.timers = []
self.now = 1000.0
self.logger = logging.getLogger('core')
self.logger = logging.getLogger('network')

def new_node(self):
node = Node(self)
self.nodes[node.address] = node
return node

def run(self):
for node in sorted(self.nodes.values()):
node.start()
while self.timers:
next_timer = self.timers[0][0]
if next_timer > self.now:
Expand All @@ -71,6 +83,7 @@ def stop(self):
self.timers = []

def set_timer(self, seconds, address, callable):
# TODO: return an obj with 'cancel'
timer = [self.now + seconds, True, address, callable]
heapq.heappush(self.timers, timer)
return timer
Expand All @@ -83,13 +96,7 @@ def _receive(self, address, action, kwargs):
node = self.nodes[address]
except KeyError:
return
try:
fn = getattr(node, 'do_%s' % action)
except AttributeError:
return

node.logger.debug("received %r with args %r" % (action, kwargs))
fn(**kwargs)
node.receive(action, kwargs)

def send(self, destinations, action, **kwargs):
for dest in destinations:
Expand Down
26 changes: 26 additions & 0 deletions cluster/fake_network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
class FakeNode(object):

def __init__(self):
self.unique_id = 999
self.address = 'F999'
self.component = None
self.timers = []
self.sent = []

def register(self, component):
assert not self.component
self.component = component

def set_timer(self, seconds, callable):
self.timers.append([seconds, callable, True])
return self.timers[-1]

def cancel_timer(self, timer):
timer[2] = False

def send(self, destinations, action, **kwargs):
self.sent.append((destinations, action, kwargs))

def fake_message(self, action, **kwargs):
fn = getattr(self.component, 'do_%s' % action)
fn(**kwargs)
36 changes: 36 additions & 0 deletions cluster/heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import protocol
from member import Component


class Heartbeat(Component):

def __init__(self, member, clock):
super(Heartbeat, self).__init__(member)
self.running = False
self.last_heard_from = {}
self.peers = None
self.clock = clock

def on_view_change_event(self, viewchange):
self.peers = set(viewchange.peers)
for peer in self.peers:
self.last_heard_from[peer] = self.clock()
if not self.running:
self.heartbeat()
self.running = True

def do_HEARTBEAT(self, sender):
self.last_heard_from[sender] = self.clock()

def heartbeat(self):
# send heartbeats to other nodes
self.send(self.last_heard_from.keys(), 'HEARTBEAT', sender=self.address)

# determine if any peers are down, and notify if so; note that this
# notification will occur repeatedly until a view change
too_old = self.clock() - 2 * protocol.HEARTBEAT_INTERVAL
active_peers = set(p for p in self.last_heard_from if self.last_heard_from[p] >= too_old)
if active_peers != self.peers:
self.event('peers_down', down=self.peers - active_peers)

self.set_timer(protocol.HEARTBEAT_INTERVAL, self.heartbeat)
Loading

0 comments on commit cc4b7e7

Please sign in to comment.