68 changes: 29 additions & 39 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -2805,32 +2805,38 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !payload_verification_status.is_optimistic()
&& block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot
{
let new_head_root = fork_choice
.get_head(current_slot, &self.spec)
.map_err(BeaconChainError::from)?;

if new_head_root == block_root {
if let Some(proto_block) = fork_choice.get_block(&block_root) {
if let Err(e) = self.early_attester_cache.add_head_block(
block_root,
signed_block.clone(),
proto_block,
&state,
&self.spec,
) {
match fork_choice.get_head(current_slot, &self.spec) {
// This block became the head, add it to the early attester cache.
Ok(new_head_root) if new_head_root == block_root => {
if let Some(proto_block) = fork_choice.get_block(&block_root) {
if let Err(e) = self.early_attester_cache.add_head_block(
block_root,
signed_block.clone(),
proto_block,
&state,
&self.spec,
) {
warn!(
self.log,
"Early attester cache insert failed";
"error" => ?e
);
}
} else {
warn!(
self.log,
"Early attester cache insert failed";
"error" => ?e
"Early attester block missing";
"block_root" => ?block_root
);
}
} else {
warn!(
self.log,
"Early attester block missing";
"block_root" => ?block_root
);
}
// This block did not become the head, nothing to do.
Ok(_) => (),
Err(e) => error!(
self.log,
"Failed to compute head during block import";
"error" => ?e
),
}
}

@@ -3608,16 +3614,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Run fork choice since it's possible that the payload invalidation might result in a new
// head.
//
// Don't return early though, since invalidating the justified checkpoint might cause an
// error here.
if let Err(e) = self.recompute_head_at_current_slot().await {
crit!(
self.log,
"Failed to run fork choice routine";
"error" => ?e,
);
}
self.recompute_head_at_current_slot().await;

// Obtain the justified root from fork choice.
//
@@ -4262,14 +4259,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

// Run fork choice and signal to any waiting task that it has completed.
if let Err(e) = self.recompute_head_at_current_slot().await {
error!(
self.log,
"Fork choice error at slot start";
"error" => ?e,
"slot" => slot,
);
}
self.recompute_head_at_current_slot().await;

// Send the notification regardless of fork choice success, this is a "best effort"
// notification and we don't want block production to hit the timeout in case of error.
46 changes: 34 additions & 12 deletions beacon_node/beacon_chain/src/canonical_head.rs
@@ -434,9 +434,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Execute the fork choice algorithm and enthrone the result as the canonical head.
///
/// This method replaces the old `BeaconChain::fork_choice` method.
pub async fn recompute_head_at_current_slot(self: &Arc<Self>) -> Result<(), Error> {
let current_slot = self.slot()?;
self.recompute_head_at_slot(current_slot).await
pub async fn recompute_head_at_current_slot(self: &Arc<Self>) {
match self.slot() {
Ok(current_slot) => self.recompute_head_at_slot(current_slot).await,
Err(e) => error!(
self.log,
"No slot when recomputing head";
"error" => ?e
),
}
}

/// Execute the fork choice algorithm and enthrone the result as the canonical head.
Expand All @@ -445,7 +451,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// different slot to the wall-clock can be useful for pushing fork choice into the next slot
/// *just* before the start of the slot. This ensures that block production can use the correct
/// head value without being delayed.
pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) -> Result<(), Error> {
///
/// This function purposefully does *not* return a `Result`. It's possible for fork choice to
/// fail to update if there is only one viable head and it has an invalid execution payload. In
/// such a case it's critical that the `BeaconChain` keeps importing blocks so that the
/// situation can be rectified. We avoid returning an error here so that calling functions
/// can't abort block import because an error is returned here.
pub async fn recompute_head_at_slot(self: &Arc<Self>, current_slot: Slot) {
metrics::inc_counter(&metrics::FORK_CHOICE_REQUESTS);
let _timer = metrics::start_timer(&metrics::FORK_CHOICE_TIMES);

@@ -455,23 +467,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
move || chain.recompute_head_at_slot_internal(current_slot),
"recompute_head_internal",
)
.await?
.await
{
// Fork choice returned successfully and did not need to update the EL.
Ok(None) => Ok(()),
Ok(Ok(None)) => (),
// Fork choice returned successfully and needed to update the EL. It has returned a
// join-handle from when it spawned some async tasks. We should await those tasks.
Ok(Some(join_handle)) => match join_handle.await {
Ok(Ok(Some(join_handle))) => match join_handle.await {
// The async task completed successfully.
Ok(Some(())) => Ok(()),
Ok(Some(())) => (),
// The async task did not complete successfully since the runtime is shutting down.
Ok(None) => {
debug!(
self.log,
"Did not update EL fork choice";
"info" => "shutting down"
);
Err(Error::RuntimeShutdown)
}
// The async task did not complete successfully, tokio returned an error.
Err(e) => {
Expand All @@ -480,13 +491,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"Did not update EL fork choice";
"error" => ?e
);
Err(Error::TokioJoin(e))
}
},
// There was an error recomputing the head.
Err(e) => {
Ok(Err(e)) => {
metrics::inc_counter(&metrics::FORK_CHOICE_ERRORS);
Err(e)
error!(
self.log,
"Error whist recomputing head";
"error" => ?e
);
}
// There was an error spawning the task.
Err(e) => {
error!(
self.log,
"Failed to spawn recompute head task";
"error" => ?e
);
}
}
}
9 changes: 1 addition & 8 deletions beacon_node/beacon_chain/src/state_advance_timer.rs
@@ -220,14 +220,7 @@ async fn state_advance_timer<T: BeaconChainTypes>(
return;
}

if let Err(e) = beacon_chain.recompute_head_at_slot(next_slot).await {
warn!(
log,
"Error updating fork choice for next slot";
"error" => ?e,
"slot" => next_slot,
);
}
beacon_chain.recompute_head_at_slot(next_slot).await;

// Use a blocking task to avoid blocking the core executor whilst waiting for locks
// in `ForkChoiceSignalTx`.
18 changes: 16 additions & 2 deletions beacon_node/beacon_chain/src/test_utils.rs
@@ -211,6 +211,20 @@ impl<E: EthSpec> Builder<EphemeralHarnessType<E>> {
self.store = Some(store);
self.store_mutator(Box::new(mutator))
}

/// Manually restore from a given `MemoryStore`.
pub fn resumed_ephemeral_store(
mut self,
store: Arc<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>>,
) -> Self {
let mutator = move |builder: BeaconChainBuilder<_>| {
builder
.resume_from_db()
.expect("should resume from database")
};
self.store = Some(store);
self.store_mutator(Box::new(mutator))
}
}

impl<E: EthSpec> Builder<DiskHarnessType<E>> {
@@ -1376,7 +1390,7 @@ where
.process_block(Arc::new(block), CountUnrealized::True)
.await?
.into();
self.chain.recompute_head_at_current_slot().await?;
self.chain.recompute_head_at_current_slot().await;
Ok(block_hash)
}

@@ -1389,7 +1403,7 @@ where
.process_block(Arc::new(block), CountUnrealized::True)
.await?
.into();
self.chain.recompute_head_at_current_slot().await?;
self.chain.recompute_head_at_current_slot().await;
Ok(block_hash)
}

18 changes: 3 additions & 15 deletions beacon_node/beacon_chain/tests/block_verification.rs
@@ -160,11 +160,7 @@ async fn chain_segment_full_segment() {
.into_block_error()
.expect("should import chain segment");

harness
.chain
.recompute_head_at_current_slot()
.await
.expect("should run fork choice");
harness.chain.recompute_head_at_current_slot().await;

assert_eq!(
harness.head_block_root(),
@@ -194,11 +190,7 @@ async fn chain_segment_varying_chunk_size() {
.unwrap_or_else(|_| panic!("should import chain segment of len {}", chunk_size));
}

harness
.chain
.recompute_head_at_current_slot()
.await
.expect("should run fork choice");
harness.chain.recompute_head_at_current_slot().await;

assert_eq!(
harness.head_block_root(),
@@ -729,11 +721,7 @@ async fn block_gossip_verification() {
}

// Recompute the head to ensure we cache the latest view of fork choice.
harness
.chain
.recompute_head_at_current_slot()
.await
.unwrap();
harness.chain.recompute_head_at_current_slot().await;

/*
* This test ensures that:
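
Taken together, the changes above make head recomputation fire-and-forget: `recompute_head_at_slot` logs failures internally instead of returning them, so call sites simply `.await` the future. The sketch below illustrates that calling pattern only; `Chain`, `try_recompute_head`, and the `tokio` runtime setup are illustrative assumptions, not the real Lighthouse `BeaconChain` API.

```rust
use std::sync::Arc;

struct Chain;

impl Chain {
    // Mirrors the new contract: errors are logged here rather than returned, so a
    // caller can never abort block import just because fork choice failed.
    async fn recompute_head_at_current_slot(self: &Arc<Self>) {
        if let Err(e) = self.try_recompute_head() {
            eprintln!("Error whilst recomputing head: {e}");
        }
    }

    // Stand-in for the real fork choice computation, which may legitimately fail
    // (e.g. when the only viable head has an invalid execution payload).
    fn try_recompute_head(&self) -> Result<(), String> {
        Ok(())
    }
}

// Requires the `tokio` crate with the "rt" and "macros" features.
#[tokio::main]
async fn main() {
    let chain = Arc::new(Chain);
    // Call sites no longer need `?` or an explicit error branch.
    chain.recompute_head_at_current_slot().await;
}
```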