Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Fix unstable peers and slowness in sync #9967

Merged
merged 5 commits into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,10 @@ impl SyncProvider for EthSync {
}

const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;
const PRIORITY_TIMER: TimerToken = 3;
const MAINTAIN_SYNC_TIMER: TimerToken = 1;
const CONTINUE_SYNC_TIMER: TimerToken = 2;
const TX_TIMER: TimerToken = 3;
const PRIORITY_TIMER: TimerToken = 4;

pub(crate) const PRIORITY_TIMER_INTERVAL: Duration = Duration::from_millis(250);

Expand All @@ -441,7 +442,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(MAINTAIN_SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(CONTINUE_SYNC_TIMER, Duration::from_millis(2500)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");

io.register_timer(PRIORITY_TIMER, PRIORITY_TIMER_INTERVAL).expect("Error registering peers timer");
Expand Down Expand Up @@ -474,7 +476,8 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
MAINTAIN_SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
CONTINUE_SYNC_TIMER => self.sync.write().continue_sync(&mut io),
TX_TIMER => self.sync.write().propagate_new_transactions(&mut io),
PRIORITY_TIMER => self.sync.process_priority_queue(&mut io),
_ => warn!("Unknown timer {} triggered.", timer),
Expand Down
2 changes: 0 additions & 2 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ impl SyncHandler {
sync.sync_peer(io, peer, false);
},
}
// give tasks to other peers
sync.continue_sync(io);
}

/// Called when peer sends us new consensus packet
Expand Down
46 changes: 22 additions & 24 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,37 +866,35 @@ impl ChainSync {
}

/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
// Collect active peers that can sync
let confirmed_peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)|
if peer.can_sync() {
Some((*peer_id, peer.protocol_version))
} else {
None
}
).collect();

trace!(
target: "sync",
"Syncing with peers: {} active, {} confirmed, {} total",
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
);

pub fn continue_sync(&mut self, io: &mut SyncIo) {
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
} else {
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();
// Collect active peers that can sync
let mut peers: Vec<(PeerId, u8)> = self.peers.iter().filter_map(|(peer_id, peer)|
if peer.can_sync() && peer.asking == PeerAsking::Nothing && self.active_peers.contains(&peer_id) {
Some((*peer_id, peer.protocol_version))
} else {
None
}
).collect();

random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
if peers.len() > 0 {
trace!(
target: "sync",
"Syncing with peers: {} active, {} available, {} total",
self.active_peers.len(), peers.len(), self.peers.len()
);

for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);
random::new().shuffle(&mut peers); // TODO (#646): sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));

for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);
}
}
}

Expand Down
8 changes: 5 additions & 3 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,12 @@ impl<C: FlushingBlockChainClient> Peer for EthPeer<C> {
}

fn sync_step(&self) {
let mut io = TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None);
self.chain.flush();
self.sync.write().maintain_peers(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
self.sync.write().maintain_sync(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
self.sync.write().propagate_new_transactions(&mut TestIo::new(&*self.chain, &self.snapshot_service, &self.queue, None));
self.sync.write().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut io);
self.sync.write().continue_sync(&mut io);
self.sync.write().propagate_new_transactions(&mut io);
}

fn restart_sync(&self) {
Expand Down