Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the --beacon-nodes list to be updated at runtime #6551

Open
wants to merge 7 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ pub const CONSENSUS_BLOCK_VALUE_HEADER: &str = "Eth-Consensus-Block-Value";
pub const CONTENT_TYPE_HEADER: &str = "Content-Type";
pub const SSZ_CONTENT_TYPE_HEADER: &str = "application/octet-stream";

/// Specific optimized timeout constants for HTTP requests involved in different validator duties.
/// This can help ensure that proper endpoint fallback occurs.
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;

#[derive(Debug)]
pub enum Error {
/// The `reqwest` client raised an error.
Expand Down Expand Up @@ -151,6 +166,25 @@ impl Timeouts {
get_validator_block: timeout,
}
}

pub fn use_optimized_timeouts(base_timeout: Duration) -> Self {
Timeouts {
attestation: base_timeout / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
attester_duties: base_timeout / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
attestation_subscriptions: base_timeout
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
liveness: base_timeout / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
proposal: base_timeout / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
proposer_duties: base_timeout / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
sync_committee_contribution: base_timeout
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
sync_duties: base_timeout / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
get_beacon_blocks_ssz: base_timeout / HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
get_debug_beacon_states: base_timeout / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
get_deposit_snapshot: base_timeout / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
get_validator_block: base_timeout / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
}
}
}

/// A wrapper around `reqwest::Client` which provides convenience methods for interfacing with a
Expand Down
10 changes: 10 additions & 0 deletions common/eth2/src/lighthouse_vc/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,13 @@ pub struct SingleExportKeystoresResponse {
pub struct SetGraffitiRequest {
pub graffiti: GraffitiString,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct UpdateCandidatesRequest {
pub beacon_nodes: Vec<String>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct UpdateCandidatesResponse {
pub new_beacon_nodes_list: Vec<String>,
}
1 change: 1 addition & 0 deletions validator_client/beacon_node_fallback/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ strum = { workspace = true }
tokio = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }
sensitive_url = { workspace = true }
39 changes: 38 additions & 1 deletion validator_client/beacon_node_fallback/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use beacon_node_health::{
IsOptimistic, SyncDistanceTier,
};
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use eth2::{BeaconNodeHttpClient, Timeouts};
use futures::future;
use sensitive_url::SensitiveUrl;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
Expand Down Expand Up @@ -461,6 +462,42 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
(candidate_info, num_available, num_synced)
}

/// Update the list of candidates with a new list.
/// Returns `Ok(new_list)` if the update was successful.
/// Returns `Err(some_err)` if the list is empty.
pub async fn update_candidates_list(
&self,
new_list: Vec<SensitiveUrl>,
use_long_timeouts: bool,
) -> Result<Vec<SensitiveUrl>, String> {
if new_list.is_empty() {
return Err("list cannot be empty".to_string());
}

let timeouts: Timeouts = if new_list.len() == 1 || use_long_timeouts {
Timeouts::set_all(Duration::from_secs(self.spec.seconds_per_slot))
} else {
Timeouts::use_optimized_timeouts(Duration::from_secs(self.spec.seconds_per_slot))
};

let new_candidates: Vec<CandidateBeaconNode<E>> = new_list
.clone()
.into_iter()
.enumerate()
.map(|(index, url)| {
CandidateBeaconNode::<E>::new(
BeaconNodeHttpClient::new(url, timeouts.clone()),
index,
)
})
.collect();

let mut candidates = self.candidates.write().await;
*candidates = new_candidates;

Ok(new_list)
}

/// Loop through ALL candidates in `self.candidates` and update their sync status.
///
/// It is possible for a node to return an unsynced status while continuing to serve
Expand Down
62 changes: 60 additions & 2 deletions validator_client/http_api/src/lib.rs
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting that adding or removing proposer nodes is currently not possible.
The main reason I didn't include it is because if no proposer nodes are included when the VC is initialized, the proposer_node field in BlockService is set to None. So any future attempts to add proposer nodes will require the entire BlockService to be updated.

I suspect we need to rework the way proposer nodes work anyway. I might look into that for a separate PR.

Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,21 @@ use account_utils::{
};
pub use api_secret::ApiSecret;
use beacon_node_fallback::CandidateInfo;
use core::convert::Infallible;
use create_validator::{
create_validators_mnemonic, create_validators_web3signer, get_voting_password_storage,
};
use eth2::lighthouse_vc::{
std_types::{AuthResponse, GetFeeRecipientResponse, GetGasLimitResponse},
types::{
self as api_types, GenericResponse, GetGraffitiResponse, Graffiti, PublicKey,
PublicKeyBytes, SetGraffitiRequest,
PublicKeyBytes, SetGraffitiRequest, UpdateCandidatesRequest, UpdateCandidatesResponse,
},
};
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
use parking_lot::RwLock;
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize};
use slog::{crit, info, warn, Logger};
use slot_clock::SlotClock;
Expand All @@ -49,7 +51,8 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use validator_services::block_service::BlockService;
use warp::{sse::Event, Filter};
use warp::{reply::Response, sse::Event, Filter};
use warp_utils::reject::convert_rejection;
use warp_utils::task::blocking_json_task;

#[derive(Debug)]
Expand Down Expand Up @@ -99,6 +102,7 @@ pub struct Config {
pub allow_origin: Option<String>,
pub allow_keystore_export: bool,
pub store_passwords_in_secrets_dir: bool,
pub bn_long_timeouts: bool,
}

impl Default for Config {
Expand All @@ -110,6 +114,7 @@ impl Default for Config {
allow_origin: None,
allow_keystore_export: false,
store_passwords_in_secrets_dir: false,
bn_long_timeouts: false,
}
}
}
Expand All @@ -136,6 +141,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let config = &ctx.config;
let allow_keystore_export = config.allow_keystore_export;
let store_passwords_in_secrets_dir = config.store_passwords_in_secrets_dir;
let use_long_timeouts = config.bn_long_timeouts;
let log = ctx.log.clone();

// Configure CORS.
Expand Down Expand Up @@ -835,6 +841,57 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
})
});

// POST /lighthouse/update_beacon_nodes
let post_lighthouse_update_beacon_nodes = warp::path("lighthouse")
.and(warp::path("update_beacon_nodes"))
.and(warp::path::end())
.and(warp::body::json())
.and(block_service_filter.clone())
.then(
move |request: UpdateCandidatesRequest, block_service: BlockService<T, E>| async move {
async fn parse_urls(urls: &[String]) -> Result<Vec<SensitiveUrl>, Response> {
match urls
.iter()
.map(|url| SensitiveUrl::parse(url).map_err(|e| e.to_string()))
.collect()
{
Ok(sensitive_urls) => Ok(sensitive_urls),
Err(_) => Err(convert_rejection::<Infallible>(Err(
warp_utils::reject::custom_bad_request(
"one or more urls could not be parsed".to_string(),
),
))
.await),
}
}

let beacons: Vec<SensitiveUrl> = match parse_urls(&request.beacon_nodes).await {
Ok(new_beacons) => {
match block_service
.beacon_nodes
.update_candidates_list(new_beacons, use_long_timeouts)
.await
{
Ok(beacons) => beacons,
Err(e) => {
return convert_rejection::<Infallible>(Err(
warp_utils::reject::custom_bad_request(e.to_string()),
))
.await
}
}
}
Err(e) => return e,
};

let response: UpdateCandidatesResponse = UpdateCandidatesResponse {
new_beacon_nodes_list: beacons.iter().map(|surl| surl.to_string()).collect(),
};

blocking_json_task(move || Ok(api_types::GenericResponse::from(response))).await
},
);

// Standard key-manager endpoints.
let eth_v1 = warp::path("eth").and(warp::path("v1"));
let std_keystores = eth_v1.and(warp::path("keystores")).and(warp::path::end());
Expand Down Expand Up @@ -1322,6 +1379,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.or(post_std_keystores)
.or(post_std_remotekeys)
.or(post_graffiti)
.or(post_lighthouse_update_beacon_nodes)
.recover(warp_utils::reject::handle_rejection),
))
.or(warp::patch()
Expand Down
1 change: 1 addition & 0 deletions validator_client/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl ApiTester {
allow_origin: None,
allow_keystore_export: true,
store_passwords_in_secrets_dir: false,
bn_long_timeouts: false,
}
}

Expand Down
33 changes: 1 addition & 32 deletions validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,6 @@ const RETRY_DELAY: Duration = Duration::from_secs(2);
/// The time between polls when waiting for genesis.
const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);

/// Specific timeout constants for HTTP requests involved in different validator duties.
/// This can help ensure that proper endpoint fallback occurs.
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;

const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";

#[derive(Clone)]
Expand Down Expand Up @@ -307,23 +292,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
log,
"Fallback endpoints are available, using optimized timeouts.";
);
Timeouts {
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
attestation_subscriptions: slot_duration
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
sync_committee_contribution: slot_duration
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
get_beacon_blocks_ssz: slot_duration
/ HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
get_deposit_snapshot: slot_duration / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
get_validator_block: slot_duration / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
}
Timeouts::use_optimized_timeouts(slot_duration)
} else {
Timeouts::set_all(slot_duration)
};
Expand Down
Loading