Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Remove blocking based on PR comments and create new WarpSync on poll
Browse files Browse the repository at this point in the history
  • Loading branch information
samelamin committed Nov 28, 2022
1 parent 53e53db commit 0c67920
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
10 changes: 10 additions & 0 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,16 @@ pub trait ChainSync<Block: BlockT>: Send {
cx: &mut std::task::Context<'a>,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;

/// Poll warp sync target block
///
/// This should be polled until it returns [`target_block`].
///
/// If [`target_block`] is returned, then `WarpSync::new` is called with a target header
fn poll_warp_sync_target_block<'a>(
&mut self,
cx: &mut std::task::Context<'a>,
) -> Poll<Block::Header>;

/// Call when a peer has disconnected.
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
Expand Down
27 changes: 17 additions & 10 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,16 +610,7 @@ where
warp_with_provider.clone(),
));
},
Some(WarpSyncParams::WaitForTarget(target_block)) => {
log::debug!(target: "sync", "Waiting for target block.");
futures::executor::block_on(async {
self.warp_sync = Some(WarpSync::new_with_target_block(
self.client.clone(),
target_block.await.unwrap(),
));
});
},
None => {},
_ => {},
}
}
}
Expand Down Expand Up @@ -1345,6 +1336,16 @@ where
}
}

fn poll_warp_sync_target_block(&mut self, cx: &mut std::task::Context) -> Poll<B::Header> {
if let Some(WarpSyncParams::WaitForTarget(target_block)) = self.warp_sync_params.as_mut() {
return match target_block.poll_unpin(cx) {
Poll::Ready(Ok(target_block)) => Poll::Ready(target_block),
_ => Poll::Pending,
}
}
Poll::Pending
}

fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<B>> {
self.blocks.clear_peer_download(who);
if let Some(gap_sync) = &mut self.gap_sync {
Expand Down Expand Up @@ -1444,6 +1445,12 @@ where
}
}
self.process_outbound_requests();
match self.poll_warp_sync_target_block(cx) {
Poll::Ready(target_block) =>
self.warp_sync =
Some(WarpSync::new_with_target_block(self.client.clone(), target_block)),
Poll::Pending => (),
};

if let Poll::Ready(result) = self.poll_pending_responses(cx) {
return Poll::Ready(PollResult::Import(result))
Expand Down
4 changes: 4 additions & 0 deletions client/network/sync/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ mockall::mock! {
&mut self,
cx: &mut std::task::Context<'a>,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
fn poll_warp_sync_target_block<'a>(
&mut self,
cx: &mut std::task::Context<'a>,
) -> Poll<Block::Header>;
fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<Block>>;
fn metrics(&self) -> Metrics;
fn block_response_into_blocks(
Expand Down

0 comments on commit 0c67920

Please sign in to comment.