[narwhal] use the LeaderSwapTable in the leader schedule (MystenLabs#12515)

## Description 

Follow up of MystenLabs#12465

This PR:
* Introduces a new `protocol_config` feature flag,
`narwhal_new_leader_election_schedule`, so all the new changes stay
disabled until we decide to enable them in a new protocol version
* Introduces the `LeaderSchedule`, which from now on is responsible
for crafting the leader schedule and is used to perform leader
election.
* Updates the leader schedule with the new reputation scores every `K` commit
rounds
* Injects the `LeaderSchedule` into the primary node so it can also be
used by the `proposer` module, since the schedule can now be
updated. A minimal sketch of the resulting call pattern follows this list.
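
For orientation, the sketch below shows how these pieces are meant to fit together. It is not code from this PR: the signatures are inferred from call sites in this diff (`LeaderSchedule::new`, `leader`, `update_leader_swap_table`, `LeaderSwapTable::new`), and the `types::{ReputationScores, Round}` import paths are assumptions.

```rust
// Sketch only: signatures inferred from call sites in this diff; the
// `types::{ReputationScores, Round}` import paths are assumptions.
use config::Committee; // narwhal-config
use narwhal_consensus::consensus::{LeaderSchedule, LeaderSwapTable};
use types::{ReputationScores, Round};

fn elect_and_maybe_reschedule(committee: &Committee, round: Round, scores: &ReputationScores) {
    // A fresh schedule starts from the identity swap table, so leader election
    // initially behaves like the old stake-weighted committee.leader(round).
    let schedule = LeaderSchedule::new(committee.clone(), LeaderSwapTable::default());

    // The schedule, not the committee, now answers "who leads round R?".
    let _leader = schedule.leader(round);

    // Scores are marked final once per schedule window; only then is a new
    // swap table installed (mirrors Bullshark::update_leader_schedule below).
    // Swapping through a shared reference assumes interior mutability, which
    // the new parking_lot dependency hints at.
    if scores.final_of_schedule {
        schedule.update_leader_swap_table(LeaderSwapTable::new(committee, round, scores));
    }
}
```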

Next steps:

- [x] calculate the swap table on every `K` committed subdags & wire it into
the leader election algorithm
- [x] ensure the whole feature is gated behind a protocol config feature
flag - and probably a config switch, as we might need to keep it disabled
for longer than a release cycle
- [ ] restore correct swap table after crash/recovery
- [ ] modify the commit path so we repeat the leader election when we
commit recursively
- [ ] modify the proposer to support the new leader election
capabilities
- [ ] add testing

## Test Plan 

Added unit tests

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
akichidis authored Jul 6, 2023
1 parent d97f6fa commit a749fef
Showing 22 changed files with 409 additions and 198 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/sui-open-rpc/spec/openrpc.json
@@ -1360,6 +1360,7 @@
"disallow_change_struct_type_params_on_upgrade": false,
"loaded_child_objects_fixed": true,
"missing_type_is_compatibility_error": true,
"narwhal_new_leader_election_schedule": false,
"narwhal_versioned_metadata": false,
"no_extraneous_module_bytes": false,
"package_digest_hash_module": false,
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-protocol-config/src/lib.rs
@@ -241,6 +241,10 @@ struct FeatureFlags {
// If true minimum txn charge is a multiplier of the gas price
#[serde(skip_serializing_if = "is_false")]
txn_base_cost_as_multiplier: bool,

// If true, then the new algorithm for the leader election schedule will be used
#[serde(skip_serializing_if = "is_false")]
narwhal_new_leader_election_schedule: bool,
}

fn is_false(b: &bool) -> bool {
@@ -815,6 +819,10 @@ impl ProtocolConfig {
pub fn txn_base_cost_as_multiplier(&self) -> bool {
self.feature_flags.txn_base_cost_as_multiplier
}

pub fn narwhal_new_leader_election_schedule(&self) -> bool {
self.feature_flags.narwhal_new_leader_election_schedule
}
}

#[cfg(not(msim))]
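For context, the getter above is what the rest of the codebase consults; the branch below is a sketch of the gating pattern (the same check appears in `narwhal/consensus/src/bullshark.rs` further down):

```rust
// Sketch of the gating pattern; `protocol_config` is assumed to be a
// ProtocolConfig value obtained for the current protocol version.
fn elect_leader_gated(protocol_config: &sui_protocol_config::ProtocolConfig) {
    if protocol_config.narwhal_new_leader_election_schedule() {
        // New path: leader election goes through the swap-table-aware LeaderSchedule.
    } else {
        // Old path: plain stake-weighted election via committee.leader(round).
    }
}
```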
1 change: 1 addition & 0 deletions narwhal/consensus/Cargo.toml
@@ -14,6 +14,7 @@ rand = { workspace = true, optional = true }
thiserror.workspace = true
tokio = { workspace = true, features = ["sync"] }
tracing.workspace = true
parking_lot = "0.12.1"

config = { path = "../config", package = "narwhal-config" }
fastcrypto.workspace = true
2 changes: 2 additions & 0 deletions narwhal/consensus/benches/process_certificates.rs
@@ -7,6 +7,7 @@ use criterion::{
};
use fastcrypto::hash::Hash;
use narwhal_consensus as consensus;
use narwhal_consensus::consensus::{LeaderSchedule, LeaderSwapTable};
use pprof::criterion::{Output, PProfProfiler};
use prometheus::Registry;
use std::{collections::BTreeSet, sync::Arc};
@@ -62,6 +63,7 @@ pub fn process_certificates(c: &mut Criterion) {
last_successful_leader_election_timestamp: Instant::now(),
max_inserted_certificate_round: 0,
num_sub_dags_per_schedule: 100,
leader_schedule: LeaderSchedule::new(committee.clone(), LeaderSwapTable::default()),
};
consensus_group.bench_with_input(
BenchmarkId::new("batched", certificates.len()),
187 changes: 126 additions & 61 deletions narwhal/consensus/src/bullshark.rs
@@ -1,12 +1,13 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::consensus::{LeaderSchedule, LeaderSwapTable};
use crate::metrics::ConsensusMetrics;
use crate::{
consensus::{ConsensusState, Dag},
utils, ConsensusError, Outcome,
};
use config::{Authority, Committee, Stake};
use config::{Committee, Stake};
use fastcrypto::hash::Hash;
use std::sync::Arc;
use storage::ConsensusStore;
@@ -39,6 +40,8 @@ pub struct Bullshark {
/// The number of committed subdags that will trigger the schedule change and reputation
/// score reset.
pub num_sub_dags_per_schedule: u64,
/// The leader election schedule to be used when we need to find a round's leader
pub leader_schedule: LeaderSchedule,
}

impl Bullshark {
@@ -49,6 +52,7 @@
protocol_config: ProtocolConfig,
metrics: Arc<ConsensusMetrics>,
num_sub_dags_per_schedule: u64,
leader_schedule: LeaderSchedule,
) -> Self {
Self {
committee,
@@ -58,53 +62,7 @@ impl Bullshark {
max_inserted_certificate_round: 0,
metrics,
num_sub_dags_per_schedule,
}
}

// Returns the authority which is the leader for the provided `round`.
// Note that this method will always return the first authority as the leader
// when used under a test environment.
pub fn leader_authority(committee: &Committee, round: Round) -> Authority {
assert_eq!(
round % 2,
0,
"We should never attempt to do a leader election for odd rounds"
);

cfg_if::cfg_if! {
if #[cfg(test)] {
// We apply round robin in leader election. Since we expect round to be an even number,
// 2, 4, 6, 8... it can't work well for leader election as we'll omit leaders. Thus
// we can always divide by 2 to get a monotonically incremented sequence,
// 2/2 = 1, 4/2 = 2, 6/2 = 3, 8/2 = 4 etc, and then do minus 1 so we can always
// start with base zero 0.
let next_leader = (round/2 - 1) as usize % committee.size();
let authorities = committee.authorities().collect::<Vec<_>>();

(*authorities.get(next_leader).unwrap()).clone()
} else {
// Elect the leader in a stake-weighted choice seeded by the round
committee.leader(round)
}
}
}

/// Returns the certificate originated by the leader of the specified round (if any). The leader
/// Authority of the round is always returned, irrespective of whether the certificate exists,
/// since the leader is deterministically determined.
fn leader<'a>(
committee: &Committee,
round: Round,
dag: &'a Dag,
) -> (Authority, Option<&'a Certificate>) {
// Note: this function is often called with even rounds only. While we do not aim at random selection
// yet (see issue #10), repeated calls to this function should still pick from the whole roster of leaders.
let leader = Self::leader_authority(committee, round);

// Return its certificate and the certificate's digest.
match dag.get(&round).and_then(|x| x.get(&leader.id())) {
None => (leader, None),
Some((_, certificate)) => (leader, Some(certificate)),
leader_schedule,
}
}

@@ -197,7 +155,10 @@ impl Bullshark {
return Ok((Outcome::LeaderBelowCommitRound, Vec::new()));
}

let leader = match Self::leader(&self.committee, leader_round, &state.dag) {
let leader = match self
.leader_schedule
.leader_certificate(leader_round, &state.dag)
{
(_leader_authority, Some(certificate)) => certificate,
(_leader_authority, None) => {
// leader has not been found - we don't have any certificate
@@ -228,16 +189,7 @@ impl Bullshark {
let mut committed_sub_dags = Vec::new();
let mut total_committed_certificates = 0;

for leader in utils::order_leaders(
&self.committee,
leader,
state,
Self::leader,
self.metrics.clone(),
)
.iter()
.rev()
{
for leader in self.order_leaders(leader, state).iter().rev() {
let sub_dag_index = state.next_sub_dag_index();
let _span = error_span!("bullshark_process_sub_dag", sub_dag_index);

@@ -268,7 +220,7 @@ impl Bullshark {
sequence,
leader.clone(),
sub_dag_index,
reputation_score,
reputation_score.clone(),
state.last_committed_sub_dag.as_ref(),
);

@@ -280,6 +232,13 @@ impl Bullshark {
state.last_committed_sub_dag = Some(sub_dag.clone());

committed_sub_dags.push(sub_dag);

// If the leader schedule has been updated, then we'll need to recalculate any upcoming
// leaders for the rest of the recursive commits.
if self.update_leader_schedule(leader.round(), &reputation_score) {
// TODO: a new schedule has been produced, which means that we need to recalculate leaders
// to ensure that the new ones are considered and update the commit path.
}
}

// record the last time we got a successful leader election
@@ -316,13 +275,119 @@ impl Bullshark {
Ok((Outcome::Commit, committed_sub_dags))
}

/// Order the past leaders that we didn't already commit.
pub fn order_leaders(&self, leader: &Certificate, state: &ConsensusState) -> Vec<Certificate> {
let mut to_commit = vec![leader.clone()];
let mut leader = leader;
assert_eq!(leader.round() % 2, 0);
for r in (state.last_round.committed_round + 2..=leader.round() - 2)
.rev()
.step_by(2)
{
// Get the certificate proposed by the previous leader.
let (prev_leader, authority) =
match self.leader_schedule.leader_certificate(r, &state.dag) {
(authority, Some(x)) => (x, authority),
(authority, None) => {
self.metrics
.leader_election
.with_label_values(&["not_found", authority.hostname()])
.inc();

continue;
}
};

// Check whether there is a path between the last two leaders.
if self.linked(leader, prev_leader, &state.dag) {
to_commit.push(prev_leader.clone());
leader = prev_leader;
} else {
self.metrics
.leader_election
.with_label_values(&["no_path", authority.hostname()])
.inc();
}
}

// Now just report all the found leaders
let committee = self.committee.clone();
let metrics = self.metrics.clone();

to_commit.iter().for_each(|certificate| {
let authority = committee.authority(&certificate.origin()).unwrap();

metrics
.leader_election
.with_label_values(&["committed", authority.hostname()])
.inc();
});

to_commit
}

/// Checks if there is a path between two leaders.
fn linked(&self, leader: &Certificate, prev_leader: &Certificate, dag: &Dag) -> bool {
let mut parents = vec![leader];
for r in (prev_leader.round()..leader.round()).rev() {
parents = dag
.get(&r)
.expect("We should have the whole history by now")
.values()
.filter(|(digest, _)| {
parents
.iter()
.any(|x| x.header().parents().contains(digest))
})
.map(|(_, certificate)| certificate)
.collect();
}
parents.contains(&prev_leader)
}

// When the provided `reputation_scores` are "final" for the current schedule window, we
// create the new leader swap table and update the leader schedule to use it. Otherwise we do
// nothing. Returns true if the schedule has been updated.
fn update_leader_schedule(
&mut self,
leader_round: Round,
reputation_scores: &ReputationScores,
) -> bool {
// Do not perform any update if the feature is disabled
if self.protocol_config.narwhal_new_leader_election_schedule()
&& reputation_scores.final_of_schedule
{
// create the new swap table and update the scheduler
self.leader_schedule
.update_leader_swap_table(LeaderSwapTable::new(
&self.committee,
leader_round,
reputation_scores,
));

return true;
}
false
}

fn report_leader_on_time_metrics(&mut self, certificate_round: Round, state: &ConsensusState) {
if certificate_round > self.max_inserted_certificate_round
&& certificate_round % 2 == 0
&& certificate_round > 2
{
let previous_leader_round = certificate_round - 2;
let authority = Self::leader_authority(&self.committee, previous_leader_round);

// This metric reports the leader election success for the last leader election round.
// Our goal is to identify the rate of missed/failed leader elections, which are a source
// of tx latency. The metric's authority label cannot be considered fully accurate when
// we change schedules, as we'll calculate the previous leader round using the
// updated scores and consequently the new swap table. If the leader for that position has
// changed, then a different hostname will be erroneously reported. For now this is not a
// huge issue as it will affect either:
// * only the round where we switch schedules, or
// * long periods of asynchrony where we end up changing schedules late
// and we don't really expect it to happen frequently.
let authority = self.leader_schedule.leader(previous_leader_round);

if state.last_round.committed_round < previous_leader_round {
self.metrics
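For intuition on the `K`-commit cadence: only `final_of_schedule` is consumed in this diff, so the modulo rule in the small illustration below is an assumption about how scores are marked final upstream.

```rust
// Assumption: scores are flagged final once per window of
// `num_sub_dags_per_schedule` committed sub-dags (100 in the benchmark setup).
fn is_schedule_boundary(sub_dag_index: u64, num_sub_dags_per_schedule: u64) -> bool {
    sub_dag_index % num_sub_dags_per_schedule == 0
}

fn main() {
    // Sub-dags 1..=99 leave the schedule untouched; sub-dag 100 would install
    // a new LeaderSwapTable built from the now-final reputation scores.
    assert!(!is_schedule_boundary(99, 100));
    assert!(is_schedule_boundary(100, 100));
}
```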
(The remaining file diffs are not shown.)
