Resumable warp-sync / Seed downloaded snapshots (openethereum#8544)
* Start dividing sync chain : first supplier method

* WIP - updated chain sync supplier

* Finish refactoring the Chain Sync Supplier

* Create Chain Sync Requester

* Add Propagator for Chain Sync

* Add the Chain Sync Handler

* Move tests from mod -> handler

* Move tests to propagator

* Refactor SyncRequester arguments

* Refactoring peer fork header handler

* Fix wrong highest block number in snapshot sync

* Small refactor...

* Resume warp-sync downloaded chunks

* Add comments

* Refactoring the previous chunks import

* Fix tests

* Address PR grumbles

* Fix not seeding current snapshot

* Address PR Grumbles

* Address PR grumble

* Retry failed CI job

* Update SnapshotService readiness check
Fix restoration locking issue for previous chunks restoration

* Fix tests

* Fix tests

* Fix test

* Early abort importing previous chunks

* PR Grumbles

* Update Gitlab CI config

* SyncState back to Waiting when Manifest peers disconnect

* Move fix

* Better fix

* Revert GitLab CI changes

* Fix Warning

* Refactor resuming snapshots

* Fix string construction

* Revert "Refactor resuming snapshots"

This reverts commit 75fd4b5.

* Update informant log

* Fix string construction

* Refactor resuming snapshots

* Fix informant

* PR Grumbles

* Update informant message : show chunks done

* PR Grumbles

* Fix

* Fix Warning

* PR Grumbles
ngotchac authored and VladLupashevskyi committed May 23, 2018
1 parent d4683d1 commit e9c602b
Showing 12 changed files with 361 additions and 102 deletions.
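In outline, the resume path added in service.rs works like this: when a new restoration is initialized, the previous `temp` recovery directory is renamed to `prev_chunks`, every file in it is read back, hashed, and re-fed into the restoration if the new manifest still lists that hash, and the directory is then removed. The following is a condensed, self-contained sketch of that idea, not the code from this diff; the `content_hash` and `feed` callbacks stand in for the service's keccak hashing and chunk feeding.

use std::collections::HashSet;
use std::fs;
use std::io::{self, Read};
use std::path::Path;

/// Re-feed chunks left over from a previous, interrupted restoration.
/// `manifest_hashes` is the set of chunk hashes the new manifest expects,
/// `content_hash` computes a chunk's hash (keccak-256 in the real service),
/// and `feed` hands a recognised chunk back to the restoration.
fn import_prev_chunks<H, F>(
	prev_chunks: &Path,
	manifest_hashes: &HashSet<Vec<u8>>,
	content_hash: H,
	mut feed: F,
) -> io::Result<usize>
where
	H: Fn(&[u8]) -> Vec<u8>,
	F: FnMut(&[u8], &[u8]) -> io::Result<()>,
{
	let mut imported = 0;
	for entry in fs::read_dir(prev_chunks)? {
		let path = entry?.path();
		let mut buffer = Vec::new();
		fs::File::open(&path)?.read_to_end(&mut buffer)?;

		let hash = content_hash(&buffer);
		// Only chunks the new manifest still references are worth re-feeding;
		// anything else is stale and simply skipped.
		if manifest_hashes.contains(&hash) {
			feed(&hash, &buffer)?;
			imported += 1;
		}
	}
	// The leftovers have served their purpose.
	fs::remove_dir_all(prev_chunks)?;
	Ok(imported)
}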
165 changes: 146 additions & 19 deletions ethcore/src/snapshot/service.rs
@@ -17,8 +17,8 @@
//! Snapshot network service implementation.

use std::collections::HashSet;
use std::io::ErrorKind;
use std::fs;
use std::io::{self, Read, ErrorKind};
use std::fs::{self, File};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -30,6 +30,7 @@ use blockchain::BlockChain;
use client::{Client, ChainInfo, ClientIoMessage};
use engines::EthEngine;
use error::Error;
use hash::keccak;
use ids::BlockId;

use io::IoChannel;
@@ -270,8 +271,8 @@ impl Service {
}
}

// delete the temporary restoration dir if it does exist.
if let Err(e) = fs::remove_dir_all(service.restoration_dir()) {
// delete the temporary restoration DB dir if it does exist.
if let Err(e) = fs::remove_dir_all(service.restoration_db()) {
if e.kind() != ErrorKind::NotFound {
return Err(e.into())
}
Expand Down Expand Up @@ -325,6 +326,13 @@ impl Service {
dir
}

// previous snapshot chunks path.
fn prev_chunks_dir(&self) -> PathBuf {
let mut dir = self.snapshot_root.clone();
dir.push("prev_chunks");
dir
}

// replace one of the client's databases with our own.
fn replace_client_db(&self) -> Result<(), Error> {
let our_db = self.restoration_db();
@@ -406,9 +414,26 @@ impl Service {
/// Initialize the restoration synchronously.
/// The recover flag indicates whether to recover the restored snapshot.
pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
let mut res = self.restoration.lock();

let rest_dir = self.restoration_dir();
let rest_db = self.restoration_db();
let recovery_temp = self.temp_recovery_dir();
let prev_chunks = self.prev_chunks_dir();

let mut res = self.restoration.lock();
// delete the previous chunks directory if it exists.
if let Err(e) = fs::remove_dir_all(&prev_chunks) {
match e.kind() {
ErrorKind::NotFound => {},
_ => return Err(e.into()),
}
}

// Move the previous recovery temp directory
// to `prev_chunks` so that restoration can be restarted
// with the previously downloaded chunks.
// This step is optional, so don't fail on error.
fs::rename(&recovery_temp, &prev_chunks).ok();

self.state_chunks.store(0, Ordering::SeqCst);
self.block_chunks.store(0, Ordering::SeqCst);
@@ -424,40 +449,104 @@
}
}

*self.status.lock() = RestorationStatus::Initializing {
chunks_done: 0,
};

fs::create_dir_all(&rest_dir)?;

// make new restoration.
let writer = match recover {
true => Some(LooseWriter::new(self.temp_recovery_dir())?),
true => Some(LooseWriter::new(recovery_temp)?),
false => None
};

let params = RestorationParams {
manifest: manifest,
manifest: manifest.clone(),
pruning: self.pruning,
db: self.restoration_db_handler.open(&self.restoration_db())?,
db: self.restoration_db_handler.open(&rest_db)?,
writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_dir),
guard: Guard::new(rest_db),
engine: &*self.engine,
};

let state_chunks = params.manifest.state_hashes.len();
let block_chunks = params.manifest.block_hashes.len();
let state_chunks = manifest.state_hashes.len();
let block_chunks = manifest.block_hashes.len();

*res = Some(Restoration::new(params)?);

self.restoring_snapshot.store(true, Ordering::SeqCst);

// Import previous chunks; continue even if it fails
self.import_prev_chunks(&mut res, manifest).ok();

*self.status.lock() = RestorationStatus::Ongoing {
state_chunks: state_chunks as u32,
block_chunks: block_chunks as u32,
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
};

self.restoring_snapshot.store(true, Ordering::SeqCst);
Ok(())
}

/// Import the previous chunks into the current restoration
fn import_prev_chunks(&self, restoration: &mut Option<Restoration>, manifest: ManifestData) -> Result<(), Error> {
let prev_chunks = self.prev_chunks_dir();

// Restore previous snapshot chunks
let files = fs::read_dir(prev_chunks.as_path())?;
let mut num_temp_chunks = 0;

for prev_chunk_file in files {
// Don't go over all the files if the restoration has been aborted
if !self.restoring_snapshot.load(Ordering::SeqCst) {
trace!(target:"snapshot", "Aborting importing previous chunks");
return Ok(());
}
// Import the chunk; don't fail the whole restoration if a single chunk fails
match self.import_prev_chunk(restoration, &manifest, prev_chunk_file) {
Ok(true) => num_temp_chunks += 1,
Err(e) => trace!(target: "snapshot", "Error importing chunk: {:?}", e),
_ => (),
}
}

trace!(target:"snapshot", "Imported {} previous chunks", num_temp_chunks);

// Remove the prev temp directory
fs::remove_dir_all(&prev_chunks)?;

Ok(())
}

/// Import a previous chunk at the given path. Returns whether the chunk was imported or not
fn import_prev_chunk(&self, restoration: &mut Option<Restoration>, manifest: &ManifestData, file: io::Result<fs::DirEntry>) -> Result<bool, Error> {
let file = file?;
let path = file.path();

let mut file = File::open(path.clone())?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)?;

let hash = keccak(&buffer);

let is_state = if manifest.block_hashes.contains(&hash) {
false
} else if manifest.state_hashes.contains(&hash) {
true
} else {
return Ok(false);
};

self.feed_chunk_with_restoration(restoration, hash, &buffer, is_state)?;

trace!(target: "snapshot", "Fed chunk {:?}", hash);

Ok(true)
}

// finalize the restoration. this accepts an already-locked
// restoration as an argument -- so acquiring it again _will_
// lead to deadlock.
@@ -499,12 +588,19 @@ impl Service {
/// Feed a chunk of either kind. No-op if there is no restoration or the status is wrong.
fn feed_chunk(&self, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
// TODO: be able to process block chunks and state chunks at same time?
let (result, db) = {
let mut restoration = self.restoration.lock();
let mut restoration = self.restoration.lock();
self.feed_chunk_with_restoration(&mut restoration, hash, chunk, is_state)
}

/// Feed a chunk into the given restoration
fn feed_chunk_with_restoration(&self, restoration: &mut Option<Restoration>, hash: H256, chunk: &[u8], is_state: bool) -> Result<(), Error> {
let (result, db) = {
match self.status() {
RestorationStatus::Inactive | RestorationStatus::Failed => return Ok(()),
RestorationStatus::Ongoing { .. } => {
RestorationStatus::Inactive | RestorationStatus::Failed => {
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash);
return Ok(());
},
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing { .. } => {
let (res, db) = {
let rest = match *restoration {
Some(ref mut r) => r,
@@ -583,11 +679,41 @@ impl SnapshotService for Service {
self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok())
}

fn completed_chunks(&self) -> Option<Vec<H256>> {
let restoration = self.restoration.lock();

match *restoration {
Some(ref restoration) => {
let completed_chunks = restoration.manifest.block_hashes
.iter()
.filter(|h| !restoration.block_chunks_left.contains(h))
.chain(
restoration.manifest.state_hashes
.iter()
.filter(|h| !restoration.state_chunks_left.contains(h))
)
.map(|h| *h)
.collect();

Some(completed_chunks)
},
None => None,
}
}

fn status(&self) -> RestorationStatus {
let mut cur_status = self.status.lock();
if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } = *cur_status {
*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;

match *cur_status {
RestorationStatus::Initializing { ref mut chunks_done } => {
*chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32 +
self.block_chunks.load(Ordering::SeqCst) as u32;
}
RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } => {
*state_chunks_done = self.state_chunks.load(Ordering::SeqCst) as u32;
*block_chunks_done = self.block_chunks.load(Ordering::SeqCst) as u32;
},
_ => (),
}

cur_status.clone()
@@ -600,6 +726,7 @@
}

fn abort_restore(&self) {
trace!(target: "snapshot", "Aborting restore");
self.restoring_snapshot.store(false, Ordering::SeqCst);
*self.restoration.lock() = None;
*self.status.lock() = RestorationStatus::Inactive;
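The new `Initializing` variant above is also what the informant changes in this PR surface ("show chunks done" in the commit list). The informant file is not part of this excerpt, so the following is only a guessed illustration of how such a status could be rendered, with a simplified stand-in enum rather than the real `RestorationStatus`.

/// Simplified stand-in for RestorationStatus, purely for illustration.
enum Status {
	Inactive,
	Initializing { chunks_done: u32 },
	Ongoing { state_chunks: u32, block_chunks: u32, state_chunks_done: u32, block_chunks_done: u32 },
	Failed,
}

fn informant_line(status: &Status) -> String {
	match *status {
		// While previously downloaded chunks are being re-imported,
		// only a single counter is meaningful.
		Status::Initializing { chunks_done } =>
			format!("Snapshot initializing ({} chunks restored)", chunks_done),
		Status::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
			format!("Syncing snapshot {}/{}",
				state_chunks_done + block_chunks_done,
				state_chunks + block_chunks),
		Status::Inactive => "Snapshot restoration inactive".into(),
		Status::Failed => "Snapshot restoration failed".into(),
	}
}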
8 changes: 6 additions & 2 deletions ethcore/src/snapshot/tests/service.rs
@@ -130,12 +130,16 @@ fn guards_delete_folders() {
service.init_restore(manifest.clone(), true).unwrap();
assert!(path.exists());

// The `db` folder should have been deleted,
// while the `temp` one is kept
service.abort_restore();
assert!(!path.exists());
assert!(!path.join("db").exists());
assert!(path.join("temp").exists());

service.init_restore(manifest.clone(), true).unwrap();
assert!(path.exists());

drop(service);
assert!(!path.exists());
assert!(!path.join("db").exists());
assert!(path.join("temp").exists());
}
3 changes: 3 additions & 0 deletions ethcore/src/snapshot/traits.rs
@@ -30,6 +30,9 @@ pub trait SnapshotService : Sync + Send {
/// `None` indicates warp sync isn't supported by the consensus engine.
fn supported_versions(&self) -> Option<(u64, u64)>;

/// Returns a list of the completed chunks
fn completed_chunks(&self) -> Option<Vec<H256>>;

/// Get raw chunk for a given hash.
fn chunk(&self, hash: H256) -> Option<Bytes>;

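`completed_chunks` is the hook that lets a node seed a snapshot it is still restoring: a supplier can answer requests for any chunk that has already been fed, not only for a fully written local snapshot. Below is a hypothetical caller-side filter, with `Vec<u8>` standing in for `H256` to keep the sketch dependency-free; the real supplier lives in ethcore-sync and is not shown in this excerpt.

use std::collections::HashSet;

/// Decide which of a peer's requested chunk hashes we can actually serve,
/// given whatever the snapshot service reports as completed.
fn servable_chunks(requested: &[Vec<u8>], completed: Option<Vec<Vec<u8>>>) -> Vec<Vec<u8>> {
	// `None` means no restoration is in progress, so nothing extra can be served.
	let completed: HashSet<Vec<u8>> = completed.unwrap_or_default().into_iter().collect();
	requested.iter()
		.filter(|hash| completed.contains(*hash))
		.cloned()
		.collect()
}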
52 changes: 39 additions & 13 deletions ethcore/sync/src/chain/handler.rs
@@ -100,14 +100,27 @@ impl SyncHandler {
}

/// Called by peer when it is disconnecting
pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
sync.handshaking_peers.remove(&peer);
if sync.peers.contains_key(&peer) {
debug!(target: "sync", "Disconnected {}", peer);
sync.clear_peer_download(peer);
sync.peers.remove(&peer);
sync.active_peers.remove(&peer);
pub fn on_peer_aborting(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "== Disconnecting {}: {}", peer_id, io.peer_info(peer_id));
sync.handshaking_peers.remove(&peer_id);
if sync.peers.contains_key(&peer_id) {
debug!(target: "sync", "Disconnected {}", peer_id);
sync.clear_peer_download(peer_id);
sync.peers.remove(&peer_id);
sync.active_peers.remove(&peer_id);

if sync.state == SyncState::SnapshotManifest {
// Check if we are asking other peers for
// the snapshot manifest as well.
// If not, return to initial state
let still_asking_manifest = sync.peers.iter()
.filter(|&(id, p)| sync.active_peers.contains(id) && p.asking == PeerAsking::SnapshotManifest)
.next().is_none();

if still_asking_manifest {
sync.state = ChainSync::get_init_state(sync.warp_sync, io.chain());
}
}
sync.continue_sync(io);
}
}
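The `still_asking_manifest` check above reads as "no remaining active peer is still being asked for the snapshot manifest". An equivalent formulation with `Iterator::any`, using simplified stand-ins for the sync structures, only to make the intent explicit:

use std::collections::{HashMap, HashSet};

#[derive(PartialEq)]
enum Asking { Nothing, SnapshotManifest }

struct Peer { asking: Asking }

/// True when no remaining active peer is still being asked for the snapshot
/// manifest, i.e. the manifest request needs to be rescheduled from scratch.
fn no_manifest_request_in_flight(peers: &HashMap<usize, Peer>, active: &HashSet<usize>) -> bool {
	!peers.iter().any(|(id, p)| active.contains(id) && p.asking == Asking::SnapshotManifest)
}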
@@ -320,6 +333,10 @@ impl SyncHandler {
}

fn on_peer_confirmed(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
{
let peer = sync.peers.get_mut(&peer_id).expect("Is only called when peer is present in peers");
peer.confirmation = ForkConfirmation::Confirmed;
}
sync.sync_peer(io, peer_id, false);
}

@@ -344,8 +361,8 @@ impl SyncHandler {
}

trace!(target: "sync", "{}: Confirmed peer", peer_id);
peer.confirmation = ForkConfirmation::Confirmed;
if !io.chain_overlay().read().contains_key(&fork_number) {
trace!(target: "sync", "Inserting (fork) block {} header", fork_number);
io.chain_overlay().write().insert(fork_number, header.to_vec());
}
}
@@ -560,6 +577,10 @@ impl SyncHandler {
sync.continue_sync(io);
return Ok(());
},
RestorationStatus::Initializing { .. } => {
trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
return Ok(());
}
RestorationStatus::Ongoing { .. } => {
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
},
@@ -659,11 +680,16 @@ impl SyncHandler {
// Let the current sync round complete first.
sync.active_peers.insert(peer_id.clone());
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
if let Some((fork_block, _)) = sync.fork_block {
SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
} else {
SyncHandler::on_peer_confirmed(sync, io, peer_id);

match sync.fork_block {
Some((fork_block, _)) => {
SyncRequester::request_fork_header(sync, io, peer_id, fork_block);
},
_ => {
SyncHandler::on_peer_confirmed(sync, io, peer_id);
}
}

Ok(())
}

