Skip to content

Commit

Permalink
Start merging VC to new API
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Sep 10, 2020
1 parent 002228d commit 1bbabae
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 72 deletions.
12 changes: 10 additions & 2 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
pub mod types;

use self::types::*;
use reqwest::{IntoUrl, Response, StatusCode};
use reqwest::{IntoUrl, Response};
use serde::{de::DeserializeOwned, Serialize};
use std::convert::TryFrom;
use std::fmt;

pub use reqwest;
pub use reqwest::Url;
pub use reqwest::{StatusCode, Url};

#[derive(Debug)]
pub enum Error {
Expand All @@ -25,6 +26,13 @@ impl Error {
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

#[derive(Clone)]
pub struct BeaconNodeClient {
client: reqwest::Client,
server: Url,
Expand Down
2 changes: 1 addition & 1 deletion validator_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ eth2_ssz_derive = "0.1.0"
hex = "0.4.2"
deposit_contract = { path = "../common/deposit_contract" }
bls = { path = "../crypto/bls" }
remote_beacon_node = { path = "../common/remote_beacon_node" }
eth2 = { path = "../common/eth2" }
tempdir = "0.3.7"
rayon = "1.3.0"
validator_dir = { path = "../common/validator_dir" }
Expand Down
22 changes: 14 additions & 8 deletions validator_client/src/fork_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use environment::RuntimeContext;
use eth2::{types::StateId, BeaconNodeClient};
use futures::StreamExt;
use parking_lot::RwLock;
use remote_beacon_node::RemoteBeaconNode;
use slog::{debug, trace};
use slot_clock::SlotClock;
use std::ops::Deref;
Expand All @@ -16,7 +16,7 @@ const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(80);
pub struct ForkServiceBuilder<T, E: EthSpec> {
fork: Option<Fork>,
slot_clock: Option<T>,
beacon_node: Option<RemoteBeaconNode<E>>,
beacon_node: Option<BeaconNodeClient>,
context: Option<RuntimeContext<E>>,
}

Expand All @@ -35,7 +35,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkServiceBuilder<T, E> {
self
}

pub fn beacon_node(mut self, beacon_node: RemoteBeaconNode<E>) -> Self {
pub fn beacon_node(mut self, beacon_node: BeaconNodeClient) -> Self {
self.beacon_node = Some(beacon_node);
self
}
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkServiceBuilder<T, E> {
/// Helper to minimise `Arc` usage.
pub struct Inner<T, E: EthSpec> {
fork: RwLock<Option<Fork>>,
beacon_node: RemoteBeaconNode<E>,
beacon_node: BeaconNodeClient,
context: RuntimeContext<E>,
slot_clock: T,
}
Expand Down Expand Up @@ -141,17 +141,23 @@ impl<T: SlotClock + 'static, E: EthSpec> ForkService<T, E> {
let fork = self
.inner
.beacon_node
.http
.beacon()
.get_fork()
.get_beacon_states_fork(StateId::Head)
.await
.map_err(|e| {
trace!(
log,
"Fork update failed";
"error" => format!("Error retrieving fork: {:?}", e)
)
})?;
})?
.ok_or_else(|| {
trace!(
log,
"Fork update failed";
"error" => "The beacon head fork is unknown"
)
})?
.data;

if self.fork.read().as_ref() != Some(&fork) {
*(self.fork.write()) = Some(fork);
Expand Down
131 changes: 70 additions & 61 deletions validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ use block_service::{BlockService, BlockServiceBuilder};
use clap::ArgMatches;
use duties_service::{DutiesService, DutiesServiceBuilder};
use environment::RuntimeContext;
use eth2::{reqwest::ClientBuilder, BeaconNodeClient, StatusCode, Url};
use eth2_config::Eth2Config;
use fork_service::{ForkService, ForkServiceBuilder};
use futures::channel::mpsc;
use initialized_validators::InitializedValidators;
use notifier::spawn_notifier;
use remote_beacon_node::RemoteBeaconNode;
use slog::{error, info, Logger};
use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{delay_for, Duration};
use types::{EthSpec, Hash256};
use types::{EthSpec, Hash256, YamlConfig};
use validator_store::ValidatorStore;

/// The interval between attempts to contact the beacon node during startup.
Expand Down Expand Up @@ -104,33 +104,37 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
"enabled" => validators.num_enabled(),
);

let beacon_node_url: Url = config
.http_server
.parse()
.map_err(|e| format!("Unable to parse beacon node URL: {:?}", e))?;
let beacon_node_http_client = ClientBuilder::new()
.timeout(HTTP_TIMEOUT)
.build()
.map_err(|e| format!("Unable to build HTTP client: {:?}", e))?;
let beacon_node =
RemoteBeaconNode::new_with_timeout(config.http_server.clone(), HTTP_TIMEOUT)
.map_err(|e| format!("Unable to init beacon node http client: {}", e))?;
BeaconNodeClient::from_components(beacon_node_url, beacon_node_http_client)
.map_err(|_| format!("Beacon node URL is invalid"))?;

// Perform some potentially long-running initialization tasks.
let (eth2_config, genesis_time, genesis_validators_root) = tokio::select! {
let (yaml_config, genesis_time, genesis_validators_root) = tokio::select! {
tuple = init_from_beacon_node(&beacon_node, &context) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string())
};

// Do not permit a connection to a beacon node using different spec constants.
if context.eth2_config.spec_constants != eth2_config.spec_constants {
return Err(format!(
"Beacon node is using an incompatible spec. Got {}, expected {}",
eth2_config.spec_constants, context.eth2_config.spec_constants
));
let mut beacon_node_spec = yaml_config.apply_to_chain_spec(&T::default_spec())
.ok_or_else(|| format!(
"The minimal/mainnet spec type of the beacon node does not match the validator client. \
See the --testnet command."
))?;

if context.eth2_config.spec != beacon_node_spec {
return Err(
"The beacon node is using a different spec version to this validator client. \
See the --testnet command."
.to_string(),
);
}

// Note: here we just assume the spec variables of the remote node. This is very useful
// for testnets, but perhaps a security issue when it comes to mainnet.
//
// A damaging attack would be for a beacon node to convince the validator client of a
// different `SLOTS_PER_EPOCH` variable. This could result in slashable messages being
// produced. We are safe from this because `SLOTS_PER_EPOCH` is a type-level constant
// for Lighthouse.
context.eth2_config = eth2_config;

let slot_clock = SystemTimeSlotClock::new(
context.eth2_config.spec.genesis_slot,
Duration::from_secs(genesis_time),
Expand Down Expand Up @@ -228,80 +232,85 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
}

async fn init_from_beacon_node<E: EthSpec>(
beacon_node: &RemoteBeaconNode<E>,
beacon_node: &BeaconNodeClient,
context: &RuntimeContext<E>,
) -> Result<(Eth2Config, u64, Hash256), String> {
) -> Result<(YamlConfig, u64, Hash256), String> {
// Wait for the beacon node to come online.
wait_for_node(beacon_node, context.log()).await?;

let eth2_config = beacon_node
.http
.spec()
.get_eth2_config()
.await
.map_err(|e| format!("Unable to read eth2 config from beacon node: {:?}", e))?;
let genesis_time = beacon_node
.http
.beacon()
.get_genesis_time()
let yaml_config = beacon_node
.get_config_spec()
.await
.map_err(|e| format!("Unable to read genesis time from beacon node: {:?}", e))?;
.map_err(|e| format!("Unable to read spec from beacon node: {:?}", e))?
.data;

let genesis = loop {
match beacon_node.get_beacon_genesis().await {
Ok(genesis) => break genesis.data,
Err(e) => {
// A 404 error on the genesis endpoint indicates that genesis has not yet occurred.
if e.status() == Some(StatusCode::NOT_FOUND) {
info!(
context.log(),
"Waiting for genesis";
);
} else {
error!(
context.log(),
"Error polling beacon node";
"error" => format!("{:?}", e)
);
}
}
}

delay_for(RETRY_DELAY).await;
};

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| format!("Unable to read system time: {:?}", e))?;
let genesis = Duration::from_secs(genesis_time);
let genesis_time = Duration::from_secs(genesis.genesis_time);

// If the time now is less than (prior to) genesis, then delay until the
// genesis instant.
//
// If the validator client starts before genesis, it will get errors from
// the slot clock.
if now < genesis {
if now < genesis_time {
info!(
context.log(),
"Starting node prior to genesis";
"seconds_to_wait" => (genesis - now).as_secs()
"seconds_to_wait" => (genesis_time - now).as_secs()
);

delay_for(genesis - now).await;
delay_for(genesis_time - now).await;
} else {
info!(
context.log(),
"Genesis has already occurred";
"seconds_ago" => (now - genesis).as_secs()
"seconds_ago" => (now - genesis_time).as_secs()
);
}
let genesis_validators_root = beacon_node
.http
.beacon()
.get_genesis_validators_root()
.await
.map_err(|e| {
format!(
"Unable to read genesis validators root from beacon node: {:?}",
e
)
})?;

Ok((eth2_config, genesis_time, genesis_validators_root))

Ok((
yaml_config,
genesis.genesis_time,
genesis.genesis_validators_root,
))
}

/// Request the version from the node, looping back and trying again on failure. Exit once the node
/// has been contacted.
async fn wait_for_node<E: EthSpec>(
beacon_node: &RemoteBeaconNode<E>,
log: &Logger,
) -> Result<(), String> {
async fn wait_for_node(beacon_node: &BeaconNodeClient, log: &Logger) -> Result<(), String> {
// Try to get the version string from the node, looping until success is returned.
loop {
let log = log.clone();
let result = beacon_node
.clone()
.http
.node()
.get_version()
.get_node_version()
.await
.map_err(|e| format!("{:?}", e));
.map_err(|e| format!("{:?}", e))
.map(|body| body.data.version);

match result {
Ok(version) => {
Expand Down

0 comments on commit 1bbabae

Please sign in to comment.