Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Support for Turin attestations (#7499)
- verify_attestation script to fetch endorsements from AMD and check the provided attestation against them (#7499)
- PreVote optimistaion enabled. This requires that a follower checks that it could be elected before becoming a candidate. This optimisation improves the availablilty of Raft when there are omission faults like partial network partitions. (#7462)
- ProposeRequestVote on SIGTERM. When a primary, with `ignore_first_sigterm` receives the first SIGTERM, it nominates a successor, allowing the successor to skip waiting for the election timeout and call an election right away. (#7514)

### Changed

Expand Down
1 change: 1 addition & 0 deletions doc/architecture/consensus/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Supported extensions include:

- "CheckQuorum": the primary node automatically steps down, in the same view, if it does not hear back (via ``AppendEntriesResponse`` messages) from a majority of backups within a ``consensus.election_timeout`` period. This prevents an isolated primary node from still processing client write requests without being able to commit them.
- "NoTimeoutRetirement": a primary node that completes its retirement sends a ProposeRequestVote message to the most up-to-date node in the new configuration, causing that node to run for election without waiting for time out.
- A ProposeRequestVote message is also sent when a primary receives a termination signal. This reduces downtime when the orchestrator must suddenly retire the primary's host, but there is insufficient time to reconfigure the network first.
- "PreVote": followers must first request a pre-vote before starting a new election. This prevents followers from starting elections (and increasing the term) when they are isolated from the rest of the network.

Replica State Machine
Expand Down
120 changes: 74 additions & 46 deletions src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -2282,6 +2282,55 @@ namespace aft
}

private:
void send_propose_request_vote()
{
ProposeRequestVote prv{.term = state->current_view};

std::optional<ccf::NodeId> successor = std::nullopt;
Index max_match_idx = 0;
ccf::kv::ReconfigurationId reconf_id_of_max_match = 0;

// Pick the node that has the highest match_idx, and break
// ties by looking at the highest reconfiguration id they are
// part of. This can lead to nudging a node that is
// about to retire too, but that node will then nudge
// a successor, and that seems preferable to nudging a node that
// risks not being eligible if reconfiguration id is prioritised.
// Alternatively, we could pick the node with the highest match idx
// in the latest config, provided that match idx at least as high as a
// majority. That would make them both eligible and unlikely to retire
// soon.
for (auto& [node, node_state] : all_other_nodes)
{
if (node_state.match_idx >= max_match_idx)
{
ccf::kv::ReconfigurationId latest_reconf_id = 0;
auto conf = configurations.rbegin();
while (conf != configurations.rend())
{
if (conf->nodes.find(node) != conf->nodes.end())
{
latest_reconf_id = conf->idx;
break;
}
conf++;
}
if (!(node_state.match_idx == max_match_idx &&
latest_reconf_id < reconf_id_of_max_match))
{
reconf_id_of_max_match = latest_reconf_id;
successor = node;
max_match_idx = node_state.match_idx;
}
}
}
if (successor.has_value())
{
RAFT_INFO_FMT("Proposing that {} becomes candidate", successor.value());
channels->send_authenticated(
successor.value(), ccf::NodeMsgType::consensus_msg, prv);
}
}
void become_retired(Index idx, ccf::kv::RetirementPhase phase)
{
RAFT_INFO_FMT(
Expand Down Expand Up @@ -2319,52 +2368,7 @@ namespace aft
{
if (state->leadership_state == ccf::kv::LeadershipState::Leader)
{
ProposeRequestVote prv{.term = state->current_view};

std::optional<ccf::NodeId> successor = std::nullopt;
Index max_match_idx = 0;
ccf::kv::ReconfigurationId reconf_id_of_max_match = 0;

// Pick the node that has the highest match_idx, and break
// ties by looking at the highest reconfiguration id they are
// part of. This can lead to nudging a node that is
// about to retire too, but that node will then nudge
// a successor, and that seems preferable to nudging a node that
// risks not being eligible if reconfiguration id is prioritised.
// Alternatively, we could pick the node with the higest match idx
// in the latest config, provided that match idx at least as high as a
// majority. That would make them both eligible and unlikely to retire
// soon.
for (auto& [node, node_state] : all_other_nodes)
{
if (node_state.match_idx >= max_match_idx)
{
ccf::kv::ReconfigurationId latest_reconf_id = 0;
auto conf = configurations.rbegin();
while (conf != configurations.rend())
{
if (conf->nodes.find(node) != conf->nodes.end())
{
latest_reconf_id = conf->idx;
break;
}
conf++;
}
if (!(node_state.match_idx == max_match_idx &&
latest_reconf_id < reconf_id_of_max_match))
{
reconf_id_of_max_match = latest_reconf_id;
successor = node;
max_match_idx = node_state.match_idx;
}
}
}
if (successor.has_value())
{
RAFT_INFO_FMT("Node retired, nudging {}", successor.value());
channels->send_authenticated(
successor.value(), ccf::NodeMsgType::consensus_msg, prv);
}
send_propose_request_vote();
}

leader_id.reset();
Expand Down Expand Up @@ -2744,6 +2748,30 @@ namespace aft
return *state;
}

void nominate_successor() override
{
if (state->leadership_state != ccf::kv::LeadershipState::Leader)
{
RAFT_DEBUG_FMT(
"Not proposing request vote from {} since not leader",
state->node_id);
return;
}

LOG_INFO_FMT("Nominating successor for {}", state->node_id);

#ifdef CCF_RAFT_TRACING
nlohmann::json j = {};
j["function"] = "step_down_and_nominate_successor";
j["state"] = *state;
COMMITTABLE_INDICES(j["state"], state);
j["configurations"] = configurations;
RAFT_TRACE_JSON_OUT(j);
#endif

send_propose_request_vote();
}

private:
void create_and_remove_node_state()
{
Expand Down
4 changes: 4 additions & 0 deletions src/consensus/aft/test/driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ int main(int argc, char** argv)
assert(items.size() == 1);
driver->loop_until_sync(lineno);
break;
case shash("nominate_successor"):
assert(items.size() == 2);
driver->nominate_successor(items[1], lineno);
break;
case shash(""):
// Ignore empty lines
skip_invariants = true;
Expand Down
15 changes: 15 additions & 0 deletions src/consensus/aft/test/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,21 @@ class RaftDriver
start_node_id);
}

void nominate_successor(std::string node_id_s, const size_t lineno)
{
ccf::NodeId node_id(node_id_s);
if (_nodes.find(node_id) == _nodes.end())
{
throw std::runtime_error(fmt::format(
"Attempted to nominate unknown node {} on line {}", node_id, lineno));
}

RAFT_DRIVER_PRINT(
"Note over {}: Node {} nominates a successor", node_id, node_id_s);

_nodes.at(node_id).raft->nominate_successor();
}

void cleanup_nodes(
const std::string& term,
const std::vector<std::string>& node_ids,
Expand Down
2 changes: 2 additions & 0 deletions src/kv/kv_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ namespace ccf::kv
virtual void set_retired_committed(
ccf::SeqNo /*seqno*/, const std::vector<NodeId>& node_ids)
{}

virtual void nominate_successor() {};
};

struct PendingTxInfo
Expand Down
1 change: 1 addition & 0 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,7 @@ namespace ccf

void stop_notice() override
{
consensus->nominate_successor();
stop_noticed = true;
}

Expand Down
45 changes: 45 additions & 0 deletions tests/e2e_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the Apache 2.0 License.
import tempfile
import os
import signal
import shutil

import infra.logging_app as app
Expand Down Expand Up @@ -2001,6 +2002,49 @@ def test_error_message_on_failure_to_fetch_snapshot(const_args):
), f"Did not find expected log messages: {expected_log_messages}"


def run_propose_request_vote(const_args):
args = copy.deepcopy(const_args)
args.label += "_propose_vote"
args.nodes = infra.e2e_args.nodes(args, 3)
# use a high timeout to hedge against flaky nodes which pause for seconds
# In most cases this should not matter as the propose_request_vote will cause the election quickly
args.election_timeout = 20000
with infra.network.network(
args.nodes,
args.binary_dir,
args.debug_nodes,
pdb=args.pdb,
) as network:
LOG.info("Start a network")
network.start_and_open(args, ignore_first_sigterm=True)
original_primary, original_term = network.find_primary()
backups = [
n
for n in network.get_joined_nodes()
if n.node_id != original_primary.node_id
]

original_primary.remote.remote.proc.send_signal(signal.SIGTERM)
# Find any primary which wasn't the original one
# If propose_request_vote worked, the new primary will be elected immediately
# So if this times out, the propose_request_vote likely failed
new_primary, new_term = network.find_primary(
nodes=backups, timeout=(0.9 * args.election_timeout)
)
assert (
new_primary.node_id != original_primary.node_id
), "A new primary should have been elected"
assert (
new_term > original_term
), "The new primary should be in a higher term than the original primary"

LOG.info(f"New primary is node {new_primary.node_id}")

# send a sigterm to ensure they shutdown correctly
for node in backups:
node.remote.remote.proc.send_signal(signal.SIGTERM)


def run_snp_tests(args):
run_initial_uvm_descriptor_checks(args)
run_initial_tcb_version_checks(args)
Expand All @@ -2027,3 +2071,4 @@ def run(args):
run_late_mounted_ledger_check(args)
run_empty_ledger_dir_check(args)
run_read_ledger_on_testdata(args)
run_propose_request_vote(args)
42 changes: 42 additions & 0 deletions tests/raft_scenarios/nominate_successor
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
start_node,0
emit_signature,2

swap_nodes,2,in,1,2
emit_signature,2

periodic_one,0,10
dispatch_all
periodic_one,0,10
dispatch_all
periodic_one,0,10
dispatch_all

assert_commit_idx,0,4
assert_config,0,3,0,1,2

assert_commit_idx,1,4
assert_config,1,3,0,1,2

assert_commit_idx,2,4
assert_config,2,3,0,1,2

# Node 0 nominates node 1 as a successor arbitrarily
assert_detail,0,leadership_state,Leader
nominate_successor,0
assert_detail,0,leadership_state,Leader
dispatch_single,0,1
assert_detail,1,leadership_state,Candidate

# Node 2 now is updated to have a better matchIdx than node 1
emit_signature,2
periodic_one,0,10
dispatch_single,0,2
dispatch_single,2,0
assert_commit_idx,0,5

# Node 0 nominates node 2 as a successor now
nominate_successor,0
dispatch_single,0,2
assert_detail,2,leadership_state,Candidate

loop_until_sync
3 changes: 1 addition & 2 deletions tests/raft_scenarios_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,12 @@ def preprocess_for_trace_validation(log):
last_cmd = ""
for line in log:
entry = json.loads(line)
if "cmd" in entry:
if "cmd" in entry and len(entry["cmd"]) > 0:
last_cmd = entry["cmd"]
continue
node = entry["msg"]["state"]["node_id"]
entry["cmd"] = last_cmd
entry["cmd_prefix"] = entry["cmd"].split(",")[0]
last_cmd = ""
if initial_node is None:
initial_node = node
if entry["msg"]["function"] == "add_configuration":
Expand Down
1 change: 1 addition & 0 deletions tla/consensus/SIMccfraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ CONSTANTS
Timeout <- SIMTimeout
ChangeConfigurationInt <-SIMChangeConfigurationInt
CheckQuorum <- SIMCheckQuorum
SigTermProposeVote <- SIMSigTermProposeVote
ClientRequest <- SIMClientRequest

Fairness <- SIMFairness
Expand Down
4 changes: 4 additions & 0 deletions tla/consensus/SIMccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ SIMCheckQuorum(i) ==
/\ 1 = RandomElement(Q)
/\ CCF!CheckQuorum(i)

SIMSigTermProposeVote(i) ==
/\ 1 = RandomElement(Q)
/\ CCF!SigTermProposeVote(i)

LOCAL C ==
1..IF "C" \in DOMAIN IOEnv THEN atoi(IOEnv.C) ELSE 10

Expand Down
12 changes: 12 additions & 0 deletions tla/consensus/Traceccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,24 @@ IsCheckQuorum ==
/\ Len(log[logline.msg.state.node_id]) = logline.msg.state.last_idx
/\ (logline.msg.state.pre_vote_enabled => PreVoteEnabled \in preVoteStatus[logline.msg.state.node_id])

IsSigTermProposeVote ==
/\ IsEvent("step_down_and_nominate_successor")
/\ SigTermProposeVote(logline.msg.state.node_id)
/\ leadershipState[logline.msg.state.node_id] = Leader
/\ Range(logline.msg.state.committable_indices) \subseteq CommittableIndices(logline.msg.state.node_id)
/\ commitIndex[logline.msg.state.node_id] = logline.msg.state.commit_idx
/\ leadershipState[logline.msg.state.node_id] = ToLeadershipState[logline.msg.state.leadership_state]
/\ membershipState[logline.msg.state.node_id] \in ToMembershipState[logline.msg.state.membership_state]
/\ Len(log[logline.msg.state.node_id]) = logline.msg.state.last_idx
/\ (logline.msg.state.pre_vote_enabled => PreVoteEnabled \in preVoteStatus[logline.msg.state.node_id])

TraceNext ==
\/ IsTimeout
\/ IsBecomeCandidate
\/ IsBecomeLeader
\/ IsBecomeFollower
\/ IsCheckQuorum
\/ IsSigTermProposeVote

\/ IsClientRequest
\/ IsCleanupNodes
Expand Down
11 changes: 11 additions & 0 deletions tla/consensus/ccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,16 @@ CheckQuorum(i) ==
/\ isNewFollower' = [isNewFollower EXCEPT ![i] = TRUE]
/\ UNCHANGED <<preVoteStatus, reconfigurationVars, messageVars, currentTerm, votedFor, candidateVars, leaderVars, logVars, membershipState>>

SigTermProposeVote(i) ==
/\ leadershipState[i] = Leader
/\ \E j \in PlausibleSucessorNodes(i):
/\ LET msg == [type |-> ProposeVoteRequest,
term |-> currentTerm[i],
source |-> i,
dest |-> j ]
IN Send(msg)
/\ UNCHANGED <<preVoteStatus, reconfigurationVars, serverVars, candidateVars, leaderVars, logVars>>

------------------------------------------------------------------------------
\* Message handlers
\* i = recipient, j = sender, m = message
Expand Down Expand Up @@ -1350,6 +1360,7 @@ NextInt(i) ==
\/ AppendRetiredCommitted(i)
\/ AdvanceCommitIndex(i)
\/ CheckQuorum(i)
\/ SigTermProposeVote(i)
\/ \E j \in Servers : RequestVote(i, j)
\/ \E j \in Servers : AppendEntries(i, j)
\/ \E j \in Servers : Receive(i, j)
Expand Down