Skip to content

Commit

Permalink
Simplify error handling after engines fallback removal (#3283)
Browse files Browse the repository at this point in the history
## Issue Addressed
Part of #3118, continuation of #3257

## Proposed Changes
- the [ `first_success_without_retry` ](https://github.com/sigp/lighthouse/blob/9c429d0764ed91cf56efb8a47a35a556b54a86a4/beacon_node/execution_layer/src/engines.rs#L348-L351) function returns a single error.
- the [`first_success`](https://github.com/sigp/lighthouse/blob/9c429d0764ed91cf56efb8a47a35a556b54a86a4/beacon_node/execution_layer/src/engines.rs#L324) function returns a single error.
- [ `EngineErrors` ](https://github.com/sigp/lighthouse/blob/9c429d0764ed91cf56efb8a47a35a556b54a86a4/beacon_node/execution_layer/src/lib.rs#L69) carries a single error.
- [`EngineError`](https://github.com/sigp/lighthouse/blob/9c429d0764ed91cf56efb8a47a35a556b54a86a4/beacon_node/execution_layer/src/engines.rs#L173-L177) now does not need to carry an Id
- [`process_multiple_payload_statuses`](https://github.com/sigp/lighthouse/blob/9c429d0764ed91cf56efb8a47a35a556b54a86a4/beacon_node/execution_layer/src/payload_status.rs#L46-L50) now doesn't need to receive an iterator of statuses and weight in different errors

## Additional Info
This is built on top of #3294
  • Loading branch information
divagant-martian committed Jul 4, 2022
1 parent 61ed5f0 commit 1219da9
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 241 deletions.
72 changes: 21 additions & 51 deletions beacon_node/execution_layer/src/engines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,15 @@ struct PayloadIdCacheKey {

/// An execution engine.
pub struct Engine<T> {
pub id: String,
pub api: HttpJsonRpc<T>,
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<EngineState>,
}

impl<T> Engine<T> {
/// Creates a new, offline engine.
pub fn new(id: String, api: HttpJsonRpc<T>) -> Self {
pub fn new(api: HttpJsonRpc<T>) -> Self {
Self {
id,
api,
payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)),
state: RwLock::new(EngineState::Offline),
Expand Down Expand Up @@ -135,10 +133,10 @@ pub struct Engines {

#[derive(Debug)]
pub enum EngineError {
Offline { id: String },
Api { id: String, error: EngineApiError },
Offline,
Api { error: EngineApiError },
BuilderApi { error: EngineApiError },
Auth { id: String },
Auth,
}

impl Engines {
Expand All @@ -159,7 +157,6 @@ impl Engines {
self.log,
"No need to call forkchoiceUpdated";
"msg" => "head does not have execution enabled",
"id" => &self.engine.id,
);
return;
}
Expand All @@ -168,7 +165,6 @@ impl Engines {
self.log,
"Issuing forkchoiceUpdated";
"forkchoice_state" => ?forkchoice_state,
"id" => &self.engine.id,
);

// For simplicity, payload attributes are never included in this call. It may be
Expand All @@ -183,14 +179,12 @@ impl Engines {
self.log,
"Failed to issue latest head to engine";
"error" => ?e,
"id" => &self.engine.id,
);
}
} else {
debug!(
self.log,
"No head, not sending to engine";
"id" => &self.engine.id,
);
}
}
Expand Down Expand Up @@ -261,45 +255,36 @@ impl Engines {
}
}

/// Run `func` on all engines, in the order in which they are defined, returning the first
/// successful result that is found.
/// Run `func` on the node.
///
/// This function might try to run `func` twice. If all nodes return an error on the first time
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, Vec<EngineError>>
/// This function might try to run `func` twice. If the node returns an error it will try to
/// upcheck it and then run the function again.
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
Err(mut first_errors) => {
// Try to recover some nodes.
Err(e) => {
debug!(self.log, "First engine call failed. Retrying"; "err" => ?e);
// Try to recover the node.
self.upcheck_not_synced(Logging::Enabled).await;
// Retry the call on all nodes.
match self.first_success_without_retry(func).await {
Ok(result) => Ok(result),
Err(second_errors) => {
first_errors.extend(second_errors);
Err(first_errors)
}
}
// Try again.
self.first_success_without_retry(func).await
}
}
}

/// Run `func` on all engines, in the order in which they are defined, returning the first
/// successful result that is found.
/// Run `func` on the node.
pub async fn first_success_without_retry<'a, F, G, H>(
&'a self,
func: F,
) -> Result<H, Vec<EngineError>>
) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let mut errors = vec![];

let (engine_synced, engine_auth_failed) = {
let state = self.engine.state.read().await;
(
Expand All @@ -309,32 +294,22 @@ impl Engines {
};
if engine_synced {
match func(&self.engine).await {
Ok(result) => return Ok(result),
Ok(result) => Ok(result),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
"id" => &&self.engine.id
);
*self.engine.state.write().await = EngineState::Offline;
errors.push(EngineError::Api {
id: self.engine.id.clone(),
error,
})
Err(EngineError::Api { error })
}
}
} else if engine_auth_failed {
errors.push(EngineError::Auth {
id: self.engine.id.clone(),
})
Err(EngineError::Auth)
} else {
errors.push(EngineError::Offline {
id: self.engine.id.clone(),
})
Err(EngineError::Offline)
}

Err(errors)
}

/// Runs `func` on the node.
Expand Down Expand Up @@ -363,9 +338,7 @@ impl Engines {
{
let func = &func;
if *self.engine.state.read().await == EngineState::Offline {
Err(EngineError::Offline {
id: self.engine.id.clone(),
})
Err(EngineError::Offline)
} else {
match func(&self.engine).await {
Ok(res) => Ok(res),
Expand All @@ -376,10 +349,7 @@ impl Engines {
"error" => ?error,
);
*self.engine.state.write().await = EngineState::Offline;
Err(EngineError::Api {
id: self.engine.id.clone(),
error,
})
Err(EngineError::Api { error })
}
}
}
Expand Down
62 changes: 25 additions & 37 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc};
pub use engines::ForkChoiceState;
use engines::{Engine, EngineError, Engines, Logging};
use lru::LruCache;
use payload_status::process_multiple_payload_statuses;
use payload_status::process_payload_status;
pub use payload_status::PayloadStatus;
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -68,11 +68,10 @@ pub enum Error {
NoPayloadBuilder,
ApiError(ApiError),
Builder(builder_client::Error),
EngineErrors(Vec<EngineError>),
EngineError(Box<EngineError>),
NotSynced,
ShuttingDown,
FeeRecipientUnspecified,
ConsensusFailure,
MissingLatestValidHash,
InvalidJWTSecret(String),
}
Expand Down Expand Up @@ -200,12 +199,11 @@ impl<T: EthSpec> ExecutionLayer<T> {
}?;

let engine: Engine<EngineApi> = {
let id = execution_url.to_string();
let auth = Auth::new(jwt_key, jwt_id, jwt_version);
debug!(log, "Loaded execution endpoint"; "endpoint" => %id, "jwt_path" => ?secret_file.as_path());
debug!(log, "Loaded execution endpoint"; "endpoint" => %execution_url, "jwt_path" => ?secret_file.as_path());
let api = HttpJsonRpc::<EngineApi>::new_with_auth(execution_url, auth)
.map_err(Error::ApiError)?;
Engine::<EngineApi>::new(id, api)
Engine::<EngineApi>::new(api)
};

let builder = builder_url
Expand Down Expand Up @@ -709,7 +707,8 @@ impl<T: EthSpec> ExecutionLayer<T> {
})
})
.await
.map_err(Error::EngineErrors)
.map_err(Box::new)
.map_err(Error::EngineError)
}

/// Maps to the `engine_newPayload` JSON-RPC call.
Expand Down Expand Up @@ -742,16 +741,14 @@ impl<T: EthSpec> ExecutionLayer<T> {
"block_number" => execution_payload.block_number,
);

let broadcast_results = self
let broadcast_result = self
.engines()
.broadcast(|engine| engine.api.new_payload_v1(execution_payload.clone()))
.await;

process_multiple_payload_statuses(
execution_payload.block_hash,
Some(broadcast_results).into_iter(),
self.log(),
)
process_payload_status(execution_payload.block_hash, broadcast_result, self.log())
.map_err(Box::new)
.map_err(Error::EngineError)
}

/// Register that the given `validator_index` is going to produce a block at `slot`.
Expand Down Expand Up @@ -879,7 +876,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
.set_latest_forkchoice_state(forkchoice_state)
.await;

let broadcast_results = self
let broadcast_result = self
.engines()
.broadcast(|engine| async move {
engine
Expand All @@ -888,13 +885,13 @@ impl<T: EthSpec> ExecutionLayer<T> {
})
.await;

process_multiple_payload_statuses(
process_payload_status(
head_block_hash,
Some(broadcast_results)
.into_iter()
.map(|result| result.map(|response| response.payload_status)),
broadcast_result.map(|response| response.payload_status),
self.log(),
)
.map_err(Box::new)
.map_err(Error::EngineError)
}

pub async fn exchange_transition_configuration(&self, spec: &ChainSpec) -> Result<(), Error> {
Expand All @@ -909,9 +906,6 @@ impl<T: EthSpec> ExecutionLayer<T> {
.broadcast(|engine| engine.api.exchange_transition_configuration_v1(local))
.await;

let mut errors = vec![];
// Having no fallbacks, the id of the used node is 0
let i = 0usize;
match broadcast_result {
Ok(remote) => {
if local.terminal_total_difficulty != remote.terminal_total_difficulty
Expand All @@ -922,38 +916,29 @@ impl<T: EthSpec> ExecutionLayer<T> {
"Execution client config mismatch";
"msg" => "ensure lighthouse and the execution client are up-to-date and \
configured consistently",
"execution_endpoint" => i,
"remote" => ?remote,
"local" => ?local,
);
errors.push(EngineError::Api {
id: i.to_string(),
Err(Error::EngineError(Box::new(EngineError::Api {
error: ApiError::TransitionConfigurationMismatch,
});
})))
} else {
debug!(
self.log(),
"Execution client config is OK";
"execution_endpoint" => i
);
Ok(())
}
}
Err(e) => {
error!(
self.log(),
"Unable to get transition config";
"error" => ?e,
"execution_endpoint" => i,
);
errors.push(e);
Err(Error::EngineError(Box::new(e)))
}
}

if errors.is_empty() {
Ok(())
} else {
Err(Error::EngineErrors(errors))
}
}

/// Used during block production to determine if the merge has been triggered.
Expand Down Expand Up @@ -992,7 +977,8 @@ impl<T: EthSpec> ExecutionLayer<T> {
.await
})
.await
.map_err(Error::EngineErrors)?;
.map_err(Box::new)
.map_err(Error::EngineError)?;

if let Some(hash) = &hash_opt {
info!(
Expand Down Expand Up @@ -1102,7 +1088,8 @@ impl<T: EthSpec> ExecutionLayer<T> {
Ok(None)
})
.await
.map_err(|e| Error::EngineErrors(vec![e]))
.map_err(Box::new)
.map_err(Error::EngineError)
}

/// This function should remain internal.
Expand Down Expand Up @@ -1160,7 +1147,8 @@ impl<T: EthSpec> ExecutionLayer<T> {
.await
})
.await
.map_err(Error::EngineErrors)
.map_err(Box::new)
.map_err(Error::EngineError)
}

async fn get_payload_by_block_hash_from_engine(
Expand Down
Loading

0 comments on commit 1219da9

Please sign in to comment.