Skip to content

Commit

Permalink
[core] Modify checkpointing logic to ensure all transactions in a signed / accepted checkpoint are executed (MystenLabs#2626)
Browse files Browse the repository at this point in the history

* Remove un-executed transactions
* Connect execution engine to checkpointing tests
* Fixed shared object checkpoint test
* Increase test timeout (MystenLabs#2658)
* Propagate assumption that all tx are executed to simplify logic

Co-authored-by: George Danezis <george@danez.is>
Co-authored-by: Alberto Sonnino <alberto@sonnino.com>
  • Loading branch information
3 people authored Jun 24, 2022
1 parent ec299ae commit 8a3d078
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 352 deletions.
6 changes: 5 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,11 @@ impl AuthorityState {
// Update the checkpointing mechanism
checkpoint
.lock()
.handle_internal_batch(batch.batch.next_sequence_number, &transactions)
.handle_internal_batch(
batch.batch.next_sequence_number,
&transactions,
&state.committee.load(),
)
.expect("Should see no errors updating the checkpointing mechanism.");
}
}
Expand Down
91 changes: 3 additions & 88 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use crate::{
};
use sui_types::committee::StakeUnit;
use tracing::{debug, info, warn};
use typed_store::Map;

#[cfg(test)]
pub(crate) mod tests;
Expand Down Expand Up @@ -177,22 +176,7 @@ pub async fn checkpoint_process<A>(
}
}

// (3) Process any unprocessed transactions. We do this before trying to move to the
// next proposal.
if let Err(err) = process_unprocessed_digests(
active_authority,
state_checkpoints.clone(),
timing.per_other_authority_delay,
)
.await
{
warn!("Error processing unprocessed: {:?}", err);
// Nothing happens until we catch up with the unprocessed transactions of the
// previous checkpoint.
continue;
}

// (4) Check if we need to advance to the next checkpoint, in case >2/3
// (3) Check if we need to advance to the next checkpoint, in case >2/3
// have a proposal out. If so we start creating and injecting fragments
// into the consensus protocol to make the new checkpoint.
let weight: StakeUnit = proposals
Expand Down Expand Up @@ -417,6 +401,8 @@ where
{
// Get our latest stored checkpoint
let latest_checkpoint = checkpoint_db.lock().latest_stored_checkpoint()?;
// We use the latest available authorities, not the authorities that signed the checkpoint,
// since the signers might be gone after the epoch in which they were active.
let available_authorities: BTreeSet<_> = latest_known_checkpoint
.signatory_authorities()
.cloned()
Expand Down Expand Up @@ -731,77 +717,6 @@ where
Ok(fragment)
}

/// Looks into the `unprocessed_transactions` of the checkpoint store and tries
/// to process them all to allow for the creation of the next proposal. Also
/// uses `unprocessed_contents` to look for certificates locally before going
/// to fetch them from the network.
///
/// The work happens in two passes:
/// 1. every digest that has a certificate stored in `unprocessed_contents` is
///    synced to this authority via `sync_certificate_to_authority_with_timeout`;
/// 2. every digest whose effects are still missing from the local database is
///    fetched with [`sync_digest`].
///
/// # Errors
///
/// Returns the first error encountered; processing stops at that point and the
/// remaining digests are left untouched. Errors can come from the
/// `unprocessed_contents` lookup, from syncing a stored certificate, from the
/// `effects_exists` database check, or from `sync_digest`.
pub async fn process_unprocessed_digests<A>(
active_authority: &ActiveAuthority<A>,
checkpoint_db: Arc<Mutex<CheckpointStore>>,
per_other_authority_delay: Duration,
) -> Result<(), SuiError>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// Snapshot the digests still marked as unprocessed. The lock guard is a
// temporary of this statement, so it is not held across the awaits below.
let unprocessed_digests: Vec<_> = checkpoint_db
.lock()
.unprocessed_transactions
.iter()
.map(|(digest, _)| digest)
.collect();

// Look up, in one batch, the certificates we may already hold for those
// digests. The result is zipped with `unprocessed_digests` below, i.e. it is
// treated as one optional entry per digest, in the same order.
let existing_certificates = checkpoint_db
.lock()
.unprocessed_contents
.multi_get(&unprocessed_digests)?;

// First process all certs that we have stored in the unprocessed_contents
// NOTE(review): `processed` is only written to in this function, never read;
// the second pass relies on `effects_exists` instead.
let mut processed = BTreeSet::new();
for (digest, cert) in unprocessed_digests
.iter()
.zip(existing_certificates.iter())
// Keep only the digests for which a certificate was found locally.
.filter_map(|(digest, cert_opt)| cert_opt.as_ref().map(|c| (digest, c)))
{
active_authority
.net
.load()
.sync_certificate_to_authority_with_timeout(
ConfirmationTransaction::new(cert.clone()),
// The destination of the sync is this authority itself.
active_authority.state.name,
per_other_authority_delay,
// NOTE(review): presumably the number of retries -- confirm against
// `sync_certificate_to_authority_with_timeout`.
3,
)
.await?;
processed.insert(digest);
}

// Second pass: anything still without effects has to be fetched remotely.
for digest in &unprocessed_digests {
// If we have processed this continue with the next cert, nothing to do
if active_authority
.state
.database
.effects_exists(&digest.transaction)?
{
continue;
}

// Download the certificate
debug!("Try sync for digest: {digest:?}");
if let Err(err) = sync_digest(
active_authority.state.name,
active_authority.net.load().clone(),
digest.transaction,
per_other_authority_delay,
)
.await
{
// Log and abort: the caller does not advance until every digest of the
// previous checkpoint has been processed.
warn!("Error doing sync from digest {digest:?}: {err}");
return Err(err);
}
}

Ok(())
}

/// Sync to a transaction certificate
pub async fn sync_digest<A>(
name: AuthorityName,
Expand Down
17 changes: 13 additions & 4 deletions crates/sui-core/src/authority_active/checkpoint_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn checkpoint_active_flow_happy_path() {
use telemetry_subscribers::init_for_testing;
init_for_testing();

let setup = checkpoint_tests_setup(20, Duration::from_millis(200)).await;
let setup = checkpoint_tests_setup(20, Duration::from_millis(200), true).await;

let TestSetup {
committee: _committee,
Expand Down Expand Up @@ -58,6 +58,7 @@ async fn checkpoint_active_flow_happy_path() {
ExecutionStatus::Success { .. }
));
println!("Execute at {:?}", tokio::time::Instant::now());
println!("Effects: {:?}", effects.effects.digest());

// Add some delay between transactions
tokio::time::sleep(Duration::from_secs(27)).await;
Expand Down Expand Up @@ -97,7 +98,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
use telemetry_subscribers::init_for_testing;
init_for_testing();

let setup = checkpoint_tests_setup(20, Duration::from_millis(200)).await;
let setup = checkpoint_tests_setup(20, Duration::from_millis(500), false).await;

let TestSetup {
committee: _committee,
Expand All @@ -118,6 +119,10 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
)
.unwrap(),
);

println!("Start active execution process.");
active_state.clone().spawn_execute_process().await;

// Spin the checkpoint process.
active_state
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
Expand Down Expand Up @@ -160,7 +165,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {

// Wait for a batch to go through
// (We do not really wait, we jump there since real-time is not running).
tokio::time::sleep(Duration::from_secs(10 * 60)).await;
tokio::time::sleep(Duration::from_secs(180 * 60)).await;

let mut value_set = BTreeSet::new();
for a in authorities {
Expand Down Expand Up @@ -188,7 +193,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
use telemetry_subscribers::init_for_testing;
init_for_testing();

let setup = checkpoint_tests_setup(20, Duration::from_millis(200)).await;
let setup = checkpoint_tests_setup(20, Duration::from_millis(200), false).await;

let TestSetup {
committee: _committee,
Expand All @@ -209,6 +214,10 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
)
.unwrap(),
);

println!("Start active execution process.");
active_state.clone().spawn_execute_process().await;

// Spin the checkpoint process.
active_state
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn pending_exec_storage_notify() {
use telemetry_subscribers::init_for_testing;
init_for_testing();

let setup = checkpoint_tests_setup(20, Duration::from_millis(200)).await;
let setup = checkpoint_tests_setup(20, Duration::from_millis(200), true).await;

let TestSetup {
committee: _committee,
Expand Down Expand Up @@ -93,10 +93,10 @@ async fn pending_exec_storage_notify() {

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn pending_exec_full() {
use telemetry_subscribers::init_for_testing;
init_for_testing();
// use telemetry_subscribers::init_for_testing;
// init_for_testing();

let setup = checkpoint_tests_setup(20, Duration::from_millis(200)).await;
let setup = checkpoint_tests_setup(20, Duration::from_millis(200), true).await;

let TestSetup {
committee: _committee,
Expand Down
9 changes: 5 additions & 4 deletions crates/sui-core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ impl crate::authority::AuthorityState {
// If a checkpointing service is present, register the batch with it
// to insert the transactions into future checkpoint candidates
if let Some(checkpoint) = &self.checkpoints {
if let Err(err) = checkpoint
.lock()
.handle_internal_batch(new_batch.batch.next_sequence_number, &current_batch)
{
if let Err(err) = checkpoint.lock().handle_internal_batch(
new_batch.batch.next_sequence_number,
&current_batch,
&self.committee.load(),
) {
error!("Checkpointing service error: {}", err);
}
}
Expand Down
Loading

0 comments on commit 8a3d078

Please sign in to comment.