Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix): HTTP error handling in VC API #4606

Closed
wants to merge 29 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5577b03
(fix): HTTP error handling in VC API
protocolwhisper Aug 10, 2023
db932da
Merge branch 'sigp:unstable' into unstable
protocolwhisper Aug 17, 2023
edecf5a
(rebase): HTTP error handling in VC API
protocolwhisper Aug 18, 2023
24fa06b
(rebase) : Not working
protocolwhisper Aug 19, 2023
166f0f5
Revert "(rebase) : Not working"
protocolwhisper Aug 19, 2023
09a0eb6
(rebase) : Http vc api
protocolwhisper Aug 20, 2023
802e7ea
(rebase): block_signed_task , blocking_task
protocolwhisper Aug 21, 2023
a27b18e
(Fix) : Rejection handling in VC API
protocolwhisper Aug 22, 2023
f05c3fb
(rebase) : Fix lint
protocolito Aug 28, 2023
b608d2e
Revert "(rebase) : Fix lint"
protocolito Aug 28, 2023
195b9b3
(rebase): Fix Lint
protocolwhisper Aug 28, 2023
e6fa5ce
Merge branch 'sigp:unstable' into unstable
protocolwhisper Aug 28, 2023
918f2ec
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 3, 2023
669c7ea
(rebase) : Convert Rejection into signed Response
protocolwhisper Oct 4, 2023
66578b6
(rebase) : Improve rejection error handling
protocolwhisper Oct 8, 2023
34d07e5
(rebase): Improving rejection error handling
protocolwhisper Oct 8, 2023
19f7096
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 8, 2023
1516ff7
(rebase): Fixing comments
protocolwhisper Oct 8, 2023
9759a88
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 20, 2023
3271d4a
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 31, 2023
52bdc5b
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 5, 2023
f6671a1
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 9, 2023
f77a5a0
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 21, 2023
8e36053
(rebase) : empty lines
protocolwhisper Dec 17, 2023
6db95ae
Merge branch 'sigp:unstable' into unstable
protocolwhisper Jan 18, 2024
d590201
(rebase) : Changed output to Response<body>
protocolwhisper Jan 18, 2024
dd1149d
(fix): logic & error handling
protocolwhisper Jan 26, 2024
891e718
Run cargo fmt
dapplion Apr 2, 2024
b45c97e
Clean doc
dapplion Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 67 additions & 46 deletions validator_client/src/http_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use eth2::lighthouse_vc::{
std_types::{AuthResponse, GetFeeRecipientResponse, GetGasLimitResponse},
types::{self as api_types, GenericResponse, Graffiti, PublicKey, PublicKeyBytes},
};
use hyper::Body;
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
use parking_lot::RwLock;
Expand All @@ -39,10 +40,12 @@ use task_executor::TaskExecutor;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use warp::http::response::Response as WarpResponse;
use warp::reply::{Reply, Response};

use warp::{
http::{
header::{HeaderValue, CONTENT_TYPE},
response::Response,
StatusCode,
},
sse::Event,
Expand Down Expand Up @@ -110,6 +113,24 @@ impl Default for Config {
}
}

/// Convert a warp `Rejection` into a `Response`.
///
/// This function should *always* be used to convert rejections into responses. This prevents warp
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404
pub async fn convert_rejection<T: Reply>(res: Result<T, warp::Rejection>) -> Response {
match res {
Ok(response) => response.into_response(),
Err(e) => match warp_utils::reject::handle_rejection(e).await {
Ok(reply) => reply.into_response(),
Err(_) => warp::reply::with_status(
warp::reply::json(&"unhandled error"),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
},
}
}

/// Creates a server that will serve requests using information from `ctx`.
///
/// The server will shut down gracefully when the `shutdown` future resolves.
Expand Down Expand Up @@ -265,7 +286,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("version"))
.and(warp::path::end())
.and(signer.clone())
.and_then(|signer| {
.then(|signer| {
blocking_signed_json_task(signer, move || {
Ok(api_types::GenericResponse::from(api_types::VersionData {
version: version_with_platform(),
Expand All @@ -278,7 +299,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("health"))
.and(warp::path::end())
.and(signer.clone())
.and_then(|signer| {
.then(|signer| {
blocking_signed_json_task(signer, move || {
eth2::lighthouse::Health::observe()
.map(api_types::GenericResponse::from)
Expand All @@ -292,7 +313,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(spec_filter.clone())
.and(signer.clone())
.and_then(|spec: Arc<_>, signer| {
.then(|spec: Arc<_>, signer| {
blocking_signed_json_task(signer, move || {
let config = ConfigAndPreset::from_chain_spec::<E>(&spec, None);
Ok(api_types::GenericResponse::from(config))
Expand All @@ -305,7 +326,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(|validator_store: Arc<ValidatorStore<T, E>>, signer| {
.then(|validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
let validators = validator_store
.initialized_validators()
Expand All @@ -330,7 +351,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
let validator = validator_store
Expand Down Expand Up @@ -365,7 +386,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(app_start_filter)
.and(validator_dir_filter.clone())
.and(signer.clone())
.and_then(|sysinfo, app_start: std::time::Instant, val_dir, signer| {
.then(|sysinfo, app_start: std::time::Instant, val_dir, signer| {
blocking_signed_json_task(signer, move || {
let app_uptime = app_start.elapsed().as_secs();
Ok(api_types::GenericResponse::from(observe_system_health_vc(
Expand All @@ -383,7 +404,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_flag_filter)
.and(signer.clone())
.and(log_filter.clone())
.and_then(
.then(
|validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
graffiti_flag: Option<Graffiti>,
Expand Down Expand Up @@ -421,7 +442,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(spec_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: Vec<api_types::ValidatorRequest>,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -468,7 +489,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(spec_filter)
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: api_types::CreateValidatorsMnemonicRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -517,7 +538,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: api_types::KeystoreValidatorsPostRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -603,7 +624,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|body: Vec<api_types::Web3SignerValidatorRequest>,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
Expand Down Expand Up @@ -656,7 +677,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_file_filter)
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -726,7 +747,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_auth = get_auth
.and(signer.clone())
.and(api_token_path_filter)
.and_then(|signer, token_path: PathBuf| {
.then(|signer, token_path: PathBuf| {
blocking_signed_json_task(signer, move || {
Ok(AuthResponse {
token_path: token_path.display().to_string(),
Expand All @@ -743,7 +764,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
move |request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
if allow_keystore_export {
Expand All @@ -770,7 +791,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -810,7 +831,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateFeeRecipientRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -850,7 +871,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -887,7 +908,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -919,7 +940,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateGasLimitRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -959,7 +980,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -1000,7 +1021,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(log_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|pubkey: PublicKey,
query: api_types::VoluntaryExitQuery,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -1032,7 +1053,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_std_keystores = std_keystores
.and(signer.clone())
.and(validator_store_filter.clone())
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
.then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store)))
});

Expand All @@ -1045,7 +1066,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
move |request,
signer,
validator_dir,
Expand Down Expand Up @@ -1074,7 +1095,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
keystores::delete(request, validator_store, task_executor, log)
})
Expand All @@ -1084,7 +1105,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_std_remotekeys = std_remotekeys
.and(signer.clone())
.and(validator_store_filter.clone())
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
.then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_signed_json_task(signer, move || Ok(remotekeys::list(validator_store)))
});

Expand All @@ -1095,7 +1116,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
remotekeys::import(request, validator_store, task_executor, log)
})
Expand All @@ -1108,7 +1129,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter)
.and(task_executor_filter)
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
remotekeys::delete(request, validator_store, task_executor, log)
})
protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1226,38 +1247,38 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted).
/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of
/// those bytes.
pub async fn blocking_signed_json_task<S, F, T>(
signer: S,
func: F,
) -> Result<impl warp::Reply, warp::Rejection>
pub async fn blocking_signed_json_task<S, F, T>(signer: S, func: F) -> Response
where
S: Fn(&[u8]) -> String,
F: FnOnce() -> Result<T, warp::Rejection> + Send + 'static,
T: Serialize + Send + 'static,
{
warp_utils::task::blocking_task(func)
let result = warp_utils::task::blocking_task(func)
.await
.map(|func_output| {
let mut response = match serde_json::to_vec(&func_output) {
Ok(body) => {
let mut res = Response::new(body);
res.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
res
let response_body = match serde_json::to_vec(&func_output) {
Ok(body) => body,
Err(_) => {
return WarpResponse::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(vec![]))
.expect("can produce simple response from static values")
protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
}
Err(_) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(vec![])
.expect("can produce simple response from static values"),
};

let body: &Vec<u8> = response.body();
let signature = signer(body);
let signature = signer(&response_body);
let header_value =
HeaderValue::from_str(&signature).expect("hash can be encoded as header");

let mut response = WarpResponse::builder()
.header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
.body(Body::from(response_body))
.unwrap();

response.headers_mut().append("Signature", header_value);

response
})
});

convert_rejection(result).await
}