-
Notifications
You must be signed in to change notification settings - Fork 11.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simplify handle_checkpoint_from_consensus
#12212
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -442,89 +442,75 @@ where | |
|
||
// Handle a checkpoint that we received from consensus | ||
fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) { | ||
let (next_sequence_number, previous_digest) = { | ||
let latest_checkpoint = self | ||
.store | ||
.get_highest_verified_checkpoint() | ||
.expect("store operation should not fail"); | ||
|
||
// If this is an older checkpoint, just ignore it | ||
if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() { | ||
return; | ||
} | ||
|
||
let next_sequence_number = latest_checkpoint.sequence_number().saturating_add(1); | ||
let previous_digest = *latest_checkpoint.digest(); | ||
(next_sequence_number, previous_digest) | ||
}; | ||
// Always check previous_digest matches in case there is a gap between | ||
// state sync and consensus. | ||
let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1) | ||
.expect("store operation should not fail") | ||
.unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1)) | ||
.digest(); | ||
if checkpoint.previous_digest != Some(prev_digest) { | ||
panic!("Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}", checkpoint.sequence_number(), Some(prev_digest), checkpoint.previous_digest); | ||
} | ||
|
||
// If this is exactly the next checkpoint then insert it and then notify our peers | ||
if *checkpoint.sequence_number() == next_sequence_number | ||
&& checkpoint.previous_digest == Some(previous_digest) | ||
{ | ||
let checkpoint = *checkpoint; | ||
|
||
// Check invariant that consensus must only send state-sync fully synced checkpoints | ||
#[cfg(debug_assertions)] | ||
{ | ||
self.store | ||
.get_full_checkpoint_contents(&checkpoint.content_digest) | ||
.expect("store operation should not fail") | ||
.unwrap(); | ||
} | ||
let latest_checkpoint = self | ||
.store | ||
.get_highest_verified_checkpoint() | ||
.expect("store operation should not fail"); | ||
|
||
self.store | ||
.insert_checkpoint(checkpoint.clone()) | ||
.expect("store operation should not fail"); | ||
self.store | ||
.update_highest_synced_checkpoint(&checkpoint) | ||
.expect("store operation should not fail"); | ||
self.metrics | ||
.set_highest_verified_checkpoint(*checkpoint.sequence_number()); | ||
self.metrics | ||
.set_highest_synced_checkpoint(*checkpoint.sequence_number()); | ||
|
||
// We don't care if no one is listening as this is a broadcast channel | ||
let _ = self.checkpoint_event_sender.send(checkpoint.clone()); | ||
|
||
self.spawn_notify_peers_of_checkpoint(checkpoint); | ||
} else { | ||
// Ensure that if consensus sends us a checkpoint that we expect to be the next one, | ||
// that it isn't on a fork | ||
if *checkpoint.sequence_number() == next_sequence_number { | ||
panic!( | ||
"consensus sent a forked checkpoint with different previous_digest, seq_num: {}, expected_previous_digest: {:?}, actual_previous_digest: {:?}", | ||
next_sequence_number, Some(previous_digest), checkpoint.previous_digest | ||
); | ||
} | ||
// If this is an older checkpoint, just ignore it | ||
if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() { | ||
return; | ||
} | ||
|
||
let checkpoint = *checkpoint; | ||
let next_sequence_number = latest_checkpoint.sequence_number().saturating_add(1); | ||
if *checkpoint.sequence_number() > next_sequence_number { | ||
debug!( | ||
"consensus sent too new of a checkpoint, expecting: {}, got: {}", | ||
next_sequence_number, | ||
checkpoint.sequence_number() | ||
); | ||
} | ||
Comment on lines
+467
to
+473
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your PR description and the comment below mentions that this shouldn't happen right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, being a bit nimble here because there could be bugs somewhere that we overlook |
||
|
||
// See if the missing checkpoints are already in our store and quickly update our | ||
// watermarks | ||
let mut checkpoints_from_storage = | ||
(next_sequence_number..=*checkpoint.sequence_number()).map(|n| { | ||
self.store | ||
// Because checkpoint from consensus sends in order, when we have checkpoint n, | ||
// we must have all of the checkpoints before n from either state sync or consensus. | ||
#[cfg(debug_assertions)] | ||
{ | ||
let _ = (next_sequence_number..=*checkpoint.sequence_number()) | ||
.map(|n| { | ||
let checkpoint = self | ||
.store | ||
.get_checkpoint_by_sequence_number(n) | ||
.expect("store operation should not fail") | ||
}); | ||
while let Some(Some(checkpoint)) = checkpoints_from_storage.next() { | ||
self.store | ||
.update_highest_synced_checkpoint(&checkpoint) | ||
.expect("store operation should not fail"); | ||
self.metrics | ||
.set_highest_verified_checkpoint(*checkpoint.sequence_number()); | ||
self.metrics | ||
.set_highest_synced_checkpoint(*checkpoint.sequence_number()); | ||
|
||
// We don't care if no one is listening as this is a broadcast channel | ||
let _ = self.checkpoint_event_sender.send(checkpoint.clone()); | ||
} | ||
.unwrap_or_else(|| panic!("store should contain checkpoint {n}")); | ||
self.store | ||
.get_full_checkpoint_contents(&checkpoint.content_digest) | ||
.expect("store operation should not fail") | ||
.unwrap_or_else(|| { | ||
panic!( | ||
"store should contain checkpoint contents for {:?}", | ||
checkpoint.content_digest | ||
) | ||
}); | ||
}) | ||
.collect::<Vec<_>>(); | ||
} | ||
|
||
self.metrics | ||
.set_highest_verified_checkpoint(*checkpoint.sequence_number()); | ||
self.metrics | ||
.set_highest_synced_checkpoint(*checkpoint.sequence_number()); | ||
self.store | ||
.update_highest_verified_checkpoint(&checkpoint) | ||
.expect("store operation should not fail"); | ||
self.store | ||
.update_highest_synced_checkpoint(&checkpoint) | ||
.expect("store operation should not fail"); | ||
|
||
// We don't care if no one is listening as this is a broadcast channel | ||
let _ = self.checkpoint_event_sender.send(checkpoint.clone()); | ||
|
||
self.spawn_notify_peers_of_checkpoint(checkpoint); | ||
} | ||
|
||
fn handle_peer_event( | ||
|
@@ -950,7 +936,7 @@ where | |
// Insert the newly verified checkpoint into our store, which will bump our highest | ||
// verified checkpoint watermark as well. | ||
store | ||
.insert_checkpoint(checkpoint.clone()) | ||
.insert_checkpoint(&checkpoint) | ||
.expect("store operation should not fail"); | ||
metrics.set_highest_verified_checkpoint(*checkpoint.sequence_number()); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC this should be fine even with the genesis checkpoint since consensus wont send us the genesis checkpoint right? If not i'm sure we could tweak this to account for checkpoint 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I believe so, consensus won't send state sync genesis checkpoint because it's already certified