Skip to content

Commit

Permalink
[Consensus 2.0] Recover from amnesia - part 2 (#18190)
Browse files Browse the repository at this point in the history
## Description 

This is the second part of the amnesia-recovery feature. This PR
implements the logic in the synchronizer to fetch the node's last own
block from the peer network. It also adds several tests to confirm that
the behaviour is correct.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
akichidis authored Jul 22, 2024
1 parent 6201942 commit cb04174
Show file tree
Hide file tree
Showing 4 changed files with 462 additions and 18 deletions.
168 changes: 167 additions & 1 deletion consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ where
);

// First shutdown components calling into Core.
self.synchronizer.stop().await;
self.synchronizer.stop().await.ok();
self.commit_syncer.stop().await;
self.leader_timeout_handle.stop().await;
// Shutdown Core to stop block productions and broadcast.
Expand Down Expand Up @@ -350,6 +350,7 @@ where
mod tests {
#![allow(non_snake_case)]

use std::sync::Mutex;
use std::{collections::BTreeSet, sync::Arc, time::Duration};

use consensus_config::{local_committee_and_keys, Parameters};
Expand Down Expand Up @@ -495,6 +496,170 @@ mod tests {
}
}

/// Verifies that an authority which restarts with an empty store ("amnesia") can rejoin
/// consensus and have one of its blocks committed again.
///
/// Flow:
/// 1. Start one authority per committee member, each on its own fresh temp directory.
/// 2. Submit `NUM_TRANSACTIONS` transactions round-robin and wait until every authority's
///    commit output contains all of them exactly once.
/// 3. Stop authority 1, then start it again on a brand-new temp directory.
/// 4. Wait until the restarted authority's commit output contains a block authored by itself.
///
/// The test is run once per `ConsensusNetwork` variant listed in `#[values(...)]`.
#[rstest]
#[tokio::test(flavor = "current_thread")]
async fn test_amnesia_success(
#[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
) {
telemetry_subscribers::init_for_testing();
let db_registry = Registry::new();
DBMetrics::init(&db_registry);

// Committee built from four entries; presumably these are equal stakes - verify against
// `local_committee_and_keys`.
let (committee, keypairs) = local_committee_and_keys(0, vec![1, 1, 1, 1]);
let mut output_receivers = vec![];
let mut authorities = vec![];

// Boot every authority of the committee and keep its commit-output receiver.
for (index, _authority_info) in committee.authorities() {
let (authority, receiver) = make_authority(
index,
&TempDir::new().unwrap(),
committee.clone(),
keypairs.clone(),
network_type,
)
.await;
output_receivers.push(receiver);
authorities.push(authority);
}

// Submit 16-byte transactions (each filled with its own index) and spread them across the
// authorities in round-robin order.
const NUM_TRANSACTIONS: u8 = 15;
let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
for i in 0..NUM_TRANSACTIONS {
let txn = vec![i; 16];
submitted_transactions.insert(txn.clone());
authorities[i as usize % authorities.len()]
.transaction_client()
.submit(vec![txn])
.await
.unwrap();
}

// Every authority must output each submitted transaction exactly once. Each receive is
// bounded by a 1 second timeout, so a stalled authority fails the test instead of hanging.
for receiver in &mut output_receivers {
let mut expected_transactions = submitted_transactions.clone();
loop {
let committed_subdag =
tokio::time::timeout(Duration::from_secs(1), receiver.recv())
.await
.unwrap()
.unwrap();
for b in committed_subdag.blocks {
for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
// `remove` returns false for both unknown and already-removed transactions.
assert!(
expected_transactions.remove(&txn),
"Transaction not submitted or already seen: {:?}",
txn
);
}
}
assert_eq!(committed_subdag.reputation_scores_desc, vec![]);
if expected_transactions.is_empty() {
break;
}
}
}

// Stop authority 1.
let index = committee.to_authority_index(1).unwrap();
authorities.remove(index.value()).stop().await;
sleep(Duration::from_secs(5)).await;

// Now create a new directory to simulate amnesia. The node has previously participated in
// consensus, but it starts from an empty store and will attempt to synchronize its last own
// block from its peers and recover from there.
let (authority, mut receiver) = make_authority(
index,
&TempDir::new().unwrap(),
committee.clone(),
keypairs,
network_type,
)
.await;
// Re-insert at the same position so the vector stays ordered by authority index.
authorities.insert(index.value(), authority);
sleep(Duration::from_secs(5)).await;

// We wait until we see at least one committed block authored by this authority.
// NOTE(review): unlike the receive loop above, this one has no timeout, and if `recv()`
// yields `None` the loop ends without any assertion - confirm this is intended.
'outer: while let Some(result) = receiver.recv().await {
for block in result.blocks {
if block.author() == index {
break 'outer;
}
}
}

// Stop all authorities and exit.
for authority in authorities {
authority.stop().await;
}
}

/// Verifies that an authority recovering from amnesia panics when no peer is available to
/// return its last own block.
///
/// Flow:
/// 1. Install a panic hook that records the panic message.
/// 2. Start every authority of the committee, let them run, then stop all of them.
/// 3. Start authority 0 alone on a fresh temp directory and give it time to fail.
/// 4. Assert that the recorded panic message contains the expected text.
///
/// The test is run once per `ConsensusNetwork` variant listed in `#[values(...)]`.
#[rstest]
#[tokio::test]
async fn test_amnesia_failure(
#[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork,
) {
telemetry_subscribers::init_for_testing();

// Shared slot where the panic hook stores the message of the most recent panic.
let occurred_panic = Arc::new(Mutex::new(None));
let occurred_panic_cloned = occurred_panic.clone();

// Wrap the hook that was registered so far: record the panic text first, then delegate.
// NOTE(review): the panic hook is process-global, so a panic raised by another test running
// in the same process would also be recorded here - confirm the test runner isolates tests.
let default_panic_handler = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic| {
let mut l = occurred_panic_cloned.lock().unwrap();
*l = Some(panic.to_string());
default_panic_handler(panic);
}));

let db_registry = Registry::new();
DBMetrics::init(&db_registry);

let (committee, keypairs) = local_committee_and_keys(0, vec![1, 1, 1, 1]);
let mut output_receivers = vec![];
let mut authorities = vec![];

// Boot every authority of the committee, each on its own fresh temp directory.
for (index, _authority_info) in committee.authorities() {
let (authority, receiver) = make_authority(
index,
&TempDir::new().unwrap(),
committee.clone(),
keypairs.clone(),
network_type,
)
.await;
output_receivers.push(receiver);
authorities.push(authority);
}

// Let the network run for a few seconds
sleep(Duration::from_secs(5)).await;

// Stop all authorities
while let Some(authority) = authorities.pop() {
authority.stop().await;
}

sleep(Duration::from_secs(2)).await;

// Restart only authority 0, on a new empty directory, while every peer is down.
let index = AuthorityIndex::new_for_test(0);
let (_authority, _receiver) = make_authority(
index,
&TempDir::new().unwrap(),
committee,
keypairs,
network_type,
)
.await;
// Presumably long enough for the last-own-block sync to time out and panic - TODO confirm
// against the timeout configured in `make_authority`.
sleep(Duration::from_secs(5)).await;

// Now reset the panic hook. `take_hook` unregisters the recording hook and leaves the
// standard library's default hook registered in its place.
let _default_panic_handler = std::panic::take_hook();

// We expect this test to panic as all the other peers are down and the node that tries to
// recover its last produced block fails.
// The final `unwrap` fails the test if no panic was recorded at all.
let panic_info = occurred_panic.lock().unwrap().take().unwrap();
assert!(panic_info.contains(
"No peer has returned any acceptable result, can not safely update min round"
));
}

// TODO: create a fixture
async fn make_authority(
index: AuthorityIndex,
Expand All @@ -511,6 +676,7 @@ mod tests {
dag_state_cached_rounds: 5,
commit_sync_parallel_fetches: 3,
commit_sync_batch_size: 3,
sync_last_proposed_block_timeout: Duration::from_millis(2_000),
..Default::default()
};
let txn_verifier = NoopTransactionVerifier {};
Expand Down
8 changes: 8 additions & 0 deletions consensus/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ pub(crate) enum ConsensusError {
block_ref: BlockRef,
},

#[error(
"Unexpected block {block_ref} returned while fetching last own block from peer {index}"
)]
UnexpectedLastOwnBlock {
index: AuthorityIndex,
block_ref: BlockRef,
},

#[error("Too many blocks have been returned from authority {0} when requesting to fetch missing blocks")]
TooManyFetchedBlocksReturned(AuthorityIndex),

Expand Down
6 changes: 6 additions & 0 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub(crate) struct NodeMetrics {
pub(crate) last_committed_authority_round: IntGaugeVec,
pub(crate) last_committed_leader_round: IntGauge,
pub(crate) last_commit_index: IntGauge,
pub(crate) last_known_own_block_round: IntGauge,
pub(crate) commit_round_advancement_interval: Histogram,
pub(crate) last_decided_leader_round: IntGauge,
pub(crate) leader_timeout_total: IntCounterVec,
Expand Down Expand Up @@ -321,6 +322,11 @@ impl NodeMetrics {
&["authority", "type"],
registry,
).unwrap(),
last_known_own_block_round: register_int_gauge_with_registry!(
"last_known_own_block_round",
"The highest round of our own block as this has been synced from peers during an amnesia recovery",
registry,
).unwrap(),
// TODO: add a short status label.
invalid_blocks: register_int_counter_vec_with_registry!(
"invalid_blocks",
Expand Down
Loading

0 comments on commit cb04174

Please sign in to comment.