Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Implement el_offline and use it in the VC #4295

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/tests/payload_invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,9 @@ async fn invalid_after_optimistic_sync() {
.await,
);

// EL status should still be online, no errors.
assert!(!rig.execution_layer().is_offline_or_erroring().await);

// Running fork choice is necessary since a block has been invalidated.
rig.recompute_head().await;

Expand Down
5 changes: 5 additions & 0 deletions beacon_node/execution_layer/src/engines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ impl Engine {
**self.state.read().await == EngineStateInternal::Synced
}

/// Returns `true` if the engine has a status other than synced or syncing.
pub async fn is_offline(&self) -> bool {
EngineState::from(**self.state.read().await) == EngineState::Offline
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have thoughts on catching EngineState::AuthFailed as well? I think the newPayload failures would catch an auth failure in practice. I don't feel strongly either way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's accurate. If the auth is wrong, then the EL is effectively offline.

I imagine this would only be an issue during initial setup, or if resyncing the EE and deleting the JWT secret.

}

/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline.
pub async fn upcheck(&self) {
Expand Down
45 changes: 21 additions & 24 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ struct Inner<E: EthSpec> {
builder_profit_threshold: Uint256,
log: Logger,
always_prefer_builder_payload: bool,
/// Track whether the last `newPayload` call errored.
///
/// This is used *only* in the informational sync status endpoint, so that a VC using this
/// node can prefer another node with a healthier EL.
last_new_payload_errored: RwLock<bool>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had originally expected that we'd track the last fcU error rather than the last newPayload error.

My reasoning was that it's the last call we'd do in the block import process so it has the most up-to-date information about EE state. However I see now that if we fail a newPayload then we won't end up calling fcU and we'd end up kinda stuck.

If we're choosing just one, then I now prefer newPayload. No changes suggested, just sharing my reasoning.

}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -350,6 +355,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
builder_profit_threshold: Uint256::from(builder_profit_threshold),
log,
always_prefer_builder_payload,
last_new_payload_errored: RwLock::new(false),
};

Ok(Self {
Expand Down Expand Up @@ -542,6 +548,15 @@ impl<T: EthSpec> ExecutionLayer<T> {
synced
}

/// Return `true` if the execution layer is offline or returning errors on `newPayload`.
///
/// This function should never be used to prevent any operation in the beacon node, but can
/// be used to give an indication on the HTTP API that the node's execution layer is struggling,
/// which can in turn be used by the VC.
pub async fn is_offline_or_erroring(&self) -> bool {
self.engine().is_offline().await || *self.inner.last_new_payload_errored.read().await
}

/// Updates the proposer preparation data provided by validators
pub async fn update_proposer_preparation(
&self,
Expand Down Expand Up @@ -1116,18 +1131,6 @@ impl<T: EthSpec> ExecutionLayer<T> {
}

/// Maps to the `engine_newPayload` JSON-RPC call.
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice comment cleanups 👍

/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Error::ConsensusFailure if some nodes return valid and some return invalid
/// - Valid, if any nodes return valid.
/// - Invalid, if any nodes return invalid.
/// - Syncing, if any nodes return syncing.
/// - An error, if all nodes return an error.
pub async fn notify_new_payload(
&self,
execution_payload: &ExecutionPayload<T>,
Expand Down Expand Up @@ -1156,12 +1159,18 @@ impl<T: EthSpec> ExecutionLayer<T> {
&["new_payload", status.status.into()],
);
}
*self.inner.last_new_payload_errored.write().await = result.is_err();

process_payload_status(execution_payload.block_hash(), result, self.log())
.map_err(Box::new)
.map_err(Error::EngineError)
}

/// Update engine sync status.
pub async fn upcheck(&self) {
self.engine().upcheck().await;
}

/// Register that the given `validator_index` is going to produce a block at `slot`.
///
/// The block will be built atop `head_block_root` and the EL will need to prepare an
Expand Down Expand Up @@ -1221,18 +1230,6 @@ impl<T: EthSpec> ExecutionLayer<T> {
}

/// Maps to the `engine_consensusValidated` JSON-RPC call.
///
/// ## Fallback Behaviour
///
/// The request will be broadcast to all nodes, simultaneously. It will await a response (or
/// failure) from all nodes and then return based on the first of these conditions which
/// returns true:
///
/// - Error::ConsensusFailure if some nodes return valid and some return invalid
/// - Valid, if any nodes return valid.
/// - Invalid, if any nodes return invalid.
/// - Syncing, if any nodes return syncing.
/// - An error, if all nodes return an error.
pub async fn notify_forkchoice_updated(
&self,
head_block_hash: ExecutionBlockHash,
Expand Down
25 changes: 18 additions & 7 deletions beacon_node/execution_layer/src/test_utils/handle_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ pub async fn handle_rpc<T: EthSpec>(
.map_err(|s| (s, GENERIC_ERROR_CODE))?;

match method {
ETH_SYNCING => Ok(JsonValue::Bool(false)),
ETH_SYNCING => ctx
.syncing_response
.lock()
.clone()
.map(JsonValue::Bool)
.map_err(|message| (message, GENERIC_ERROR_CODE)),
ETH_GET_BLOCK_BY_NUMBER => {
let tag = params
.get(0)
Expand Down Expand Up @@ -145,7 +150,9 @@ pub async fn handle_rpc<T: EthSpec>(

// Canned responses set by block hash take priority.
if let Some(status) = ctx.get_new_payload_status(request.block_hash()) {
return Ok(serde_json::to_value(JsonPayloadStatusV1::from(status)).unwrap());
return status
.map(|status| serde_json::to_value(JsonPayloadStatusV1::from(status)).unwrap())
.map_err(|message| (message, GENERIC_ERROR_CODE));
}

let (static_response, should_import) =
Expand Down Expand Up @@ -320,11 +327,15 @@ pub async fn handle_rpc<T: EthSpec>(

// Canned responses set by block hash take priority.
if let Some(status) = ctx.get_fcu_payload_status(&head_block_hash) {
let response = JsonForkchoiceUpdatedV1Response {
payload_status: JsonPayloadStatusV1::from(status),
payload_id: None,
};
return Ok(serde_json::to_value(response).unwrap());
return status
.map(|status| {
let response = JsonForkchoiceUpdatedV1Response {
payload_status: JsonPayloadStatusV1::from(status),
payload_id: None,
};
serde_json::to_value(response).unwrap()
})
.map_err(|message| (message, GENERIC_ERROR_CODE));
}

let mut response = ctx
Expand Down
27 changes: 21 additions & 6 deletions beacon_node/execution_layer/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl<T: EthSpec> MockServer<T> {
hook: <_>::default(),
new_payload_statuses: <_>::default(),
fcu_payload_statuses: <_>::default(),
syncing_response: Arc::new(Mutex::new(Ok(false))),
engine_capabilities: Arc::new(RwLock::new(DEFAULT_ENGINE_CAPABILITIES)),
_phantom: PhantomData,
});
Expand Down Expand Up @@ -414,14 +415,25 @@ impl<T: EthSpec> MockServer<T> {
self.ctx
.new_payload_statuses
.lock()
.insert(block_hash, status);
.insert(block_hash, Ok(status));
}

pub fn set_fcu_payload_status(&self, block_hash: ExecutionBlockHash, status: PayloadStatusV1) {
self.ctx
.fcu_payload_statuses
.lock()
.insert(block_hash, status);
.insert(block_hash, Ok(status));
}

pub fn set_new_payload_error(&self, block_hash: ExecutionBlockHash, error: String) {
self.ctx
.new_payload_statuses
.lock()
.insert(block_hash, Err(error));
}

pub fn set_syncing_response(&self, res: Result<bool, String>) {
*self.ctx.syncing_response.lock() = res;
}
}

Expand Down Expand Up @@ -478,8 +490,11 @@ pub struct Context<T: EthSpec> {
//
// This is a more flexible and less stateful alternative to `static_new_payload_response`
// and `preloaded_responses`.
pub new_payload_statuses: Arc<Mutex<HashMap<ExecutionBlockHash, PayloadStatusV1>>>,
pub fcu_payload_statuses: Arc<Mutex<HashMap<ExecutionBlockHash, PayloadStatusV1>>>,
pub new_payload_statuses:
Arc<Mutex<HashMap<ExecutionBlockHash, Result<PayloadStatusV1, String>>>>,
pub fcu_payload_statuses:
Arc<Mutex<HashMap<ExecutionBlockHash, Result<PayloadStatusV1, String>>>>,
pub syncing_response: Arc<Mutex<Result<bool, String>>>,

pub engine_capabilities: Arc<RwLock<EngineCapabilities>>,
pub _phantom: PhantomData<T>,
Expand All @@ -489,14 +504,14 @@ impl<T: EthSpec> Context<T> {
pub fn get_new_payload_status(
&self,
block_hash: &ExecutionBlockHash,
) -> Option<PayloadStatusV1> {
) -> Option<Result<PayloadStatusV1, String>> {
self.new_payload_statuses.lock().get(block_hash).cloned()
}

pub fn get_fcu_payload_status(
&self,
block_hash: &ExecutionBlockHash,
) -> Option<PayloadStatusV1> {
) -> Option<Result<PayloadStatusV1, String>> {
self.fcu_payload_statuses.lock().get(block_hash).cloned()
}
}
Expand Down
48 changes: 30 additions & 18 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2285,28 +2285,40 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and_then(
|network_globals: Arc<NetworkGlobals<T::EthSpec>>, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let head_slot = chain.canonical_head.cached_head().head_slot();
let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
warp_utils::reject::custom_server_error("Unable to read slot clock".into())
})?;
async move {
let el_offline = if let Some(el) = &chain.execution_layer {
el.is_offline_or_erroring().await
} else {
true
};

// Taking advantage of saturating subtraction on slot.
let sync_distance = current_slot - head_slot;
blocking_json_task(move || {
let head_slot = chain.canonical_head.cached_head().head_slot();
let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
warp_utils::reject::custom_server_error(
"Unable to read slot clock".into(),
)
})?;

let is_optimistic = chain
.is_optimistic_or_invalid_head()
.map_err(warp_utils::reject::beacon_chain_error)?;
// Taking advantage of saturating subtraction on slot.
let sync_distance = current_slot - head_slot;

let syncing_data = api_types::SyncingData {
is_syncing: network_globals.sync_state.read().is_syncing(),
is_optimistic: Some(is_optimistic),
head_slot,
sync_distance,
};
let is_optimistic = chain
.is_optimistic_or_invalid_head()
.map_err(warp_utils::reject::beacon_chain_error)?;

Ok(api_types::GenericResponse::from(syncing_data))
})
let syncing_data = api_types::SyncingData {
is_syncing: network_globals.sync_state.read().is_syncing(),
is_optimistic: Some(is_optimistic),
el_offline: Some(el_offline),
head_slot,
sync_distance,
};

Ok(api_types::GenericResponse::from(syncing_data))
})
.await
}
},
);

Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

pub mod fork_tests;
pub mod interactive_tests;
pub mod status_tests;
pub mod tests;
Loading