From 2df033e8b9aae52f67afb70169b6a9ebfb54c4db Mon Sep 17 00:00:00 2001 From: Andrey Chursin Date: Tue, 15 Nov 2022 11:05:48 -0800 Subject: [PATCH] [checkpoint v2] Use commit round instead for checkpoint boundaries (#6125) Since we now have a commit round we can just use it to select transactions for checkpoint. https://github.com/MystenLabs/sui/issues/5763 --- crates/sui-core/src/authority.rs | 18 ++++++------------ .../sui-core/src/authority/authority_store.rs | 17 +++++++---------- crates/sui-core/src/consensus_handler.rs | 6 +++--- narwhal/executor/src/lib.rs | 6 +++--- narwhal/executor/src/notifier.rs | 4 ++-- narwhal/executor/src/state.rs | 6 +++--- narwhal/types/src/consensus.rs | 4 ++++ 7 files changed, 28 insertions(+), 33 deletions(-) diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index a7064d21210c4..ca3c2be8f91d3 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -44,7 +44,7 @@ use narwhal_config::{ Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache, WorkerId as ConsensusWorkerId, }; -use narwhal_types::ConsensusOutput; +use narwhal_types::CommittedSubDag; use sui_adapter::adapter; use sui_config::genesis::Genesis; use sui_json_rpc_types::{ @@ -2244,24 +2244,18 @@ impl AuthorityState { } } - pub(crate) fn handle_commit_boundary( - &self, - consensus_output: &Arc, - ) -> SuiResult { - debug!("Commit boundary at {}", consensus_output.consensus_index); + pub(crate) fn handle_commit_boundary(&self, committed_dag: &Arc) -> SuiResult { + let round = committed_dag.round(); + debug!("Commit boundary at {}", round); // This exchange is restart safe because of following: // // We try to read last checkpoint content and send it to the checkpoint service // CheckpointService::notify_checkpoint is idempotent in case you send same last checkpoint multiple times // // Only after CheckpointService::notify_checkpoint stores checkpoint in it's store we update checkpoint boundary - if let Some((index, roots)) = self - .database - .last_checkpoint(consensus_output.consensus_index)? - { + if let Some((index, roots)) = self.database.last_checkpoint(round)? { self.checkpoint_service.notify_checkpoint(index, roots)?; } - self.database - .record_checkpoint_boundary(consensus_output.consensus_index) + self.database.record_checkpoint_boundary(round) } } diff --git a/crates/sui-core/src/authority/authority_store.rs b/crates/sui-core/src/authority/authority_store.rs index 2bfb6226a8118..27bf04f9ff8ad 100644 --- a/crates/sui-core/src/authority/authority_store.rs +++ b/crates/sui-core/src/authority/authority_store.rs @@ -1329,7 +1329,7 @@ impl Deserialize<'de>> SuiDataStore { } let iter = epoch_tables.consensus_message_order.iter(); - let last_previous = ExecutionIndices::last_for_certificate(from_height_excluded); + let last_previous = ExecutionIndices::end_for_commit(from_height_excluded); let iter = iter.skip_to(&last_previous)?; // skip_to lands to key the last_key or key after it // technically here we need to check if first item in stream has a key equal to last_previous @@ -1343,7 +1343,7 @@ impl Deserialize<'de>> SuiDataStore { Ok(Some((index, roots))) } - pub fn record_checkpoint_boundary(&self, certificate_height: u64) -> SuiResult { + pub fn record_checkpoint_boundary(&self, commit_round: u64) -> SuiResult { if let Some((index, height)) = self .epoch_tables() .checkpoint_boundary @@ -1351,28 +1351,25 @@ impl Deserialize<'de>> SuiDataStore { .skip_to_last() .next() { - if height >= certificate_height { + if height >= commit_round { // Due to crash recovery we might see same boundary twice debug!("Not recording checkpoint boundary - already updated"); } else { let index = index + 1; debug!( "Recording checkpoint boundary {} at {}", - index, certificate_height + index, commit_round ); self.epoch_tables() .checkpoint_boundary - .insert(&index, &certificate_height)?; + .insert(&index, &commit_round)?; } } else { // Table is empty - debug!( - "Recording first checkpoint boundary at {}", - certificate_height - ); + debug!("Recording first checkpoint boundary at {}", commit_round); self.epoch_tables() .checkpoint_boundary - .insert(&0, &certificate_height)?; + .insert(&0, &commit_round)?; } Ok(()) } diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index 30772c89947aa..0240bab3edaa2 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -5,7 +5,7 @@ use crate::authority::authority_store_tables::ExecutionIndicesWithHash; use crate::authority::AuthorityState; use async_trait::async_trait; use narwhal_executor::{ExecutionIndices, ExecutionState}; -use narwhal_types::ConsensusOutput; +use narwhal_types::{CommittedSubDag, ConsensusOutput}; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use std::sync::{Arc, Mutex}; @@ -123,9 +123,9 @@ impl ExecutionState for ConsensusHandler { } #[instrument(level = "trace", skip_all)] - async fn notify_commit_boundary(&self, consensus_output: &Arc) { + async fn notify_commit_boundary(&self, committed_dag: &Arc) { self.state - .handle_commit_boundary(consensus_output) + .handle_commit_boundary(committed_dag) .expect("Unrecoverable error in consensus handler when processing commit boundary") } } diff --git a/narwhal/executor/src/lib.rs b/narwhal/executor/src/lib.rs index f3633fb8c8a28..89be02e53f0ed 100644 --- a/narwhal/executor/src/lib.rs +++ b/narwhal/executor/src/lib.rs @@ -57,7 +57,7 @@ pub trait ExecutionState { /// Current implementation sends this notification at the end of narwhal certificate /// /// In the future this will be triggered on the actual commit boundary, once per narwhal commit - async fn notify_commit_boundary(&self, _consensus_output: &Arc) {} + async fn notify_commit_boundary(&self, _committed_dag: &Arc) {} /// Load the last consensus index from storage. async fn load_execution_indices(&self) -> ExecutionIndices; @@ -172,8 +172,8 @@ impl ExecutionState for Arc { .await } - async fn notify_commit_boundary(&self, consensus_output: &Arc) { - self.as_ref().notify_commit_boundary(consensus_output).await + async fn notify_commit_boundary(&self, committed_dag: &Arc) { + self.as_ref().notify_commit_boundary(committed_dag).await } async fn load_execution_indices(&self) -> ExecutionIndices { diff --git a/narwhal/executor/src/notifier.rs b/narwhal/executor/src/notifier.rs index f30ef70367118..23557ed80bd9f 100644 --- a/narwhal/executor/src/notifier.rs +++ b/narwhal/executor/src/notifier.rs @@ -48,10 +48,10 @@ impl Notifier { let mut bytes = 0usize; for (transaction_index, transaction) in batch.transactions.into_iter().enumerate() { let execution_indices = ExecutionIndices { + last_committed_round: index.sub_dag.round(), next_certificate_index: index.next_certificate_index, next_batch_index: index.batch_index + 1, next_transaction_index: transaction_index as u64 + 1, - last_committed_round: index.sub_dag.leader.round(), }; bytes += transaction.len(); @@ -66,7 +66,7 @@ impl Notifier { if index.batch_index + 1 == index.output.certificate.header.payload.len() as u64 && index.sub_dag.is_last(&index.output) { - self.callback.notify_commit_boundary(&index.output).await; + self.callback.notify_commit_boundary(&index.sub_dag).await; } self.metrics diff --git a/narwhal/executor/src/state.rs b/narwhal/executor/src/state.rs index ab7356319f57d..641dc1f20da11 100644 --- a/narwhal/executor/src/state.rs +++ b/narwhal/executor/src/state.rs @@ -18,10 +18,10 @@ pub struct ExecutionIndices { } impl ExecutionIndices { - pub fn last_for_certificate(certificate_index: u64) -> Self { + pub fn end_for_commit(commit_round: u64) -> Self { ExecutionIndices { - last_committed_round: 0, - next_certificate_index: certificate_index, + last_committed_round: commit_round, + next_certificate_index: SequenceNumber::MAX, next_batch_index: SequenceNumber::MAX, next_transaction_index: SequenceNumber::MAX, } diff --git a/narwhal/types/src/consensus.rs b/narwhal/types/src/consensus.rs index 10f5ce2718d87..3b521f3257212 100644 --- a/narwhal/types/src/consensus.rs +++ b/narwhal/types/src/consensus.rs @@ -55,6 +55,10 @@ impl CommittedSubDag { .last() .map_or_else(|| false, |x| x == output) } + + pub fn round(&self) -> Round { + self.leader.round() + } } #[derive(Serialize, Deserialize, Clone, Debug)]