Skip to content

Commit

Permalink
Merge branch 'master' of github.com:aosabook/500lines
Browse files Browse the repository at this point in the history
Conflicts:
	README.md
  • Loading branch information
haz committed Mar 12, 2014
2 parents 578f2d9 + eb43837 commit 9e5298d
Show file tree
Hide file tree
Showing 20 changed files with 971 additions and 95 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ Contributors
<td>@jaaaarel</td>
<td>taavi.burns@points.com</td>
</tr>
<tr>
<td>Guido van Rossum</td>
<td>Dropbox</td>
<td>crawler</td>
<td>@gvanrossum</td>
<td>@gvanrossum</td>
<td>guido@python.org</td>
</tr>
<tr>
<td>Christian Muise</td>
<td>University of Melbourne</td>
Expand Down
2 changes: 1 addition & 1 deletion cluster/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Acceptor(Component):

def __init__(self, member):
super(Acceptor, self).__init__(member)
self.ballot_num = Ballot(-1, -1)
self.ballot_num = Ballot(-1, -1, -1)
self.accepted = defaultdict() # { (b,s) : p }

def do_PREPARE(self, scout_id, ballot_num): # p1a
Expand Down
13 changes: 8 additions & 5 deletions cluster/bootstrap.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import protocol
from protocol import ViewChange
from member import Component


Expand All @@ -8,6 +7,7 @@ class Bootstrap(Component):
def __init__(self, member, peers, bootstrapped_cb):
super(Bootstrap, self).__init__(member)
self.peers = peers
self.timer = None
self.bootstrapped_cb = bootstrapped_cb

def start(self):
Expand All @@ -17,9 +17,12 @@ def join(self):
"Try to join the cluster"
self.peers = self.peers[1:] + self.peers[:1] # rotate through peers
self.send([self.peers[0]], 'JOIN', requester=self.address)
self.set_timer(protocol.JOIN_RETRANSMIT, self.join)
self.timer = self.set_timer(protocol.JOIN_RETRANSMIT, self.join)

def do_WELCOME(self, state, slot_num, decisions, viewid, peers):
self.bootstrapped_cb(state, slot_num, decisions, viewid, peers)
self.event('view_change', viewchange=ViewChange(viewid, peers)) # TODO: pass viewid, peers separately
def do_WELCOME(self, state, slot_num, decisions, viewid, peers, peer_history):
self.bootstrapped_cb(state, slot_num, decisions, viewid, peers, peer_history)
self.event('view_change', viewid=viewid, peers=peers, slot=slot_num)
self.event('peer_history_update', peer_history=peer_history)
if self.timer:
self.cancel_timer(self.timer)
self.stop()
2 changes: 1 addition & 1 deletion cluster/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def done(output):
class Request(Component):

client_ids = xrange(1000000, sys.maxint).__iter__()
RETRANSMIT_TIME = 0.1
RETRANSMIT_TIME = 0.5

def __init__(self, member, n, callback):
super(Request, self).__init__(member)
Expand Down
20 changes: 14 additions & 6 deletions cluster/commander.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
from protocol import CommanderId
import protocol
from member import Component


class Commander(Component):

def __init__(self, member, leader, ballot_num, slot, proposal, peers):
def __init__(self, member, leader, ballot_num, slot, proposal, commander_id, peers):
super(Commander, self).__init__(member)
self.leader = leader
self.ballot_num = ballot_num
self.slot = slot
self.proposal = proposal
self.commander_id = CommanderId(self.address, slot, proposal)
self.commander_id = commander_id
self.accepted = set([])
self.peers = peers
self.quorum = len(peers) / 2 + 1
self.timer = None

def start(self):
self.send(self.peers, 'ACCEPT', # p2a
self.send(set(self.peers) - self.accepted, 'ACCEPT', # p2a
commander_id=self.commander_id,
ballot_num=self.ballot_num,
slot=self.slot,
proposal=self.proposal)
self.timer = self.set_timer(protocol.ACCEPT_RETRANSMIT, self.start)

def finished(self, ballot_num, preempted):
self.leader.commander_finished(self.commander_id, ballot_num, preempted)
if self.timer:
self.cancel_timer(self.timer)
self.stop()

def do_ACCEPTED(self, commander_id, acceptor, ballot_num): # p2b
Expand All @@ -33,9 +37,13 @@ def do_ACCEPTED(self, commander_id, acceptor, ballot_num): # p2b
self.accepted.add(acceptor)
if len(self.accepted) < self.quorum:
return
# make sure that this node hears about the decision, otherwise the
# slot can get "stuck" if all of the DECISION messages get lost, or
# if this node is not in self.peers
self.event('decision', slot=self.slot, proposal=self.proposal)
self.send(self.peers, 'DECISION',
slot=self.slot,
proposal=self.proposal)
slot=self.slot,
proposal=self.proposal)
self.finished(ballot_num, False)
else:
self.finished(ballot_num, True)
8 changes: 6 additions & 2 deletions cluster/deterministic_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import heapq
import random
import copy

class Node(object):

Expand Down Expand Up @@ -38,7 +39,6 @@ def unregister(self, component):
self.components.remove(component)

def receive(self, action, kwargs):
import sys
for comp in self.components[:]:
try:
fn = getattr(comp, 'do_%s' % action)
Expand All @@ -52,6 +52,7 @@ class Network(object):

PROP_DELAY = 0.03
PROP_JITTER = 0.02
DROP_PROB = 0.05

def __init__(self, seed, pause=False):
self.nodes = {}
Expand Down Expand Up @@ -101,5 +102,8 @@ def _receive(self, address, action, kwargs):
def send(self, destinations, action, **kwargs):
for dest in destinations:
delay = self.PROP_DELAY + self.rnd.uniform(-self.PROP_JITTER, self.PROP_JITTER)
self.set_timer(delay, dest, lambda dest=dest: self._receive(dest, action, kwargs))
if self.rnd.uniform(0, 1.0) > self.DROP_PROB:
# copy the kwargs now, before the sender modifies them
self.set_timer(delay, dest, lambda dest=dest, kwargs=copy.deepcopy(kwargs):
self._receive(dest, action, kwargs))

6 changes: 3 additions & 3 deletions cluster/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ def __init__(self, member, clock):
self.peers = None
self.clock = clock

def on_view_change_event(self, viewchange):
self.peers = set(viewchange.peers)
def on_view_change_event(self, slot, viewid, peers):
self.peers = set(peers)
for peer in self.peers:
self.last_heard_from[peer] = self.clock()
if not self.running:
Expand All @@ -28,7 +28,7 @@ def heartbeat(self):

# determine if any peers are down, and notify if so; note that this
# notification will occur repeatedly until a view change
too_old = self.clock() - 2 * protocol.HEARTBEAT_INTERVAL
too_old = self.clock() - protocol.HEARTBEAT_GONE_COUNT * protocol.HEARTBEAT_INTERVAL
active_peers = set(p for p in self.last_heard_from if self.last_heard_from[p] >= too_old)
if active_peers != self.peers:
self.event('peers_down', down=self.peers - active_peers)
Expand Down
80 changes: 49 additions & 31 deletions cluster/leader.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,43 @@
from util import defaultlist, view_primary
from protocol import Ballot
from protocol import Ballot, ALPHA, CommanderId
from member import Component
from scout import Scout
from commander import Commander

class Leader(Component):

HEARTBEAT_INTERVAL = 1

def __init__(self, member, unique_id, commander_cls=Commander, scout_cls=Scout):
def __init__(self, member, unique_id, peer_history, commander_cls=Commander, scout_cls=Scout):
super(Leader, self).__init__(member)
self.ballot_num = Ballot(0, unique_id)
self.ballot_num = Ballot(-1, 0, unique_id)
self.active = False
self.proposals = defaultlist()
self.commander_cls = commander_cls
self.commanders = {}
self.scout_cls = scout_cls
self.scout = None
self.viewid = -1
self.peers = None
self.peer_history = peer_history

def on_view_change_event(self, viewchange):
self.peers = viewchange.peers
is_primary = view_primary(viewchange.viewid, viewchange.peers) == self.address
self.is_primary = is_primary
if is_primary:
if not self.scout and not self.active:
self.spawn_scout()
else:
if self.scout:
self.scout.finished(False, None)
# .. which eventually calls self.preempted
elif self.active:
self.preempted(None)
def on_update_peer_history_event(self, peer_history):
self.peer_history = peer_history

def on_view_change_event(self, slot, viewid, peers):
self.viewid = viewid
self.peers = peers
self.is_primary = view_primary(viewid, peers) == self.address

# we are not an active leader in this new view
if self.scout:
self.scout.finished(None, None) # eventually calls preempted
elif self.active:
self.preempted(None)
elif self.is_primary:
self.spawn_scout()

def spawn_scout(self):
assert not self.scout
self.ballot_num = Ballot(self.viewid, self.ballot_num.n, self.ballot_num.leader)
sct = self.scout = self.scout_cls(self.member, self, self.ballot_num, self.peers)
sct.start()

Expand All @@ -52,9 +55,14 @@ def scout_finished(self, adopted, ballot_num, pvals):
for s, p in enumerate(last_by_slot):
if p is not None:
self.proposals[s] = p
for s, p in enumerate(self.proposals):
if p is not None:
self.spawn_commander(ballot_num, s, p)
# re-spawn commanders for any potentially outstanding proposals
for view_slot in sorted(self.peer_history):
slot = view_slot + ALPHA
if self.proposals[slot] is not None:
self.spawn_commander(self.ballot_num, slot, self.proposals[slot],
self.peer_history[view_slot])
# note that commanders are re-spawned above only for slots that sit ALPHA past
# an entry in peer_history; for any other undecided proposals, the replicas
# will re-propose
self.logger.info("leader becoming active")
self.active = True
else:
Expand All @@ -68,34 +76,44 @@ def preempted(self, ballot_num):
self.logger.info("leader preempted by view change")
self.active = False
self.ballot_num = Ballot(
(ballot_num if ballot_num else self.ballot_num).n + 1, self.ballot_num.leader)
self.viewid,
(ballot_num if ballot_num else self.ballot_num).n + 1,
self.ballot_num.leader)
# if we're the primary for this view, re-scout immediately
if not self.scout and self.is_primary:
self.logger.info("re-scouting as the primary for this view")
self.spawn_scout()

def spawn_commander(self, ballot_num, slot, proposal):
cmd = self.commander_cls(self.member, self, ballot_num, slot, proposal, self.peers)
if cmd.commander_id in self.commanders:
def spawn_commander(self, ballot_num, slot, proposal, peers):
peers = self.peer_history[slot - ALPHA]
commander_id = CommanderId(self.address, slot, self.proposals[slot])
if commander_id in self.commanders:
return
print "set", cmd.commander_id
self.commanders[cmd.commander_id] = cmd
cmd = self.commander_cls(self.member, self, ballot_num, slot, proposal, commander_id, peers)
self.commanders[commander_id] = cmd
cmd.start()

def commander_finished(self, commander_id, ballot_num, preempted):
print "del", commander_id
del self.commanders[commander_id]
if preempted:
self.preempted(ballot_num)

def do_PROPOSE(self, slot, proposal):
if self.proposals[slot] is None:
if self.active:
# find the peers ALPHA slots ago, or ignore if unknown
if slot - ALPHA not in self.peer_history:
self.logger.warning("slot %d not in peer history %r" % (slot - ALPHA, sorted(self.peer_history)))
return
self.proposals[slot] = proposal
self.spawn_commander(self.ballot_num, slot, proposal)
self.logger.warning("spawning commander for slot %d" % (slot,))
self.spawn_commander(self.ballot_num, slot, proposal, self.peer_history[slot - ALPHA])
else:
if not self.scout:
self.logger.warning(
"got PROPOSE when not active - scouting")
self.logger.warning("got PROPOSE when not active - scouting")
self.spawn_scout()
else:
self.logger.warning("got PROPOSE while scouting; ignored")
else:
self.logger.warning("got PROPOSE for a slot already being proposed")

6 changes: 3 additions & 3 deletions cluster/member_replicated.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ def __init__(self, node, execute_fn, peers,
bootstrap_cls=Bootstrap):
super(ClusterMember, self).__init__(node)
# only start the bootstrap component initially, then hand off to the rest
def bootstrapped(state, slot_num, decisions, viewid, peers):
def bootstrapped(state, slot_num, decisions, viewid, peers, peer_history):
self.replica = replica_cls(self, execute_fn)
self.acceptor = acceptor_cls(self)
self.leader = leader_cls(self, node.unique_id, commander_cls=commander_cls, scout_cls=scout_cls)
self.leader = leader_cls(self, node.unique_id, peer_history, commander_cls=commander_cls, scout_cls=scout_cls)
self.heartbeat = heartbeat_cls(self, lambda : node.network.now)
# start up the replica, now that its information is ready
self.replica.start(state, slot_num, decisions, viewid, peers)
self.replica.start(state, slot_num, decisions, viewid, peers, peer_history)
self.bootstrap = bootstrap_cls(self, peers, bootstrapped)

def start(self):
Expand Down
14 changes: 11 additions & 3 deletions cluster/protocol.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from collections import namedtuple

Proposal = namedtuple('Proposal', ['caller', 'cid', 'input'])
Ballot = namedtuple('Ballot', ['n', 'leader'])
Ballot = namedtuple('Ballot', ['viewid', 'n', 'leader'])
ScoutId = namedtuple('ScoutId', ['address', 'ballot_num'])
CommanderId = namedtuple('CommanderId', ['address', 'slot', 'proposal'])
ViewChange = namedtuple('ViewChange', ['viewid', 'peers'])

HEARTBEAT_INTERVAL = 0.5
JOIN_RETRANSMIT = 0.2
REPROPOSE_INTERVAL = 0.7
HEARTBEAT_GONE_COUNT = 3
JOIN_RETRANSMIT = 0.7
CATCHUP_INTERVAL = 0.6
ACCEPT_RETRANSMIT = 1
PREPARE_RETRANSMIT = 1
ALPHA = 10

# replicas should be able to re-propose a view change before the new node
# re-transmits the JOIN
assert CATCHUP_INTERVAL < JOIN_RETRANSMIT
Loading

0 comments on commit 9e5298d

Please sign in to comment.