Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify handle_checkpoint_from_consensus #12212

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/sui-archival/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn write_new_checkpoints_to_store(
);
}
for checkpoint in ordered_checkpoints.iter() {
store.inner_mut().insert_checkpoint(checkpoint.clone());
store.inner_mut().insert_checkpoint(checkpoint);
}
Ok(ordered_checkpoints.last().cloned())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ fn sync_checkpoint(
sender: &Sender<VerifiedCheckpoint>,
) {
checkpoint_store
.insert_verified_checkpoint(checkpoint.clone())
.insert_verified_checkpoint(checkpoint)
.unwrap();
checkpoint_store
.insert_checkpoint_contents(empty_contents().into_inner().into_checkpoint_contents())
Expand Down
30 changes: 16 additions & 14 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl CheckpointStore {
);
}
self.insert_checkpoint_contents(contents).unwrap();
self.insert_verified_checkpoint(checkpoint.clone()).unwrap();
self.insert_verified_checkpoint(&checkpoint).unwrap();
self.update_highest_synced_checkpoint(&checkpoint).unwrap();
}
}
Expand Down Expand Up @@ -239,16 +239,6 @@ impl CheckpointStore {
self.get_checkpoint_by_digest(&highest_verified.1)
}

// Returns the sequence number recorded under the `HighestSynced` watermark.
//
// Looks up `CheckpointWatermark::HighestSynced` in `self.watermarks` and
// returns the first element of the stored entry. Yields `Ok(None)` when no
// such watermark has been recorded, and propagates any `TypedStoreError`
// raised by the lookup.
pub fn get_highest_synced_checkpoint_seq_number(
&self,
) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
if let Some(highest_synced) = self.watermarks.get(&CheckpointWatermark::HighestSynced)? {
Ok(Some(highest_synced.0))
} else {
Ok(None)
}
}

pub fn get_highest_synced_checkpoint(
&self,
) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
Expand Down Expand Up @@ -301,6 +291,11 @@ impl CheckpointStore {
self.full_checkpoint_content.get(&seq)
}

// Called by consensus (ConsensusAggregator).
// Unlike `insert_verified_checkpoint`, it does not touch the
// highest_verified_checkpoint watermark, so that state sync
// will have a chance to process this checkpoint and perform some
// state-sync-only work.
pub fn insert_certified_checkpoint(
&self,
checkpoint: &VerifiedCheckpoint,
Expand All @@ -324,13 +319,20 @@ impl CheckpointStore {
batch.write()
}

// Called by state sync. Apart from inserting the checkpoint and updating
// related tables, it also bumps the highest_verified_checkpoint watermark.
pub fn insert_verified_checkpoint(
&self,
checkpoint: VerifiedCheckpoint,
checkpoint: &VerifiedCheckpoint,
) -> Result<(), TypedStoreError> {
self.insert_certified_checkpoint(&checkpoint)?;
self.insert_certified_checkpoint(checkpoint)?;
self.update_highest_verified_checkpoint(checkpoint)
}

// Update the latest (highest verified) checkpoint watermark.
pub fn update_highest_verified_checkpoint(
&self,
checkpoint: &VerifiedCheckpoint,
) -> Result<(), TypedStoreError> {
if Some(*checkpoint.sequence_number())
> self
.get_highest_verified_checkpoint()?
Expand Down
10 changes: 9 additions & 1 deletion crates/sui-core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl ReadStore for RocksDbStore {
}

impl WriteStore for RocksDbStore {
fn insert_checkpoint(&self, checkpoint: VerifiedCheckpoint) -> Result<(), Self::Error> {
fn insert_checkpoint(&self, checkpoint: &VerifiedCheckpoint) -> Result<(), Self::Error> {
if let Some(EndOfEpochData {
next_epoch_committee,
..
Expand All @@ -167,6 +167,14 @@ impl WriteStore for RocksDbStore {
.update_highest_synced_checkpoint(checkpoint)
}

// Forwards to `update_highest_verified_checkpoint` on the underlying
// `checkpoint_store`, passing `checkpoint` through unchanged and returning
// that call's result as-is.
fn update_highest_verified_checkpoint(
&self,
checkpoint: &VerifiedCheckpoint,
) -> Result<(), Self::Error> {
self.checkpoint_store
.update_highest_verified_checkpoint(checkpoint)
}

fn insert_checkpoint_contents(
&self,
checkpoint: &VerifiedCheckpoint,
Expand Down
130 changes: 58 additions & 72 deletions crates/sui-network/src/state_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,89 +442,75 @@ where

// Handle a checkpoint that we received from consensus
fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
let (next_sequence_number, previous_digest) = {
let latest_checkpoint = self
.store
.get_highest_verified_checkpoint()
.expect("store operation should not fail");

// If this is an older checkpoint, just ignore it
if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
return;
}

let next_sequence_number = latest_checkpoint.sequence_number().saturating_add(1);
let previous_digest = *latest_checkpoint.digest();
(next_sequence_number, previous_digest)
};
// Always check previous_digest matches in case there is a gap between
// state sync and consensus.
let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
.expect("store operation should not fail")
.unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
Comment on lines +447 to +449
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC this should be fine even with the genesis checkpoint, since consensus won't send us the genesis checkpoint, right? If not, I'm sure we could tweak this to account for checkpoint 0.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I believe so: consensus won't send state sync the genesis checkpoint because it's already certified.

.digest();
if checkpoint.previous_digest != Some(prev_digest) {
panic!("Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}", checkpoint.sequence_number(), Some(prev_digest), checkpoint.previous_digest);
}

// If this is exactly the next checkpoint then insert it and then notify our peers
if *checkpoint.sequence_number() == next_sequence_number
&& checkpoint.previous_digest == Some(previous_digest)
{
let checkpoint = *checkpoint;

// Check invariant that consensus must only send state-sync fully synced checkpoints
#[cfg(debug_assertions)]
{
self.store
.get_full_checkpoint_contents(&checkpoint.content_digest)
.expect("store operation should not fail")
.unwrap();
}
let latest_checkpoint = self
.store
.get_highest_verified_checkpoint()
.expect("store operation should not fail");

self.store
.insert_checkpoint(checkpoint.clone())
.expect("store operation should not fail");
self.store
.update_highest_synced_checkpoint(&checkpoint)
.expect("store operation should not fail");
self.metrics
.set_highest_verified_checkpoint(*checkpoint.sequence_number());
self.metrics
.set_highest_synced_checkpoint(*checkpoint.sequence_number());

// We don't care if no one is listening as this is a broadcast channel
let _ = self.checkpoint_event_sender.send(checkpoint.clone());

self.spawn_notify_peers_of_checkpoint(checkpoint);
} else {
// Ensure that if consensus sends us a checkpoint that we expect to be the next one,
// that it isn't on a fork
if *checkpoint.sequence_number() == next_sequence_number {
panic!(
"consensus sent a forked checkpoint with different previous_digest, seq_num: {}, expected_previous_digest: {:?}, actual_previous_digest: {:?}",
next_sequence_number, Some(previous_digest), checkpoint.previous_digest
);
}
// If this is an older checkpoint, just ignore it
if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
return;
}

let checkpoint = *checkpoint;
let next_sequence_number = latest_checkpoint.sequence_number().saturating_add(1);
if *checkpoint.sequence_number() > next_sequence_number {
debug!(
"consensus sent too new of a checkpoint, expecting: {}, got: {}",
next_sequence_number,
checkpoint.sequence_number()
);
}
Comment on lines +467 to +473
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your PR description and the comment below mention that this shouldn't happen, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, being a bit nimble here because there could be bugs somewhere that we overlook


// See if the missing checkpoints are already in our store and quickly update our
// watermarks
let mut checkpoints_from_storage =
(next_sequence_number..=*checkpoint.sequence_number()).map(|n| {
self.store
// Because consensus sends checkpoints in order, when we have checkpoint n,
// we must have all of the checkpoints before n from either state sync or consensus.
#[cfg(debug_assertions)]
{
let _ = (next_sequence_number..=*checkpoint.sequence_number())
.map(|n| {
let checkpoint = self
.store
.get_checkpoint_by_sequence_number(n)
.expect("store operation should not fail")
});
while let Some(Some(checkpoint)) = checkpoints_from_storage.next() {
self.store
.update_highest_synced_checkpoint(&checkpoint)
.expect("store operation should not fail");
self.metrics
.set_highest_verified_checkpoint(*checkpoint.sequence_number());
self.metrics
.set_highest_synced_checkpoint(*checkpoint.sequence_number());

// We don't care if no one is listening as this is a broadcast channel
let _ = self.checkpoint_event_sender.send(checkpoint.clone());
}
.unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
self.store
.get_full_checkpoint_contents(&checkpoint.content_digest)
.expect("store operation should not fail")
.unwrap_or_else(|| {
panic!(
"store should contain checkpoint contents for {:?}",
checkpoint.content_digest
)
});
})
.collect::<Vec<_>>();
}

self.metrics
.set_highest_verified_checkpoint(*checkpoint.sequence_number());
self.metrics
.set_highest_synced_checkpoint(*checkpoint.sequence_number());
self.store
.update_highest_verified_checkpoint(&checkpoint)
.expect("store operation should not fail");
self.store
.update_highest_synced_checkpoint(&checkpoint)
.expect("store operation should not fail");

// We don't care if no one is listening as this is a broadcast channel
let _ = self.checkpoint_event_sender.send(checkpoint.clone());

self.spawn_notify_peers_of_checkpoint(checkpoint);
}

fn handle_peer_event(
Expand Down Expand Up @@ -950,7 +936,7 @@ where
// Insert the newly verified checkpoint into our store, which will bump our highest
// verified checkpoint watermark as well.
store
.insert_checkpoint(checkpoint.clone())
.insert_checkpoint(&checkpoint)
.expect("store operation should not fail");
metrics.set_highest_verified_checkpoint(*checkpoint.sequence_number());
}
Expand Down
6 changes: 4 additions & 2 deletions crates/sui-network/src/state_sync/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async fn server_get_checkpoint() {

// Populate the node's store with some checkpoints
for checkpoint in ordered_checkpoints.clone() {
builder.store.inner_mut().insert_checkpoint(checkpoint)
builder.store.inner_mut().insert_checkpoint(&checkpoint)
}
let latest = ordered_checkpoints.last().unwrap().clone();
builder
Expand Down Expand Up @@ -201,7 +201,7 @@ async fn isolated_sync_job() {
{
let mut store = event_loop_2.store.inner_mut();
for checkpoint in ordered_checkpoints.clone() {
store.insert_checkpoint(checkpoint);
store.insert_checkpoint(&checkpoint);
}
}

Expand Down Expand Up @@ -307,6 +307,7 @@ async fn sync_with_checkpoints_being_inserted() {
store_1
.insert_checkpoint_contents(&checkpoint, empty_contents())
.unwrap();
store_1.insert_certified_checkpoint(&checkpoint);
handle_1.send_checkpoint(checkpoint).await;

timeout(Duration::from_secs(1), async {
Expand All @@ -324,6 +325,7 @@ async fn sync_with_checkpoints_being_inserted() {

// Inject all the checkpoints
for checkpoint in checkpoint_iter {
store_1.insert_certified_checkpoint(&checkpoint);
handle_1.send_checkpoint(checkpoint).await;
}

Expand Down
Loading