forked from aosabook/500lines
-
Notifications
You must be signed in to change notification settings - Fork 0
/
leader.py
118 lines (104 loc) · 4.91 KB
/
leader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from . import Ballot, ALPHA, CommanderId, view_primary
from .commander import Commander
from .member import Component
from .scout import Scout
class Leader(Component):
    """Leader component: runs scouts and commanders for a ballot.

    The leader holds one ballot number, at most one running scout, and one
    commander per outstanding (slot, proposal) pair.  It becomes ``active``
    when a scout reports its ballot adopted and goes inactive again when it
    is preempted, either by another ballot or by a view change.

    NOTE(review): ``self.address`` and ``self.logger`` are not set in this
    class; presumably they are provided by ``Component`` -- confirm.
    """

    def __init__(self, member, unique_id, peer_history,
                 commander_cls=Commander, scout_cls=Scout):
        """Create an inactive leader with no view, scout or commanders.

        member -- forwarded to ``Component.__init__`` and to every scout and
            commander this leader creates
        unique_id -- stored as the ``leader`` field of this leader's ballot
        peer_history -- container indexed by slot; ``peer_history[slot - ALPHA]``
            gives the peers handed to the commander for ``slot``
        commander_cls, scout_cls -- classes used to build commanders/scouts
        """
        super(Leader, self).__init__(member)
        # Ballot fields are (view_id, n, leader); -1 means "no view yet".
        self.ballot_num = Ballot(-1, 0, unique_id)
        self.active = False
        self.proposals = {}  # slot -> proposal
        self.commander_cls = commander_cls
        self.commanders = {}  # CommanderId -> running commander
        self.scout_cls = scout_cls
        self.scout = None  # the running scout, if any
        self.view_id = -1
        self.peers = None
        self.peer_history = peer_history

    def on_update_peer_history_event(self, peer_history):
        """Replace the stored peer history with ``peer_history``."""
        self.peer_history = peer_history

    def on_view_change_event(self, slot, view_id, peers):
        """Record the new view and drop or start leadership accordingly.

        ``slot`` is accepted but not used by this method.
        """
        self.view_id = view_id
        self.peers = peers

        # we are not an active leader in this new view
        if self.scout:
            self.scout.finished(None, None)  # eventually calls preempted
        elif self.active:
            self.preempted(None)
        elif view_primary(view_id, peers) == self.address:
            self.spawn_scout()

    def spawn_scout(self):
        """Start a scout for the current ballot, re-stamped with the current view.

        Must not be called while another scout is running.
        """
        assert not self.scout
        self.ballot_num = Ballot(
            self.view_id, self.ballot_num.n, self.ballot_num.leader)
        sct = self.scout = self.scout_cls(
            self.member, self, self.ballot_num, self.peers)
        sct.start()

    # TODO: rename pvals to something with semantic meaning
    def scout_finished(self, adopted, ballot_num, pvals):
        """Handle the outcome reported by the current scout.

        adopted -- truthy if the scout's ballot was adopted
        ballot_num -- forwarded to ``preempted`` when the ballot was not adopted
        pvals -- mapping of (ballot number, slot) -> proposal; only read when
            ``adopted`` is truthy
        """
        self.scout = None
        if not adopted:
            self.preempted(ballot_num)
            return

        # For each slot we need the proposal with the highest ballot number.
        # Walking the items in ascending (ballot, slot) order guarantees that
        # a higher-ballot proposal overwrites any lower-ballot one for the
        # same slot.  ``items()`` (not ``iteritems()``) works on Python 2 and 3.
        for (_ballot, slot), proposal in sorted(pvals.items()):
            self.proposals[slot] = proposal

        # re-spawn commanders for any potentially outstanding proposals
        for view_slot in sorted(self.peer_history):
            slot = view_slot + ALPHA
            if slot in self.proposals:
                self.spawn_commander(self.ballot_num, slot, self.proposals[slot])

        self.logger.info("leader becoming active")
        self.active = True

    def preempted(self, ballot_num):
        """Go inactive, bump the ballot number, and re-scout if we are primary.

        ballot_num -- the ballot that preempted us, or None when we are
            preempted by a view change
        """
        if ballot_num:
            self.logger.info("leader preempted by %s" % (ballot_num.leader,))
        else:
            self.logger.info("leader preempted by view change")
        self.active = False

        # Next ballot: one past the preempting ballot's n (or past our own
        # when there is none), in the current view, still owned by us.
        base = ballot_num or self.ballot_num
        self.ballot_num = Ballot(self.view_id, base.n + 1, self.ballot_num.leader)

        # if we're the primary for this view, re-scout immediately
        if not self.scout and view_primary(self.view_id, self.peers) == self.address:
            self.logger.info("re-scouting as the primary for this view")
            self.spawn_scout()

    def spawn_commander(self, ballot_num, slot, proposal):
        """Start a commander for ``proposal`` in ``slot`` unless one already runs.

        The commander is given the peers recorded for ``slot - ALPHA``; a
        missing entry in ``peer_history`` propagates as the container's
        lookup error.
        """
        peers = self.peer_history[slot - ALPHA]
        # Identify the commander by the proposal it is actually given, so the
        # id and the commander can never disagree.
        commander_id = CommanderId(self.address, slot, proposal)
        if commander_id in self.commanders:
            return
        cmd = self.commander_cls(
            self.member, self, ballot_num, slot, proposal, commander_id, peers)
        self.commanders[commander_id] = cmd
        cmd.start()

    def commander_finished(self, commander_id, ballot_num, preempted):
        """Forget a finished commander; step down if it was preempted."""
        del self.commanders[commander_id]
        if preempted:
            self.preempted(ballot_num)

    def do_PROPOSE(self, slot, proposal):
        """Handle a PROPOSE for ``slot``.

        An active leader records the proposal and spawns a commander for it.
        An inactive leader starts scouting if it is not already doing so.
        In both inactive cases the proposal itself is not recorded.
        """
        if slot in self.proposals:
            self.logger.info("got PROPOSE for a slot already being proposed")
            return

        if not self.active:
            if self.scout:
                self.logger.info("got PROPOSE while scouting; ignored")
            else:
                self.logger.info("got PROPOSE when not active - scouting")
                self.spawn_scout()
            return

        # find the peers ALPHA slots ago, or ignore if unknown
        if slot - ALPHA not in self.peer_history:
            self.logger.info("slot %d not in peer history %r" %
                             (slot - ALPHA, sorted(self.peer_history)))
            return

        self.proposals[slot] = proposal
        self.logger.info("spawning commander for slot %d" % (slot,))
        self.spawn_commander(self.ballot_num, slot, proposal)