diff --git a/crates/sui-core/src/authority_active/execution_driver/tests.rs b/crates/sui-core/src/authority_active/execution_driver/tests.rs index f4249c599f6ac..d89cc05e715eb 100644 --- a/crates/sui-core/src/authority_active/execution_driver/tests.rs +++ b/crates/sui-core/src/authority_active/execution_driver/tests.rs @@ -81,6 +81,9 @@ async fn pending_exec_storage_notify() { .collect(), ) .expect("Storage is ok"); + + tokio::task::yield_now().await; + // Wait for a notification (must arrive) authority_state.database.wait_for_new_pending().await; // get back the certificates diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs index 40b895a8ede69..a798595205f97 100644 --- a/crates/sui-core/src/checkpoints/mod.rs +++ b/crates/sui-core/src/checkpoints/mod.rs @@ -616,8 +616,12 @@ impl CheckpointStore { // We have a proposal so lets try to re-construct the checkpoint. let next_sequence_number = self.next_checkpoint(); + let locals = self.get_locals(); + + // Ok to unwrap because of the check above + let our_proposal = locals.current_proposal.as_ref().unwrap(); - if let Ok(Some(contents)) = self.reconstruct_contents(committee) { + if let Ok(Some(contents)) = self.reconstruct_contents(committee, our_proposal) { // Here we check, and ensure, all transactions are processed before we // move to sign the checkpoint. if !self @@ -650,6 +654,7 @@ impl CheckpointStore { pub fn reconstruct_contents( &mut self, committee: &Committee, + our_proposal: &CheckpointProposal, ) -> Result, FragmentInternalError> { let next_sequence_number = self.next_checkpoint(); let fragments: Vec<_> = self @@ -667,81 +672,79 @@ impl CheckpointStore { .map_err(FragmentInternalError::Error)?; if let Some(reconstructed) = _potential_checkpoint { - if let Some(proposal) = &self.get_locals().current_proposal { - // By definition the proposal and the new checkpoint must be in the - // same sequence number of checkpoint. + // A little argument about how the fragment -> checkpoint process is live + // + // A global checkpoint candidate must contain at least 2f+1 stake. And as + // a result of this f+1 stake will be from honest nodes that by definition + // must have submitted a proposal (because it is included!). + // So f+1 honest authorities will be able to reconstruct and sign the + // checkpoint. And all other authorities by asking all authorities will be + // able to get f+1 signatures and construct a checkpoint certificate. + + // By definition the proposal and the new checkpoint must be in the + // same sequence number of checkpoint. + + // Strategy 1 to reconstruct checkpoint -- we are included in it! + + if reconstructed + .global + .authority_waypoints + .contains_key(&self.name) + { + // We are included in the proposal, so we can go ahead and construct the + // full checkpoint! + let mut contents = our_proposal.transactions.clone(); + contents.transactions.extend( + // Add all items missing to reach then global waypoint + reconstructed.global.authority_waypoints[&self.name] + .items + .clone(), + ); + + return Ok(Some(contents)); + } - // Strategy 1 to reconstruct checkpoint -- we are included in it! + // Strategy 2 to reconstruct checkpoint -- There is a link between us and the checkpoint set + + let local_links: HashSet<_> = self.local_fragments.keys().collect(); + let checkpoint_keys: HashSet<_> = reconstructed + .global + .authority_waypoints + .keys() + .cloned() + .collect(); + + if let Some(auth) = local_links.intersection(&checkpoint_keys).next() { + let fragment = self + .local_fragments + .get(auth) + .map_err(|err| FragmentInternalError::Error(err.into()))? + .unwrap(); + + // Extract the diff + let diff = if fragment.proposer.authority() == &self.name { + fragment.diff + } else { + fragment.diff.swap() + }; - if reconstructed + if let Ok(contents) = reconstructed .global - .authority_waypoints - .contains_key(&self.name) + .checkpoint_items(&diff, our_proposal.transactions.transactions.clone()) { - // We are included in the proposal, so we can go ahead and construct the - // full checkpoint! - let mut contents = proposal.transactions.clone(); - contents.transactions.extend( - // Add all items missing to reach then global waypoint - reconstructed.global.authority_waypoints[&self.name] - .items - .clone(), - ); - + let contents = CheckpointContents::new(contents.into_iter()); return Ok(Some(contents)); } - - // Strategy 2 to reconstruct checkpoint -- There is a link between us and the checkpoint set - - let local_links: HashSet<_> = self.local_fragments.keys().collect(); - let checkpoint_keys: HashSet<_> = reconstructed - .global - .authority_waypoints - .keys() - .cloned() - .collect(); - - if let Some(auth) = local_links.intersection(&checkpoint_keys).next() { - let fragment = self - .local_fragments - .get(auth) - .map_err(|err| FragmentInternalError::Error(err.into()))? - .unwrap(); - - // Extract the diff - let diff = if fragment.proposer.authority() == &self.name { - fragment.diff - } else { - fragment.diff.swap() - }; - - if let Ok(contents) = reconstructed - .global - .checkpoint_items(&diff, proposal.transactions.transactions.clone()) - { - let contents = CheckpointContents::new(contents.into_iter()); - return Ok(Some(contents)); - } - } - } else { - // A little argument about how the fragment -> checkpoint process is live - // - // A global checkpoint candidate must contain at least 2f+1 stake. And as - // a result of this f+1 stake will be from honest nodes that by definition - // must have submitted a proposal (because it is included!). - // So f+1 honest authorities will be able to reconstruct and sign the - // checkpoint. And all other authorities by asking all authorities will be - // able to get f+1 signatures and construct a checkpoint certificate. - - // Sets the reconstruction to false, we have all fragments we need, but - // just cannot reconstruct the contents. - let locals = self.get_locals(); - let mut new_locals = locals.as_ref().clone(); - new_locals.no_more_fragments = true; - self.set_locals(locals, new_locals) - .map_err(FragmentInternalError::Error)?; } + // Sets the reconstruction to false, we have all fragments we need, but + // just cannot reconstruct the contents. + let locals = self.get_locals(); + let mut new_locals = locals.as_ref().clone(); + new_locals.no_more_fragments = true; + self.set_locals(locals, new_locals) + .map_err(FragmentInternalError::Error)?; + return Err(FragmentInternalError::Error(SuiError::from( "Missing info to construct known checkpoint.", ))); diff --git a/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs b/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs index 7e69ba9cb89ba..2a352c30b1f09 100644 --- a/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs +++ b/crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs @@ -1727,3 +1727,124 @@ async fn checkpoint_messaging_flow() { assert!(matches!(response.info, AuthorityCheckpointInfo::Success)); } } + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn test_no_more_fragments() { + let mut setup = checkpoint_tests_setup(5, Duration::from_millis(500), true).await; + + // Check that the system is running. + let t = setup.transactions.pop().unwrap(); + let (_cert, effects) = setup + .aggregator + .execute_transaction(&t) + .await + .expect("All ok."); + + // Check whether this is a success? + assert!(matches!( + effects.effects.status, + ExecutionStatus::Success { .. } + )); + + // Wait for a batch to go through + // (We do not really wait, we jump there since real-time is not running). + tokio::time::sleep(Duration::from_secs(5)).await; + + // Happy path checkpoint flow + + // Step 1 -- get a bunch of proposals + let mut proposals = Vec::new(); + // First make sure each authority creates a proposal. + for auth in &setup.authorities { + let proposal = auth + .checkpoint + .lock() + .new_proposal(setup.committee.epoch) + .unwrap(); + proposals.push(proposal); + } + + let p3 = proposals.pop().unwrap(); + let p2 = proposals.pop().unwrap(); + let p1 = proposals.pop().unwrap(); + let p0 = proposals.pop().unwrap(); + + let f01 = p0.fragment_with(&p1); + let f02 = p0.fragment_with(&p2); + let f03 = p0.fragment_with(&p3); + + // put in fragment 0-1 and no checkpoint can be formed + + setup.authorities[0] + .checkpoint + .lock() + .submit_local_fragment_to_consensus(&f01, &setup.committee) + .unwrap(); + + // Give time to the receiving task to process (so that consensus can sequence fragments). + tokio::time::sleep(Duration::from_secs(1)).await; + + // Expecting more fragments + assert!( + !setup.authorities[0] + .checkpoint + .lock() + .get_locals() + .no_more_fragments + ); + + // put in fragment 0-2, now node 0 can form a checkpoint but not node 3 + + setup.authorities[0] + .checkpoint + .lock() + .submit_local_fragment_to_consensus(&f02, &setup.committee) + .unwrap(); + + // Give time to the receiving task to process (so that consensus can sequence fragments). + tokio::time::sleep(Duration::from_secs(1)).await; + + assert!(setup.authorities[0] + .checkpoint + .lock() + .attempt_to_construct_checkpoint(&setup.committee) + .unwrap()); + + // Expecting more fragments + assert!( + !setup.authorities[0] + .checkpoint + .lock() + .get_locals() + .no_more_fragments + ); + + // node 3 cannot make one + assert!(!setup.authorities[3] + .checkpoint + .lock() + .attempt_to_construct_checkpoint(&setup.committee) + .unwrap()); + + // Expecting more fragments + assert!( + setup.authorities[3] + .checkpoint + .lock() + .get_locals() + .no_more_fragments + ); + + // Now fie node 3 a link and it can make the checkpoint + setup.authorities[3] + .checkpoint + .lock() + .submit_local_fragment_to_consensus(&f03, &setup.committee) + .unwrap(); + + assert!(setup.authorities[3] + .checkpoint + .lock() + .attempt_to_construct_checkpoint(&setup.committee) + .unwrap()); +}