Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lcli command for manual rescue sync #5458

Merged
merged 10 commits into from
Aug 12, 2024
152 changes: 152 additions & 0 deletions lcli/src/http_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use clap::ArgMatches;
use clap_utils::{parse_optional, parse_required};
use environment::Environment;
use eth2::{
types::{BlockId, ChainSpec, ForkName, PublishBlockRequest, SignedBlockContents},
BeaconNodeHttpClient, Error, SensitiveUrl, Timeouts,
};
use eth2_network_config::Eth2NetworkConfig;
use ssz::Encode;
use std::fs;
use std::fs::File;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use types::EthSpec;

const HTTP_TIMEOUT: Duration = Duration::from_secs(3600);
const DEFAULT_CACHE_DIR: &str = "./cache";

pub fn run<T: EthSpec>(
env: Environment<T>,
network_config: Eth2NetworkConfig,
matches: &ArgMatches,
) -> Result<(), String> {
let executor = env.core_context().executor;
executor
.handle()
.ok_or("shutdown in progress")?
.block_on(async move { run_async::<T>(network_config, matches).await })
}

pub async fn run_async<T: EthSpec>(
network_config: Eth2NetworkConfig,
matches: &ArgMatches,
) -> Result<(), String> {
let spec = &network_config.chain_spec::<T>()?;
let source_url: SensitiveUrl = parse_required(matches, "source-url")?;
let target_url: SensitiveUrl = parse_required(matches, "target-url")?;
let start_block: BlockId = parse_required(matches, "start-block")?;
let maybe_common_ancestor_block: Option<BlockId> =
parse_optional(matches, "known–common-ancestor")?;
let cache_dir_path: PathBuf =
parse_optional(matches, "block-cache-dir")?.unwrap_or(DEFAULT_CACHE_DIR.into());

let source = BeaconNodeHttpClient::new(source_url, Timeouts::set_all(HTTP_TIMEOUT));
let target = BeaconNodeHttpClient::new(target_url, Timeouts::set_all(HTTP_TIMEOUT));

if !cache_dir_path.exists() {
fs::create_dir_all(&cache_dir_path)
.map_err(|e| format!("Unable to create block cache dir: {:?}", e))?;
}

// 1. Download blocks back from head, looking for common ancestor.
let mut blocks = vec![];
let mut next_block_id = start_block;
loop {
println!("downloading {next_block_id:?}");

let publish_block_req =
get_block_from_source::<T>(&source, next_block_id, spec, &cache_dir_path).await;
let block = publish_block_req.signed_block();

next_block_id = BlockId::Root(block.parent_root());
blocks.push((block.slot(), publish_block_req));

if let Some(ref common_ancestor_block) = maybe_common_ancestor_block {
if common_ancestor_block == &next_block_id {
println!("reached known common ancestor: {next_block_id:?}");
break;
}
}

let block_exists_in_target = target
.get_beacon_blocks_ssz::<T>(next_block_id, spec)
.await
.unwrap()
.is_some();
if block_exists_in_target {
println!("common ancestor found: {next_block_id:?}");
break;
}
}

// 2. Apply blocks to target.
for (slot, block) in blocks.iter().rev() {
println!("posting block at slot {slot}");
if let Err(e) = target.post_beacon_blocks(block).await {
if let Error::ServerMessage(ref e) = e {
if e.code == 202 {
println!("duplicate block detected while posting block at slot {slot}");
continue;
}
}
return Err(format!("error posting {slot}: {e:?}"));
} else {
println!("success");
}
}

println!("SYNCED!!!!");

Ok(())
}

async fn get_block_from_source<T: EthSpec>(
source: &BeaconNodeHttpClient,
block_id: BlockId,
spec: &ChainSpec,
cache_dir_path: &Path,
) -> PublishBlockRequest<T> {
let mut cache_path = cache_dir_path.join(format!("block_{block_id}"));

if cache_path.exists() {
let mut f = File::open(&cache_path).unwrap();
let mut bytes = vec![];
f.read_to_end(&mut bytes).unwrap();
PublishBlockRequest::from_ssz_bytes(&bytes, ForkName::Deneb).unwrap()
} else {
let block_from_source = source
.get_beacon_blocks_ssz::<T>(block_id, spec)
.await
.unwrap()
.unwrap();
let blobs_from_source = source
.get_blobs::<T>(block_id, None)
.await
.unwrap()
.unwrap()
.data;

let (kzg_proofs, blobs): (Vec<_>, Vec<_>) = blobs_from_source
.iter()
.cloned()
.map(|sidecar| (sidecar.kzg_proof, sidecar.blob.clone()))
.unzip();

let block_root = block_from_source.canonical_root();
let block_contents = SignedBlockContents {
signed_block: Arc::new(block_from_source),
kzg_proofs: kzg_proofs.into(),
blobs: blobs.into(),
};
let publish_block_req = PublishBlockRequest::BlockContents(block_contents);

cache_path = cache_dir_path.join(format!("block_{block_root:?}"));
let mut f = File::create(&cache_path).unwrap();
f.write_all(&publish_block_req.as_ssz_bytes()).unwrap();

publish_block_req
}
}
74 changes: 74 additions & 0 deletions lcli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod block_root;
mod check_deposit_data;
mod generate_bootnode_enr;
mod http_sync;
mod indexed_attestations;
mod mnemonic_validators;
mod mock_el;
Expand Down Expand Up @@ -552,6 +553,74 @@ fn main() {
.display_order(0)
)
)
.subcommand(
Command::new("http-sync")
.about("Manual sync")
.arg(
Arg::new("start-block")
.long("start-block")
.value_name("BLOCK_ID")
.action(ArgAction::Set)
.help("Block ID of source's head")
.default_value("head")
.required(true)
.display_order(0)
)
.arg(
Arg::new("source-url")
.long("source-url")
.value_name("URL")
.action(ArgAction::Set)
.help("URL to a synced beacon-API provider")
.required(true)
.display_order(0)
)
.arg(
Arg::new("target-url")
.long("target-url")
.value_name("URL")
.action(ArgAction::Set)
.help("URL to an unsynced beacon-API provider")
.required(true)
.display_order(0)
)
.arg(
Arg::new("testnet-dir")
.short('d')
.long("testnet-dir")
.value_name("PATH")
.action(ArgAction::Set)
.global(true)
.help("The testnet dir.")
.display_order(0)
)
.arg(
Arg::new("network")
.long("network")
.value_name("NAME")
.action(ArgAction::Set)
.global(true)
.help("The network to use. Defaults to mainnet.")
.conflicts_with("testnet-dir")
.display_order(0)
)
.arg(
Arg::new("known-common-ancestor")
.long("known-common-ancestor")
.value_name("BLOCK_ID")
.action(ArgAction::Set)
.help("Block ID of common ancestor, if known.")
.display_order(0)
)
.arg(
Arg::new("block-cache-dir")
.long("block-cache-dir")
.value_name("PATH")
.action(ArgAction::Set)
.help("Directory to keep a cache of the downloaded SSZ blocks.")
.display_order(0)
)
)
.get_matches();

let result = matches
Expand Down Expand Up @@ -656,6 +725,11 @@ fn run<E: EthSpec>(env_builder: EnvironmentBuilder<E>, matches: &ArgMatches) ->
}
Some(("mock-el", matches)) => mock_el::run::<E>(env, matches)
.map_err(|e| format!("Failed to run mock-el command: {}", e)),
Some(("http-sync", matches)) => {
let network_config = get_network_config()?;
http_sync::run::<E>(env, network_config, matches)
.map_err(|e| format!("Failed to run http-sync command: {}", e))
}
Some((other, _)) => Err(format!("Unknown subcommand {}. See --help.", other)),
_ => Err("No subcommand provided. See --help.".to_string()),
}
Expand Down
Loading