Skip to content

Commit

Permalink
[checkpoint v2] Use commit round instead for checkpoint boundaries (M…
Browse files Browse the repository at this point in the history
…ystenLabs#6125)

Since we now have a commit round we can just use it to select transactions for checkpoint.

MystenLabs#5763
  • Loading branch information
andll authored Nov 15, 2022
1 parent 0a6da9a commit 2df033e
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 33 deletions.
18 changes: 6 additions & 12 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use narwhal_config::{
Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache,
WorkerId as ConsensusWorkerId,
};
use narwhal_types::ConsensusOutput;
use narwhal_types::CommittedSubDag;
use sui_adapter::adapter;
use sui_config::genesis::Genesis;
use sui_json_rpc_types::{
Expand Down Expand Up @@ -2244,24 +2244,18 @@ impl AuthorityState {
}
}

pub(crate) fn handle_commit_boundary(
&self,
consensus_output: &Arc<ConsensusOutput>,
) -> SuiResult {
debug!("Commit boundary at {}", consensus_output.consensus_index);
pub(crate) fn handle_commit_boundary(&self, committed_dag: &Arc<CommittedSubDag>) -> SuiResult {
let round = committed_dag.round();
debug!("Commit boundary at {}", round);
// This exchange is restart safe because of following:
//
// We try to read last checkpoint content and send it to the checkpoint service
// CheckpointService::notify_checkpoint is idempotent in case you send same last checkpoint multiple times
//
// Only after CheckpointService::notify_checkpoint stores checkpoint in it's store we update checkpoint boundary
if let Some((index, roots)) = self
.database
.last_checkpoint(consensus_output.consensus_index)?
{
if let Some((index, roots)) = self.database.last_checkpoint(round)? {
self.checkpoint_service.notify_checkpoint(index, roots)?;
}
self.database
.record_checkpoint_boundary(consensus_output.consensus_index)
self.database.record_checkpoint_boundary(round)
}
}
17 changes: 7 additions & 10 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
}

let iter = epoch_tables.consensus_message_order.iter();
let last_previous = ExecutionIndices::last_for_certificate(from_height_excluded);
let last_previous = ExecutionIndices::end_for_commit(from_height_excluded);
let iter = iter.skip_to(&last_previous)?;
// skip_to lands to key the last_key or key after it
// technically here we need to check if first item in stream has a key equal to last_previous
Expand All @@ -1343,36 +1343,33 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
Ok(Some((index, roots)))
}

pub fn record_checkpoint_boundary(&self, certificate_height: u64) -> SuiResult {
pub fn record_checkpoint_boundary(&self, commit_round: u64) -> SuiResult {
if let Some((index, height)) = self
.epoch_tables()
.checkpoint_boundary
.iter()
.skip_to_last()
.next()
{
if height >= certificate_height {
if height >= commit_round {
// Due to crash recovery we might see same boundary twice
debug!("Not recording checkpoint boundary - already updated");
} else {
let index = index + 1;
debug!(
"Recording checkpoint boundary {} at {}",
index, certificate_height
index, commit_round
);
self.epoch_tables()
.checkpoint_boundary
.insert(&index, &certificate_height)?;
.insert(&index, &commit_round)?;
}
} else {
// Table is empty
debug!(
"Recording first checkpoint boundary at {}",
certificate_height
);
debug!("Recording first checkpoint boundary at {}", commit_round);
self.epoch_tables()
.checkpoint_boundary
.insert(&0, &certificate_height)?;
.insert(&0, &commit_round)?;
}
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::authority::authority_store_tables::ExecutionIndicesWithHash;
use crate::authority::AuthorityState;
use async_trait::async_trait;
use narwhal_executor::{ExecutionIndices, ExecutionState};
use narwhal_types::ConsensusOutput;
use narwhal_types::{CommittedSubDag, ConsensusOutput};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -123,9 +123,9 @@ impl ExecutionState for ConsensusHandler {
}

#[instrument(level = "trace", skip_all)]
async fn notify_commit_boundary(&self, consensus_output: &Arc<ConsensusOutput>) {
async fn notify_commit_boundary(&self, committed_dag: &Arc<CommittedSubDag>) {
self.state
.handle_commit_boundary(consensus_output)
.handle_commit_boundary(committed_dag)
.expect("Unrecoverable error in consensus handler when processing commit boundary")
}
}
Expand Down
6 changes: 3 additions & 3 deletions narwhal/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub trait ExecutionState {
/// Current implementation sends this notification at the end of narwhal certificate
///
/// In the future this will be triggered on the actual commit boundary, once per narwhal commit
async fn notify_commit_boundary(&self, _consensus_output: &Arc<ConsensusOutput>) {}
async fn notify_commit_boundary(&self, _committed_dag: &Arc<CommittedSubDag>) {}

/// Load the last consensus index from storage.
async fn load_execution_indices(&self) -> ExecutionIndices;
Expand Down Expand Up @@ -172,8 +172,8 @@ impl<T: ExecutionState + 'static + Send + Sync> ExecutionState for Arc<T> {
.await
}

async fn notify_commit_boundary(&self, consensus_output: &Arc<ConsensusOutput>) {
self.as_ref().notify_commit_boundary(consensus_output).await
async fn notify_commit_boundary(&self, committed_dag: &Arc<CommittedSubDag>) {
self.as_ref().notify_commit_boundary(committed_dag).await
}

async fn load_execution_indices(&self) -> ExecutionIndices {
Expand Down
4 changes: 2 additions & 2 deletions narwhal/executor/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ impl<State: ExecutionState + Send + Sync + 'static> Notifier<State> {
let mut bytes = 0usize;
for (transaction_index, transaction) in batch.transactions.into_iter().enumerate() {
let execution_indices = ExecutionIndices {
last_committed_round: index.sub_dag.round(),
next_certificate_index: index.next_certificate_index,
next_batch_index: index.batch_index + 1,
next_transaction_index: transaction_index as u64 + 1,
last_committed_round: index.sub_dag.leader.round(),
};
bytes += transaction.len();

Expand All @@ -66,7 +66,7 @@ impl<State: ExecutionState + Send + Sync + 'static> Notifier<State> {
if index.batch_index + 1 == index.output.certificate.header.payload.len() as u64
&& index.sub_dag.is_last(&index.output)
{
self.callback.notify_commit_boundary(&index.output).await;
self.callback.notify_commit_boundary(&index.sub_dag).await;
}

self.metrics
Expand Down
6 changes: 3 additions & 3 deletions narwhal/executor/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ pub struct ExecutionIndices {
}

impl ExecutionIndices {
pub fn last_for_certificate(certificate_index: u64) -> Self {
pub fn end_for_commit(commit_round: u64) -> Self {
ExecutionIndices {
last_committed_round: 0,
next_certificate_index: certificate_index,
last_committed_round: commit_round,
next_certificate_index: SequenceNumber::MAX,
next_batch_index: SequenceNumber::MAX,
next_transaction_index: SequenceNumber::MAX,
}
Expand Down
4 changes: 4 additions & 0 deletions narwhal/types/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl CommittedSubDag {
.last()
.map_or_else(|| false, |x| x == output)
}

pub fn round(&self) -> Round {
self.leader.round()
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down

0 comments on commit 2df033e

Please sign in to comment.