Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Move import queue out of sc-network #12764

Merged
merged 9 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ futures = { version = "0.3.21", features = ["thread-pool"] }
futures-timer = "3.0.1"
libp2p = { version = "0.49.0", default-features = false }
log = "0.4.17"
mockall = "0.11.2"
parking_lot = "0.12.1"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0.30"
Expand Down
23 changes: 19 additions & 4 deletions client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub type DefaultImportQueue<Block, Client> =

mod basic_queue;
pub mod buffered_link;
pub mod mock;

/// Shared block import struct used by the queue.
pub type BoxBlockImport<B, Transaction> =
Expand Down Expand Up @@ -105,10 +106,10 @@ pub trait Verifier<B: BlockT>: Send + Sync {
/// Blocks import queue API.
///
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
/// Afterwards, call `poll_actions` to determine how to respond to these elements.
pub trait ImportQueue<B: BlockT>: Send {
pub trait ImportQueueService<B: BlockT>: Send {
/// Import bunch of blocks.
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);

/// Import block justifications.
fn import_justifications(
&mut self,
Expand All @@ -117,12 +118,26 @@ pub trait ImportQueue<B: BlockT>: Send {
number: NumberFor<B>,
justifications: Justifications,
);
/// Polls for actions to perform on the network.
///
}

#[async_trait::async_trait]
pub trait ImportQueue<B: BlockT>: Send {
/// Get a copy of the handle to [`ImportQueueService`].
fn service(&self) -> Box<dyn ImportQueueService<B>>;

/// Get a reference to the handle to [`ImportQueueService`].
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;

/// This method should behave in a way similar to `Future::poll`. It can register the current
/// task and notify later when more actions are ready to be polled. To continue the comparison,
/// it is as if this method always returned `Poll::Pending`.
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link<B>);

/// Start asynchronous runner for import queue.
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influece the synchronization process.
async fn run(self, link: Box<dyn Link<B>>);
}

/// Hooks that the verification queue can use to influence the synchronization
Expand Down
69 changes: 60 additions & 9 deletions client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,17 @@ use crate::{
import_queue::{
buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport,
BoxJustificationImport, ImportQueue, IncomingBlock, Link, RuntimeOrigin, Verifier,
BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link,
RuntimeOrigin, Verifier,
},
metrics::Metrics,
};

/// Interface to a basic block import queue that is importing blocks sequentially in a separate
/// task, with plugable verification.
pub struct BasicQueue<B: BlockT, Transaction> {
/// Channel to send justification import messages to the background task.
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
/// Channel to send block import messages to the background task.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
/// Handle for sending justification and block import messages to the background task.
handle: BasicQueueHandle<B>,
/// Results coming from the worker task.
result_port: BufferedLinkReceiver<B>,
_phantom: PhantomData<Transaction>,
Expand All @@ -54,8 +53,7 @@ pub struct BasicQueue<B: BlockT, Transaction> {
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
fn drop(&mut self) {
// Flush the queue and close the receiver to terminate the future.
self.justification_sender.close_channel();
self.block_import_sender.close_channel();
self.handle.close();
self.result_port.close();
}
}
Expand Down Expand Up @@ -95,11 +93,37 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
future.boxed(),
);

Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData }
Self {
handle: BasicQueueHandle::new(justification_sender, block_import_sender),
result_port,
_phantom: PhantomData,
}
}
}

impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
#[derive(Clone)]
struct BasicQueueHandle<B: BlockT> {
/// Channel to send justification import messages to the background task.
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
/// Channel to send block import messages to the background task.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
}

impl<B: BlockT> BasicQueueHandle<B> {
pub fn new(
justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
) -> Self {
Self { justification_sender, block_import_sender }
}

pub fn close(&mut self) {
self.justification_sender.close_channel();
self.block_import_sender.close_channel();
}
}

impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if blocks.is_empty() {
return
Expand Down Expand Up @@ -138,12 +162,39 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
}
}
}
}

#[async_trait::async_trait]
impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction> {
/// Get handle to [`ImportQueueService`].
fn service(&self) -> Box<dyn ImportQueueService<B>> {
Box::new(self.handle.clone())
}

/// Get a reference to the handle to [`ImportQueueService`].
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
&mut self.handle
}

/// Poll actions from network.
fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
if self.result_port.poll_actions(cx, link).is_err() {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
}
}

/// Start asynchronous runner for import queue.
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influece the synchronization process.
async fn run(mut self, mut link: Box<dyn Link<B>>) {
loop {
if let Err(_) = self.result_port.next_action(&mut *link).await {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
return
}
}
}
}

/// Messages destinated to the background worker.
Expand Down
32 changes: 23 additions & 9 deletions client/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<B: BlockT> Clone for BufferedLinkSender<B> {
}

/// Internal buffered message.
enum BlockImportWorkerMsg<B: BlockT> {
pub enum BlockImportWorkerMsg<B: BlockT> {
BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, bool),
RequestJustification(B::Hash, NumberFor<B>),
Expand Down Expand Up @@ -122,6 +122,18 @@ pub struct BufferedLinkReceiver<B: BlockT> {
}

impl<B: BlockT> BufferedLinkReceiver<B> {
/// Send action for the synchronization to perform.
pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &mut dyn Link<B>) {
match msg {
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
link.blocks_processed(imported, count, results),
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
link.justification_imported(who, &hash, number, success),
BlockImportWorkerMsg::RequestJustification(hash, number) =>
link.request_justification(&hash, number),
}
}

/// Polls for the buffered link actions. Any enqueued action will be propagated to the link
/// passed as parameter.
///
Expand All @@ -138,15 +150,17 @@ impl<B: BlockT> BufferedLinkReceiver<B> {
Poll::Pending => break Ok(()),
};

match msg {
BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
link.blocks_processed(imported, count, results),
BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
link.justification_imported(who, &hash, number, success),
BlockImportWorkerMsg::RequestJustification(hash, number) =>
link.request_justification(&hash, number),
}
self.send_actions(msg, &mut *link);
}
}

/// Poll next element from import queue and send the corresponding action command over the link.
pub async fn next_action(&mut self, link: &mut dyn Link<B>) -> Result<(), ()> {
if let Some(msg) = self.rx.next().await {
self.send_actions(msg, link);
return Ok(())
}
Err(())
}

/// Close the channel.
Expand Down
46 changes: 46 additions & 0 deletions client/consensus/common/src/import_queue/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// This file is part of Substrate.

// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use super::*;

mockall::mock! {
pub ImportQueueHandle<B: BlockT> {}

impl<B: BlockT> ImportQueueService<B> for ImportQueueHandle<B> {
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
fn import_justifications(
&mut self,
who: RuntimeOrigin,
hash: B::Hash,
number: NumberFor<B>,
justifications: Justifications,
);
}
}

mockall::mock! {
pub ImportQueue<B: BlockT> {}

#[async_trait::async_trait]
impl<B: BlockT> ImportQueue<B> for ImportQueue<B> {
fn service(&self) -> Box<dyn ImportQueueService<B>>;
fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;
fn poll_actions<'a>(&mut self, cx: &mut futures::task::Context<'a>, link: &mut dyn Link<B>);
async fn run(self, link: Box<dyn Link<B>>);
}
}
28 changes: 12 additions & 16 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ pub mod warp;

use libp2p::PeerId;
use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse};
use sc_consensus::{
import_queue::RuntimeOrigin, BlockImportError, BlockImportStatus, IncomingBlock,
};
use sc_consensus::{import_queue::RuntimeOrigin, IncomingBlock};
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
Expand Down Expand Up @@ -317,6 +315,12 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;

/// Procss received block data.
fn process_block_response_data(
&mut self,
blocks_to_import: Result<OnBlockData<Block>, BadPeer>,
);

/// Handle a response from the remote to a justification request that we made.
///
/// `request` must be the original request that triggered `response`.
Expand All @@ -326,17 +330,6 @@ pub trait ChainSync<Block: BlockT>: Send {
response: BlockResponse<Block>,
) -> Result<OnBlockJustification<Block>, BadPeer>;

/// A batch of blocks have been processed, with or without errors.
///
/// Call this when a batch of blocks have been processed by the import
/// queue, with or without errors.
fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>, Block::Hash)>,
) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<Block>), BadPeer>>>;

/// Call this when a justification has been processed by the import queue,
/// with or without errors.
fn on_justification_import(
Expand Down Expand Up @@ -378,7 +371,7 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Call when a peer has disconnected.
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<Block>>;
fn peer_disconnected(&mut self, who: &PeerId);

/// Return some key metrics.
fn metrics(&self) -> Metrics;
Expand All @@ -395,7 +388,10 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Internally calls [`ChainSync::poll_block_announce_validation()`] and
/// this function should be polled until it returns [`Poll::Pending`] to
/// consume all pending events.
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<PollResult<Block>>;
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;

/// Send block request to peer
fn send_block_request(&mut self, who: PeerId, request: BlockRequest<Block>);
Expand Down
Loading