Skip to content

Commit dc71e1d

Browse files
BLE optimizations (#117)
* Updates leader election. Also adds std::time::Duration parsing to TestConfig TOML.
* Non-gossip leader election
* Cleaned up code
* Ensures stale leaders become followers in Paxos
* Restructure code and send BleState to OmniPaxos.
* Corrected leader happiness definition. Solution for HBRequest happiness.
* Removed need for ballots in HeartbeatRequests.
* Fixes, recovering leader edge case. Adds comments.
* Remove need for PendingLeader by syncing with Paxos promise.
* Addresses review comments
* Removes StaleLeader signaling. Fixes BLE-Paxos leader sync.
* Adds NotAccepted message.
* Stops leader from sending prepares to followers promised with a greater ballot.
* Rework promises_meta to Vec<PromiseState>
* Removed reset_promises()
* Refines follower ballot checking. Removes leader_state.decided_indexes.
* Addresses comments.
* Add comment for previous_replies in ble.
---------
Co-authored-by: Harald Ng <harald.ng@hotmail.com>
1 parent d9ab41c commit dc71e1d

25 files changed

+508
-453
lines changed

omnipaxos/src/ballot_leader_election.rs

Lines changed: 106 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::cmp::Ordering;
22

33
/// Ballot Leader Election algorithm for electing new leaders
4-
use crate::util::{defaults::*, ConfigurationId, FlexibleQuorum, Quorum};
4+
use crate::{
5+
sequence_paxos::{Phase, Role},
6+
util::{defaults::*, ConfigurationId, FlexibleQuorum, Quorum},
7+
};
58

69
#[cfg(feature = "logging")]
710
use crate::utils::logger::create_logger;
@@ -15,7 +18,7 @@ use crate::{
1518
#[cfg(feature = "serde")]
1619
use serde::{Deserialize, Serialize};
1720
#[cfg(feature = "logging")]
18-
use slog::{debug, info, trace, warn, Logger};
21+
use slog::{info, trace, warn, Logger};
1922

2023
/// Used to define a Sequence Paxos epoch
2124
#[derive(Clone, Copy, Eq, Debug, Default, PartialEq)]
@@ -59,8 +62,8 @@ impl PartialOrd for Ballot {
5962
}
6063
}
6164

62-
/// The connectivity of an OmniPaxos node
63-
pub(crate) type Connectivity = u8;
65+
const INITIAL_ROUND: u32 = 1;
66+
const RECOVERY_ROUND: u32 = 0;
6467

6568
/// A Ballot Leader Election component. Used in conjunction with OmniPaxos to handle the election of a leader for a cluster of OmniPaxos servers.
6669
/// It handles incoming messages and produces outgoing messages that the user has to fetch periodically and send using a network implementation.
@@ -74,18 +77,18 @@ pub(crate) struct BallotLeaderElection {
7477
peers: Vec<NodeId>,
7578
/// The current round of the heartbeat cycle.
7679
hb_round: u32,
77-
/// Vector which temporarily holds all the received heartbeats from one heartbeat round, including the current node.
78-
ballots: Vec<(Ballot, Connectivity)>,
79-
/// Vector that holds all the received heartbeats from the previous heartbeat round, including the current node.
80+
/// The heartbeat replies this instance received during the current round.
81+
heartbeat_replies: Vec<HeartbeatReply>,
82+
/// Vector that holds the heartbeat replies received from peers during the previous heartbeat round (this node's own state is not included). Only used to display the connectivity of this node in the UI.
8083
/// Represents nodes that are currently alive from the view of the current node.
81-
prev_round_ballots: Vec<(Ballot, Connectivity)>,
84+
prev_replies: Vec<HeartbeatReply>,
8285
/// Holds the current ballot of this instance.
8386
current_ballot: Ballot,
84-
/// The number of replicas inside the cluster that this instance is
85-
/// connected to (based on heartbeats received) including itself.
86-
connectivity: Connectivity,
87-
/// Current elected leader.
88-
leader: Option<Ballot>,
87+
/// The current leader of this instance.
88+
leader: Ballot,
89+
/// A happy node either sees that it is, is connected to, or sees evidence of a potential leader
90+
/// for the cluster. If a node is unhappy then it is seeking a new leader.
91+
happy: bool,
8992
/// The number of replicas inside the cluster whose heartbeats are needed to become and remain the leader.
9093
quorum: Quorum,
9194
/// Vector which holds all the outgoing messages of the BLE instance.
@@ -97,23 +100,31 @@ pub(crate) struct BallotLeaderElection {
97100

98101
impl BallotLeaderElection {
99102
/// Construct a new BallotLeaderElection node
100-
pub(crate) fn with(config: BLEConfig, initial_leader: Option<Ballot>) -> Self {
103+
pub(crate) fn with(config: BLEConfig, recovered_leader: Option<Ballot>) -> Self {
101104
let config_id = config.configuration_id;
102105
let pid = config.pid;
103106
let peers = config.peers;
104107
let num_nodes = &peers.len() + 1;
105108
let quorum = Quorum::with(config.flexible_quorum, num_nodes);
106-
let initial_ballot = Ballot::with(config_id, 0, config.priority, pid);
109+
let mut initial_ballot = Ballot::with(config_id, INITIAL_ROUND, config.priority, pid);
110+
let initial_leader = match recovered_leader {
111+
Some(b) if b != Ballot::default() => {
112+
// Prevents a recovered server from retaining BLE leadership with the same ballot.
113+
initial_ballot.n = RECOVERY_ROUND;
114+
b
115+
}
116+
_ => initial_ballot,
117+
};
107118
let mut ble = BallotLeaderElection {
108119
configuration_id: config_id,
109120
pid,
110121
peers,
111122
hb_round: 0,
112-
ballots: Vec::with_capacity(num_nodes),
113-
prev_round_ballots: Vec::with_capacity(num_nodes),
123+
heartbeat_replies: Vec::with_capacity(num_nodes),
124+
prev_replies: Vec::with_capacity(num_nodes),
114125
current_ballot: initial_ballot,
115-
connectivity: num_nodes as Connectivity,
116126
leader: initial_leader,
127+
happy: true,
117128
quorum,
118129
outgoing: Vec::with_capacity(config.buffer_size),
119130
#[cfg(feature = "logging")]
@@ -160,70 +171,20 @@ impl BallotLeaderElection {
160171
}
161172
}
162173

163-
fn check_leader(&mut self) -> Option<Ballot> {
164-
let ballots = std::mem::take(&mut self.ballots);
165-
let top_accept_ballot = ballots
166-
.iter()
167-
.filter_map(|&(ballot, connectivity)| {
168-
if self.quorum.is_accept_quorum(connectivity as usize) {
169-
Some(ballot)
170-
} else {
171-
None
172-
}
173-
})
174-
.max()
175-
.unwrap_or_default();
176-
let leader_ballot = self.leader.unwrap_or_default();
177-
if top_accept_ballot == leader_ballot {
178-
// leader is still alive and has accept quorum
179-
None
180-
} else {
181-
// leader is dead || changed priority || doesn't have an accept quorum
182-
let top_prepare_ballot = ballots
183-
.iter()
184-
.filter_map(|&(ballot, connectivity)| {
185-
if self.quorum.is_prepare_quorum(connectivity as usize) {
186-
Some(ballot)
187-
} else {
188-
None
189-
}
190-
})
191-
.max()
192-
.unwrap_or_default();
193-
if top_prepare_ballot > leader_ballot {
194-
// new leader with prepare quorum
195-
let new_leader = top_prepare_ballot;
196-
self.leader = Some(new_leader);
197-
#[cfg(feature = "logging")]
198-
debug!(
199-
self.logger,
200-
"BLE {}, New Leader elected: {:?}", self.pid, new_leader
201-
);
202-
Some(new_leader)
203-
} else {
204-
// nobody has taken over leadership, let's try to ourselves
205-
self.current_ballot.n = leader_ballot.n + 1;
206-
self.leader = None;
207-
None
208-
}
209-
}
210-
}
211-
212174
/// Initiates a new heartbeat round.
213175
pub(crate) fn new_hb_round(&mut self) {
176+
self.prev_replies = std::mem::take(&mut self.heartbeat_replies);
214177
self.hb_round += 1;
215178
#[cfg(feature = "logging")]
216179
trace!(
217180
self.logger,
218181
"Initiate new heartbeat round: {}",
219182
self.hb_round
220183
);
221-
222184
for peer in &self.peers {
223185
let hb_request = HeartbeatRequest {
224186
round: self.hb_round,
225187
};
226-
227188
self.outgoing.push(BLEMessage {
228189
from: self.pid,
229190
to: *peer,
@@ -232,41 +193,89 @@ impl BallotLeaderElection {
232193
}
233194
}
234195

235-
pub(crate) fn hb_timeout(&mut self) -> Option<Ballot> {
236-
let my_connectivity = self.ballots.len() + 1;
237-
self.connectivity = my_connectivity as Connectivity;
238-
// Add our own ballot to the list of received ballots of current hb round
239-
self.ballots.push((self.current_ballot, self.connectivity));
240-
self.prev_round_ballots = self.ballots.clone();
241-
let result: Option<Ballot> = if self.quorum.is_prepare_quorum(my_connectivity) {
242-
#[cfg(feature = "logging")]
243-
debug!(
244-
self.logger,
245-
"Received a majority of heartbeats, round: {}, {:?}", self.hb_round, self.ballots
246-
);
247-
self.check_leader()
196+
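/// Ends the current heartbeat round: updates the leader and this node's happiness, attempts a takeover if needed, and starts the next round. Returns `Some(current_ballot)` if this node is the leader after the round, otherwise `None`.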
197+
pub(crate) fn hb_timeout(
198+
&mut self,
199+
seq_paxos_state: &(Role, Phase),
200+
seq_paxos_promise: Ballot,
201+
) -> Option<Ballot> {
202+
self.update_leader();
203+
self.update_happiness(seq_paxos_state);
204+
self.check_takeover();
205+
self.new_hb_round();
206+
if seq_paxos_promise > self.leader {
207+
// Sync leader with Paxos promise in case ballot didn't make it to BLE followers
208+
self.leader = seq_paxos_promise;
209+
self.happy = true;
210+
}
211+
if self.leader == self.current_ballot {
212+
Some(self.current_ballot)
248213
} else {
249-
#[cfg(feature = "logging")]
250-
warn!(
251-
self.logger,
252-
"Did not receive a majority of heartbeats, round: {}, {:?}",
253-
self.hb_round,
254-
self.ballots
255-
);
256-
self.ballots.clear();
257214
None
215+
}
216+
}
217+
218+
fn update_leader(&mut self) {
219+
let max_reply_ballot = self.heartbeat_replies.iter().map(|r| r.ballot).max();
220+
if let Some(max) = max_reply_ballot {
221+
if max > self.leader {
222+
self.leader = max;
223+
}
224+
}
225+
}
226+
227+
fn update_happiness(&mut self, seq_paxos_state: &(Role, Phase)) {
228+
self.happy = if self.leader == self.current_ballot {
229+
let potential_followers = self
230+
.heartbeat_replies
231+
.iter()
232+
.filter(|hb_reply| hb_reply.leader <= self.current_ballot)
233+
.count();
234+
let can_form_quorum = match seq_paxos_state {
235+
(Role::Leader, Phase::Accept) => {
236+
self.quorum.is_accept_quorum(potential_followers + 1)
237+
}
238+
_ => self.quorum.is_prepare_quorum(potential_followers + 1),
239+
};
240+
if can_form_quorum {
241+
true
242+
} else {
243+
let see_larger_happy_leader = self
244+
.heartbeat_replies
245+
.iter()
246+
.any(|r| r.leader > self.current_ballot && r.happy);
247+
see_larger_happy_leader
248+
}
249+
} else {
250+
self.heartbeat_replies
251+
.iter()
252+
.any(|r| r.ballot == self.leader && r.happy)
258253
};
259-
self.new_hb_round();
260-
result
254+
}
255+
256+
fn check_takeover(&mut self) {
257+
if !self.happy {
258+
let all_neighbors_unhappy = self.heartbeat_replies.iter().all(|r| !r.happy);
259+
let im_quorum_connected = self
260+
.quorum
261+
.is_prepare_quorum(self.heartbeat_replies.len() + 1);
262+
if all_neighbors_unhappy && im_quorum_connected {
263+
// We increment past our leader instead of max of unhappy ballots because we
264+
// assume we have already checked leader for this round so they should be equal
265+
self.current_ballot.n = self.leader.n + 1;
266+
self.leader = self.current_ballot;
267+
self.happy = true;
268+
}
269+
}
261270
}
262271

263272
fn handle_request(&mut self, from: NodeId, req: HeartbeatRequest) {
264273
let hb_reply = HeartbeatReply {
265274
round: req.round,
266275
ballot: self.current_ballot,
267-
connectivity: self.connectivity,
276+
leader: self.leader,
277+
happy: self.happy,
268278
};
269-
270279
self.outgoing.push(BLEMessage {
271280
from: self.pid,
272281
to: from,
@@ -276,7 +285,7 @@ impl BallotLeaderElection {
276285

277286
fn handle_reply(&mut self, rep: HeartbeatReply) {
278287
if rep.round == self.hb_round && rep.ballot.config_id == self.configuration_id {
279-
self.ballots.push((rep.ballot, rep.connectivity));
288+
self.heartbeat_replies.push(rep);
280289
} else {
281290
#[cfg(feature = "logging")]
282291
warn!(
@@ -290,8 +299,8 @@ impl BallotLeaderElection {
290299
self.current_ballot
291300
}
292301

293-
pub(crate) fn get_ballots(&self) -> Vec<(Ballot, Connectivity)> {
294-
self.prev_round_ballots.clone()
302+
pub(crate) fn get_ballots(&self) -> Vec<HeartbeatReply> {
303+
self.prev_replies.clone()
295304
}
296305
}
297306

omnipaxos/src/messages.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ pub mod sequence_paxos {
1717
use serde::{Deserialize, Serialize};
1818
use std::fmt::Debug;
1919

20+
/// Message sent by a follower after crash-recovery or after detecting dropped messages, requesting that its leader re-prepare it.
21+
#[derive(Copy, Clone, Debug)]
22+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
23+
pub struct PrepareReq {
24+
/// The current round.
25+
pub n: Ballot,
26+
}
27+
2028
/// Prepare message sent by a newly-elected leader to initiate the Prepare phase.
2129
#[derive(Copy, Clone, Debug)]
2230
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -128,6 +136,15 @@ pub mod sequence_paxos {
128136
pub ss: StopSign,
129137
}
130138

139+
/// Message sent by follower to leader when accepting an entry is rejected.
140+
/// This happens when the follower is promised to a greater leader.
141+
#[derive(Clone, Debug)]
142+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
143+
pub struct NotAccepted {
144+
/// The follower's current ballot
145+
pub n: Ballot,
146+
}
147+
131148
/// Compaction Request
132149
#[allow(missing_docs)]
133150
#[derive(Clone, Debug)]
@@ -146,13 +163,14 @@ pub mod sequence_paxos {
146163
T: Entry,
147164
{
148165
/// Request a [`Prepare`] to be sent from the leader. Used for fail-recovery.
149-
PrepareReq,
166+
PrepareReq(PrepareReq),
150167
#[allow(missing_docs)]
151168
Prepare(Prepare),
152169
Promise(Promise<T>),
153170
AcceptSync(AcceptSync<T>),
154171
AcceptDecide(AcceptDecide<T>),
155172
Accepted(Accepted),
173+
NotAccepted(NotAccepted),
156174
Decide(Decide),
157175
/// Forward client proposals to the leader.
158176
ProposalForward(Vec<T>),
@@ -207,10 +225,12 @@ pub mod ballot_leader_election {
207225
pub struct HeartbeatReply {
208226
/// Number of the current heartbeat round.
209227
pub round: u32,
210-
/// Ballot of a server.
228+
/// Ballot of replying server.
211229
pub ballot: Ballot,
212-
/// The number of replicas inside the cluster the sender is connected to (including itself)
213-
pub connectivity: u8,
230+
/// Leader this server is following
231+
pub leader: Ballot,
232+
/// Whether the replying server is happy, i.e. it does not see a need for a new leader
233+
pub happy: bool,
214234
}
215235

216236
/// A struct for a Paxos message that also includes sender and receiver.

omnipaxos/src/omni_paxos.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,21 +391,24 @@ where
391391
/// It is also used for the election process, where the server checks if it can become the leader.
392392
/// For instance, if `election_timeout()` is called every 100ms, then if the leader fails, the servers will detect it after 100ms and elect a new leader after another 100ms if possible.
393393
fn election_timeout(&mut self) {
394-
if let Some(b) = self.ble.hb_timeout() {
395-
self.seq_paxos.handle_leader(b);
394+
if let Some(new_leader) = self
395+
.ble
396+
.hb_timeout(self.seq_paxos.get_state(), self.seq_paxos.get_promise())
397+
{
398+
self.seq_paxos.handle_leader(new_leader);
396399
}
397400
}
398401

399402
/// Returns the current states of the OmniPaxos instance for OmniPaxos UI to display.
400403
pub fn get_ui_states(&self) -> ui::OmniPaxosStates {
401404
let mut cluster_state = ClusterState::from(self.seq_paxos.get_leader_state());
402-
cluster_state.ballots = self.ble.get_ballots();
405+
cluster_state.heartbeats = self.ble.get_ballots();
403406

404407
ui::OmniPaxosStates {
405408
current_ballot: self.ble.get_current_ballot(),
406409
current_leader: self.get_current_leader(),
407410
decided_idx: self.get_decided_idx(),
408-
ballots: self.ble.get_ballots(),
411+
heartbeats: self.ble.get_ballots(),
409412
cluster_state,
410413
}
411414
}

0 commit comments

Comments
 (0)