From 2de66e19dedcf3edc394b74638bdc3dcbe42d51b Mon Sep 17 00:00:00 2001 From: Mark Logan <103447440+mystenmark@users.noreply.github.com> Date: Fri, 31 Jan 2025 11:41:45 -0800 Subject: [PATCH] delay notify_randomness_in_checkpoint until checkpoint is committed (#21031) This ensures that we do not stop random partial sig transmission until the randomness update is durably committed to disk --- .../checkpoints/checkpoint_executor/mod.rs | 62 +++++++++++++------ crates/sui-core/src/consensus_handler.rs | 5 +- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs index c6204b6299003..6a8344b97e6ff 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs @@ -77,6 +77,7 @@ type CheckpointExecutionBuffer = FuturesOrdered< Option, Option, Vec, + Vec, )>, >; @@ -309,10 +310,10 @@ impl CheckpointExecutor { // watermark accordingly. Note that given that checkpoints are guaranteed to // be processed (added to FuturesOrdered) in seq_number order, using FuturesOrdered // guarantees that we will also ratchet the watermarks in order. - Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests))) = pending.next() => { + Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests, randomness_rounds))) = pending.next() => { let _process_scope = mysten_metrics::monitored_scope("ProcessExecutedCheckpoint"); - self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests).await; + self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests, randomness_rounds).await; self.backpressure_manager.update_highest_executed_checkpoint(*checkpoint.sequence_number()); highest_executed = Some(checkpoint.clone()); @@ -444,6 +445,7 @@ impl CheckpointExecutor { checkpoint_acc: Option, checkpoint_data: Option, all_tx_digests: &[TransactionDigest], + randomness_rounds: Vec, ) { // Commit all transaction effects to disk let cache_commit = self.state.get_cache_commit(); @@ -463,6 +465,21 @@ impl CheckpointExecutor { .handle_committed_transactions(all_tx_digests) .expect("cannot fail"); + // Once the checkpoint is finalized, we know that any randomness contained in this checkpoint has + // been successfully included in a checkpoint certified by quorum of validators. + // (RandomnessManager/RandomnessReporter is only present on validators.) + if let Some(randomness_reporter) = epoch_store.randomness_reporter() { + for round in randomness_rounds { + debug!( + ?round, + "notifying RandomnessReporter that randomness update was executed in checkpoint" + ); + randomness_reporter + .notify_randomness_in_checkpoint(round) + .expect("epoch cannot have ended"); + } + } + if let Some(checkpoint_data) = checkpoint_data { self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data) .await; @@ -579,7 +596,7 @@ impl CheckpointExecutor { pending.push_back(spawn_monitored_task!(async move { let epoch_store = epoch_store.clone(); - let (tx_digests, checkpoint_acc, checkpoint_data) = loop { + let (tx_digests, checkpoint_acc, checkpoint_data, randomness_rounds) = loop { match execute_checkpoint( checkpoint.clone(), &state, @@ -603,12 +620,23 @@ impl CheckpointExecutor { tokio::time::sleep(Duration::from_secs(1)).await; metrics.checkpoint_exec_errors.inc(); } - Ok((tx_digests, checkpoint_acc, checkpoint_data)) => { - break (tx_digests, checkpoint_acc, checkpoint_data) + Ok((tx_digests, checkpoint_acc, checkpoint_data, randomness_rounds)) => { + break ( + tx_digests, + checkpoint_acc, + checkpoint_data, + randomness_rounds, + ) } } }; - (checkpoint, checkpoint_acc, checkpoint_data, tx_digests) + ( + checkpoint, + checkpoint_acc, + checkpoint_data, + tx_digests, + randomness_rounds, + ) })); } @@ -769,6 +797,7 @@ impl CheckpointExecutor { // Logs within the function are annotated with the checkpoint sequence number and epoch, // from schedule_checkpoint(). +#[allow(clippy::type_complexity)] #[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))] async fn execute_checkpoint( checkpoint: VerifiedCheckpoint, @@ -786,6 +815,7 @@ async fn execute_checkpoint( Vec, Option, Option, + Vec, )> { debug!("Preparing checkpoint for execution",); let prepare_start = Instant::now(); @@ -829,20 +859,12 @@ async fn execute_checkpoint( ) .await?; - // Once execution is complete, we know that any randomness contained in this checkpoint has - // been successfully included in a checkpoint certified by quorum of validators. - // (RandomnessManager/RandomnessReporter is only present on validators.) - if let Some(randomness_reporter) = epoch_store.randomness_reporter() { - for round in randomness_rounds { - debug!( - ?round, - "notifying RandomnessReporter that randomness update was executed in checkpoint" - ); - randomness_reporter.notify_randomness_in_checkpoint(round)?; - } - } - - Ok((all_tx_digests, checkpoint_acc, checkpoint_data)) + Ok(( + all_tx_digests, + checkpoint_acc, + checkpoint_data, + randomness_rounds, + )) } #[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))] diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index ae3c1f5ecf2b6..57aff2756708f 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -12,6 +12,7 @@ use arc_swap::ArcSwap; use consensus_config::Committee as ConsensusCommittee; use consensus_core::{CommitConsumerMonitor, TransactionIndex, VerifiedBlock}; use lru::LruCache; +use mysten_common::debug_fatal; use mysten_metrics::{ monitored_future, monitored_mpsc::{self, UnboundedReceiver}, @@ -353,7 +354,9 @@ impl ConsensusHandler { &parsed.transaction.kind { // These are deprecated and we should never see them. Log an error and eat the tx if one appears. - error!("BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}") + debug_fatal!( + "BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}" + ); } else { let transaction = SequencedConsensusTransactionKind::External(parsed.transaction);