Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delay notify_randomness_in_checkpoint until checkpoint is committed #21031

Merged
merged 2 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 42 additions & 20 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type CheckpointExecutionBuffer = FuturesOrdered<
Option<Accumulator>,
Option<CheckpointData>,
Vec<TransactionDigest>,
Vec<RandomnessRound>,
)>,
>;

Expand Down Expand Up @@ -309,10 +310,10 @@ impl CheckpointExecutor {
// watermark accordingly. Note that given that checkpoints are guaranteed to
// be processed (added to FuturesOrdered) in seq_number order, using FuturesOrdered
// guarantees that we will also ratchet the watermarks in order.
Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests))) = pending.next() => {
Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests, randomness_rounds))) = pending.next() => {
let _process_scope = mysten_metrics::monitored_scope("ProcessExecutedCheckpoint");

self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests).await;
self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests, randomness_rounds).await;
self.backpressure_manager.update_highest_executed_checkpoint(*checkpoint.sequence_number());
highest_executed = Some(checkpoint.clone());

Expand Down Expand Up @@ -444,6 +445,7 @@ impl CheckpointExecutor {
checkpoint_acc: Option<Accumulator>,
checkpoint_data: Option<CheckpointData>,
all_tx_digests: &[TransactionDigest],
randomness_rounds: Vec<RandomnessRound>,
) {
// Commit all transaction effects to disk
let cache_commit = self.state.get_cache_commit();
Expand All @@ -463,6 +465,21 @@ impl CheckpointExecutor {
.handle_committed_transactions(all_tx_digests)
.expect("cannot fail");

// Once the checkpoint is finalized, we know that any randomness contained in this checkpoint has
// been successfully included in a checkpoint certified by a quorum of validators.
// (RandomnessManager/RandomnessReporter is only present on validators.)
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
for round in randomness_rounds {
debug!(
?round,
"notifying RandomnessReporter that randomness update was executed in checkpoint"
);
randomness_reporter
.notify_randomness_in_checkpoint(round)
.expect("epoch cannot have ended");
}
}

if let Some(checkpoint_data) = checkpoint_data {
self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
.await;
Expand Down Expand Up @@ -579,7 +596,7 @@ impl CheckpointExecutor {

pending.push_back(spawn_monitored_task!(async move {
let epoch_store = epoch_store.clone();
let (tx_digests, checkpoint_acc, checkpoint_data) = loop {
let (tx_digests, checkpoint_acc, checkpoint_data, randomness_rounds) = loop {
match execute_checkpoint(
checkpoint.clone(),
&state,
Expand All @@ -603,12 +620,23 @@ impl CheckpointExecutor {
tokio::time::sleep(Duration::from_secs(1)).await;
metrics.checkpoint_exec_errors.inc();
}
Ok((tx_digests, checkpoint_acc, checkpoint_data)) => {
break (tx_digests, checkpoint_acc, checkpoint_data)
Ok((tx_digests, checkpoint_acc, checkpoint_data, randomness_rounds)) => {
break (
tx_digests,
checkpoint_acc,
checkpoint_data,
randomness_rounds,
)
}
}
};
(checkpoint, checkpoint_acc, checkpoint_data, tx_digests)
(
checkpoint,
checkpoint_acc,
checkpoint_data,
tx_digests,
randomness_rounds,
)
}));
}

Expand Down Expand Up @@ -769,6 +797,7 @@ impl CheckpointExecutor {

// Logs within the function are annotated with the checkpoint sequence number and epoch,
// from schedule_checkpoint().
#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn execute_checkpoint(
checkpoint: VerifiedCheckpoint,
Expand All @@ -786,6 +815,7 @@ async fn execute_checkpoint(
Vec<TransactionDigest>,
Option<Accumulator>,
Option<CheckpointData>,
Vec<RandomnessRound>,
)> {
debug!("Preparing checkpoint for execution",);
let prepare_start = Instant::now();
Expand Down Expand Up @@ -829,20 +859,12 @@ async fn execute_checkpoint(
)
.await?;

// Once execution is complete, we know that any randomness contained in this checkpoint has
// been successfully included in a checkpoint certified by quorum of validators.
// (RandomnessManager/RandomnessReporter is only present on validators.)
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
for round in randomness_rounds {
debug!(
?round,
"notifying RandomnessReporter that randomness update was executed in checkpoint"
);
randomness_reporter.notify_randomness_in_checkpoint(round)?;
}
}

Ok((all_tx_digests, checkpoint_acc, checkpoint_data))
Ok((
all_tx_digests,
checkpoint_acc,
checkpoint_data,
randomness_rounds,
))
}

#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
Expand Down
5 changes: 4 additions & 1 deletion crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use arc_swap::ArcSwap;
use consensus_config::Committee as ConsensusCommittee;
use consensus_core::{CommitConsumerMonitor, TransactionIndex, VerifiedBlock};
use lru::LruCache;
use mysten_common::debug_fatal;
use mysten_metrics::{
monitored_future,
monitored_mpsc::{self, UnboundedReceiver},
Expand Down Expand Up @@ -353,7 +354,9 @@ impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
&parsed.transaction.kind
{
// These are deprecated and we should never see them. Log an error and eat the tx if one appears.
error!("BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}")
debug_fatal!(
"BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}"
);
} else {
let transaction =
SequencedConsensusTransactionKind::External(parsed.transaction);
Expand Down
Loading