[Consensus 2.0] Recover from amnesia - part 1 (MystenLabs#18009)
## Description 

This PR introduces a best practice for recovering a node's consensus state
from amnesia (a DB wipe-out). Currently, if a node that has already
participated in consensus (proposed blocks, etc.) crashes and recovers from
amnesia, it will eventually equivocate during recovery and crash (as per our
DagState rules).

With this PR:
* A mechanism is introduced to ask the network peers for the node's highest
proposed block, use that block's round as the minimum proposed one, and start
proposing only after that round, avoiding equivocations.
* The fetch of the node's own last block is guarded behind a property that
enables/disables it and also defines a timeout for gathering peer responses.
This has been chosen explicitly in order to (1) avoid an unnecessary recovery
of the last block during a normal node start and (2) maximise safety.
* The node will crash if the recovery feature is enabled and it does not
manage to gather any response. Here we can also decide whether to introduce a
minimum stake threshold as well. A minimal sketch of this flow appears right
after this list.
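
A minimal sketch of the recovery decision, assuming the peer responses have
already been collected; the function name, the `Round` alias, and the shape of
`peer_responses` are illustrative, not this PR's exact APIs:

```rust
use std::time::Duration;

/// Stand-in for consensus-core's `Round` type.
type Round = u32;

/// Minimal sketch: derive the first safe round to propose after amnesia.
/// `peer_responses` holds, per peer that answered within `timeout`, the
/// highest round of a block this authority is known to have proposed
/// (`None` if the peer has seen none).
fn min_propose_round(timeout: Duration, peer_responses: &[Option<Round>]) -> Round {
    // A zero timeout disables the feature (normal node start): no recovery
    // fetch is attempted and proposing starts from the genesis round.
    if timeout.is_zero() {
        return 0;
    }
    // Feature enabled but no responses gathered: equivocation cannot be
    // ruled out, so the node crashes (safety over liveness).
    assert!(
        !peer_responses.is_empty(),
        "no peer responded within {timeout:?}; cannot recover safely"
    );
    // Propose strictly after the highest round any peer has seen from us.
    peer_responses.iter().flatten().max().map_or(0, |&r| r + 1)
}
```

For example, with responses `[Some(42), None, Some(40)]` the node would start
proposing at round 43.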

Next steps:
* Expose the property that enables last-own-block recovery in the node's
properties/config. Validator operators can optionally enable it when they
attempt to recover from amnesia.

## Test plan 

CI/private-testnet

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK: 
akichidis authored Jun 26, 2024
1 parent 32d0433 commit bed2833
Showing 16 changed files with 795 additions and 199 deletions.
16 changes: 16 additions & 0 deletions consensus/config/src/parameters.rs
@@ -68,6 +68,12 @@ pub struct Parameters {
/// Tonic network settings.
#[serde(default = "TonicParameters::default")]
pub tonic: TonicParameters,

/// Time to wait during node start up until the node has synced the last proposed block via the
/// network peers. When set to `0` the sync mechanism is disabled. This property is meant to be
/// used for amnesia recovery.
#[serde(default = "Parameters::default_sync_last_proposed_block_timeout")]
pub sync_last_proposed_block_timeout: Duration,
}

impl Parameters {
@@ -124,6 +130,14 @@ impl Parameters {
pub(crate) fn default_commit_sync_batches_ahead() -> usize {
200
}

pub(crate) fn default_sync_last_proposed_block_timeout() -> Duration {
Duration::ZERO
}

pub fn is_sync_last_proposed_block_enabled(&self) -> bool {
!self.sync_last_proposed_block_timeout.is_zero()
}
}

impl Default for Parameters {
@@ -135,6 +149,8 @@ impl Default for Parameters {
max_forward_time_drift: Parameters::default_max_forward_time_drift(),
dag_state_cached_rounds: Parameters::default_dag_state_cached_rounds(),
max_blocks_per_fetch: Parameters::default_max_blocks_per_fetch(),
sync_last_proposed_block_timeout: Parameters::default_sync_last_proposed_block_timeout(
),
commit_sync_parallel_fetches: Parameters::default_commit_sync_parallel_fetches(),
commit_sync_batch_size: Parameters::default_commit_sync_batch_size(),
commit_sync_batches_ahead: Parameters::default_commit_sync_batches_ahead(),
@@ -26,3 +26,7 @@ tonic:
connection_buffer_size: 33554432
excessive_message_size: 16777216
message_size_limit: 67108864
sync_last_proposed_block_timeout:
secs: 0
nanos: 0
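
To enable amnesia recovery, an operator would override the zero default with a
non-zero timeout, for example (the 5-second value is purely illustrative, not
a recommendation from this PR):

```yaml
sync_last_proposed_block_timeout:
  secs: 5
  nanos: 0
```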

19 changes: 19 additions & 0 deletions consensus/core/build.rs
@@ -65,6 +65,16 @@ fn build_tonic_services(out_dir: &Path) {
.codec_path(codec_path)
.build(),
)
.method(
tonic_build::manual::Method::builder()
.name("fetch_latest_blocks")
.route_name("FetchLatestBlocks")
.input_type("crate::network::tonic_network::FetchLatestBlocksRequest")
.output_type("crate::network::tonic_network::FetchLatestBlocksResponse")
.codec_path(codec_path)
.server_streaming()
.build(),
)
.build();

tonic_build::manual::Builder::new()
@@ -109,6 +119,15 @@ fn build_anemo_services(out_dir: &Path) {
.codec_path(codec_path)
.build(),
)
.method(
anemo_build::manual::Method::builder()
.name("fetch_latest_blocks")
.route_name("FetchLatestBlocks")
.request_type("crate::network::anemo_network::FetchLatestBlocksRequest")
.response_type("crate::network::anemo_network::FetchLatestBlocksResponse")
.codec_path(codec_path)
.build(),
)
.build();

anemo_build::manual::Builder::new()
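For reference, a plausible shape for the request/response types wired up by
the routes above. This is only a sketch under assumptions (the field names,
the use of `serde`, and the `bytes` crate's `serde` feature are all guesses);
the actual definitions live in the `tonic_network`/`anemo_network` modules and
may differ:

```rust
use bytes::Bytes; // assumes the `serde` feature of `bytes` is enabled
use serde::{Deserialize, Serialize};

/// Hypothetical request shape: the caller names the authorities whose latest
/// blocks it wants (during amnesia recovery, typically only itself).
#[derive(Clone, Serialize, Deserialize)]
pub struct FetchLatestBlocksRequest {
    pub authorities: Vec<u32>,
}

/// Hypothetical response shape: serialized signed blocks, which the caller
/// verifies before deriving its minimum propose round from them.
#[derive(Clone, Serialize, Deserialize)]
pub struct FetchLatestBlocksResponse {
    pub blocks: Vec<Bytes>,
}
```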
258 changes: 61 additions & 197 deletions consensus/core/src/authority_node.rs
@@ -346,116 +346,17 @@ mod tests {

use std::{collections::BTreeSet, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::{local_committee_and_keys, Parameters};
use mysten_metrics::monitored_mpsc::unbounded_channel;
use parking_lot::Mutex;
use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver};
use prometheus::Registry;
use rstest::rstest;
use sui_protocol_config::ProtocolConfig;
use tempfile::TempDir;
use tokio::{sync::broadcast, time::sleep};
use tokio::time::sleep;
use typed_store::DBMetrics;

use super::*;
use crate::{
authority_node::AuthorityService,
block::{BlockAPI as _, BlockRef, Round, TestBlock, VerifiedBlock},
block_verifier::NoopBlockVerifier,
commit::CommitRange,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
error::ConsensusResult,
network::{BlockStream, NetworkClient, NetworkService as _},
storage::mem_store::MemStore,
transaction::NoopTransactionVerifier,
};

struct FakeCoreThreadDispatcher {
blocks: Mutex<Vec<VerifiedBlock>>,
}

impl FakeCoreThreadDispatcher {
fn new() -> Self {
Self {
blocks: Mutex::new(vec![]),
}
}

fn get_blocks(&self) -> Vec<VerifiedBlock> {
self.blocks.lock().clone()
}
}

#[async_trait]
impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
async fn add_blocks(
&self,
blocks: Vec<VerifiedBlock>,
) -> Result<BTreeSet<BlockRef>, CoreError> {
let block_refs = blocks.iter().map(|b| b.reference()).collect();
self.blocks.lock().extend(blocks);
Ok(block_refs)
}

async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
Ok(())
}

async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
Ok(Default::default())
}

fn set_consumer_availability(&self, _available: bool) -> Result<(), CoreError> {
Ok(())
}
}

#[derive(Default)]
struct FakeNetworkClient {}

#[async_trait]
impl NetworkClient for FakeNetworkClient {
const SUPPORT_STREAMING: bool = false;

async fn send_block(
&self,
_peer: AuthorityIndex,
_block: &VerifiedBlock,
_timeout: Duration,
) -> ConsensusResult<()> {
unimplemented!("Unimplemented")
}

async fn subscribe_blocks(
&self,
_peer: AuthorityIndex,
_last_received: Round,
_timeout: Duration,
) -> ConsensusResult<BlockStream> {
unimplemented!("Unimplemented")
}

async fn fetch_blocks(
&self,
_peer: AuthorityIndex,
_block_refs: Vec<BlockRef>,
_highest_accepted_rounds: Vec<Round>,
_timeout: Duration,
) -> ConsensusResult<Vec<Bytes>> {
unimplemented!("Unimplemented")
}

async fn fetch_commits(
&self,
_peer: AuthorityIndex,
_commit_range: CommitRange,
_timeout: Duration,
) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
unimplemented!("Unimplemented")
}
}
use crate::{block::BlockAPI as _, transaction::NoopTransactionVerifier, CommittedSubDag};

#[rstest]
#[tokio::test]
@@ -500,61 +401,6 @@ mod tests {
authority.stop().await;
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_authority_service() {
let (context, _keys) = Context::new_for_test(4);
let context = Arc::new(context);
let block_verifier = Arc::new(NoopBlockVerifier {});
let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
let network_client = Arc::new(FakeNetworkClient::default());
let store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
let synchronizer = Synchronizer::start(
network_client,
context.clone(),
core_dispatcher.clone(),
block_verifier.clone(),
dag_state.clone(),
);
let authority_service = Arc::new(AuthorityService::new(
context.clone(),
block_verifier,
Arc::new(CommitVoteMonitor::new(context.clone())),
synchronizer,
core_dispatcher.clone(),
rx_block_broadcast,
dag_state,
store,
));

// Test delaying blocks with time drift.
let now = context.clock.timestamp_utc_ms();
let max_drift = context.parameters.max_forward_time_drift;
let input_block = VerifiedBlock::new_for_test(
TestBlock::new(9, 0)
.set_timestamp_ms(now + max_drift.as_millis() as u64)
.build(),
);

let service = authority_service.clone();
let serialized = input_block.serialized().clone();
tokio::spawn(async move {
service
.handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
.await
.unwrap();
});

sleep(max_drift / 2).await;
assert!(core_dispatcher.get_blocks().is_empty());

sleep(max_drift).await;
let blocks = core_dispatcher.get_blocks();
assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0], input_block);
}

// TODO: build AuthorityFixture.
#[rstest]
#[tokio::test(flavor = "current_thread")]
@@ -567,49 +413,18 @@
let (committee, keypairs) = local_committee_and_keys(0, vec![1, 1, 1, 1]);
let temp_dirs = (0..4).map(|_| TempDir::new().unwrap()).collect::<Vec<_>>();

let make_authority = |index: AuthorityIndex| {
let committee = committee.clone();
let registry = Registry::new();

// Cache less blocks to exercise commit sync.
let parameters = Parameters {
db_path: Some(temp_dirs[index.value()].path().to_path_buf()),
dag_state_cached_rounds: 5,
commit_sync_parallel_fetches: 3,
commit_sync_batch_size: 3,
..Default::default()
};
let txn_verifier = NoopTransactionVerifier {};

let protocol_keypair = keypairs[index].1.clone();
let network_keypair = keypairs[index].0.clone();

let (sender, receiver) = unbounded_channel("consensus_output");
let commit_consumer = CommitConsumer::new(sender, 0, 0);

async move {
let authority = ConsensusAuthority::start(
network_type,
index,
committee,
parameters,
ProtocolConfig::get_for_max_version_UNSAFE(),
protocol_keypair,
network_keypair,
Arc::new(txn_verifier),
commit_consumer,
registry,
)
.await;
(authority, receiver)
}
};

let mut output_receivers = Vec::with_capacity(committee.size());
let mut authorities = Vec::with_capacity(committee.size());

for (index, _authority_info) in committee.authorities() {
let (authority, receiver) = make_authority(index).await;
let (authority, receiver) = make_authority(
index,
&temp_dirs[index.value()],
committee.clone(),
keypairs.clone(),
network_type,
)
.await;
output_receivers.push(receiver);
authorities.push(authority);
}
@@ -656,7 +471,14 @@
sleep(Duration::from_secs(15)).await;

// Restart authority 1 and let it run.
let (authority, receiver) = make_authority(index).await;
let (authority, receiver) = make_authority(
index,
&temp_dirs[index.value()],
committee.clone(),
keypairs.clone(),
network_type,
)
.await;
output_receivers[index] = receiver;
authorities.insert(index.value(), authority);
sleep(Duration::from_secs(15)).await;
@@ -666,4 +488,46 @@
authority.stop().await;
}
}

// TODO: create a fixture
async fn make_authority(
index: AuthorityIndex,
db_dir: &TempDir,
committee: Committee,
keypairs: Vec<(NetworkKeyPair, ProtocolKeyPair)>,
network_type: ConsensusNetwork,
) -> (ConsensusAuthority, UnboundedReceiver<CommittedSubDag>) {
let registry = Registry::new();

// Cache less blocks to exercise commit sync.
let parameters = Parameters {
db_path: Some(db_dir.path().to_path_buf()),
dag_state_cached_rounds: 5,
commit_sync_parallel_fetches: 3,
commit_sync_batch_size: 3,
..Default::default()
};
let txn_verifier = NoopTransactionVerifier {};

let protocol_keypair = keypairs[index].1.clone();
let network_keypair = keypairs[index].0.clone();

let (sender, receiver) = unbounded_channel("consensus_output");
let commit_consumer = CommitConsumer::new(sender, 0, 0);

let authority = ConsensusAuthority::start(
network_type,
index,
committee,
parameters,
ProtocolConfig::get_for_max_version_UNSAFE(),
protocol_keypair,
network_keypair,
Arc::new(txn_verifier),
commit_consumer,
registry,
)
.await;
(authority, receiver)
}
}