Skip to content

Commit

Permalink
Support for concurrent verification using stateless verifiers with ab…
Browse files Browse the repository at this point in the history
…ility to control concurrency
  • Loading branch information
nazar-pc committed Sep 30, 2024
1 parent a5e40d0 commit d63b08e
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 41 deletions.
23 changes: 20 additions & 3 deletions substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
use log::{debug, trace};
use std::{
fmt,
num::NonZeroUsize,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -98,6 +99,19 @@ pub struct IncomingBlock<B: BlockT> {
/// Verify a justification of a block
#[async_trait::async_trait]
pub trait Verifier<B: BlockT>: Send + Sync {
/// How many blocks can be verified concurrently.
///
/// Defaults to 1, which means blocks are verified sequentially, one at a time.
///
/// Value higher than one means verification on blocks can be done in arbitrary order,
/// doesn't expect parent block to be imported first, etc. This significantly improves sync
/// speed by leveraging multiple CPU cores. Good value here is to make concurrency equal to
/// number of CPU cores available. Note that blocks will be verified concurrently, not in
/// parallel, so spawn blocking tasks internally as needed.
fn verification_concurrency(&self) -> NonZeroUsize {
NonZeroUsize::new(1).expect("Not zero; qed")
}

/// Verify the given block data and return the `BlockImportParams` to
/// continue the block import process.
async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String>;
Expand Down Expand Up @@ -227,7 +241,9 @@ pub async fn import_single_block<B: BlockT, V: Verifier<B>>(
block: IncomingBlock<B>,
verifier: &V,
) -> BlockImportResult<B> {
match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None)
.await?
{
SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
SingleBlockVerificationOutcome::Verified(import_parameters) =>
import_single_block_metered(import_handle, import_parameters, None).await,
Expand Down Expand Up @@ -296,6 +312,7 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: &V,
allow_missing_parent: bool,
metrics: Option<&Metrics>,
) -> Result<SingleBlockVerificationOutcome<B>, BlockImportError> {
let peer = block.origin;
Expand Down Expand Up @@ -328,7 +345,7 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
parent_hash,
allow_missing_state: block.allow_missing_state,
import_existing: block.import_existing,
allow_missing_parent: block.state.is_some(),
allow_missing_parent: allow_missing_parent || block.state.is_some(),
})
.await,
)? {
Expand Down Expand Up @@ -390,7 +407,7 @@ pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
}

pub(crate) async fn import_single_block_metered<Block: BlockT>(
import_handle: &mut impl BlockImport<Block, Error = ConsensusError>,
import_handle: &impl BlockImport<Block, Error = ConsensusError>,
import_parameters: SingleBlockImportParameters<Block>,
metrics: Option<&Metrics>,
) -> BlockImportResult<Block> {
Expand Down
112 changes: 74 additions & 38 deletions substrate/client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::{
prelude::*,
stream::FuturesOrdered,
task::{Context, Poll},
};
use log::{debug, trace};
Expand All @@ -27,7 +28,11 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
Justification, Justifications,
};
use std::pin::Pin;
use std::{
num::NonZeroUsize,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
};

use crate::{
import_queue::{
Expand Down Expand Up @@ -221,7 +226,7 @@ mod worker_messages {
///
/// Returns when `block_import` ended.
async fn block_import_process<B: BlockT>(
mut block_import: BoxBlockImport<B>,
block_import: BoxBlockImport<B>,
verifier: impl Verifier<B>,
mut result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
Expand All @@ -240,8 +245,15 @@ async fn block_import_process<B: BlockT>(
},
};

let res =
import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await;
let res = import_many_blocks_with_verification_concurrency(
&block_import,
origin,
blocks,
&verifier,
metrics.as_ref(),
verifier.verification_concurrency(),
)
.await;

result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
Expand Down Expand Up @@ -383,12 +395,16 @@ struct ImportManyBlocksResult<B: BlockT> {
///
/// This will yield after each imported block once, to ensure that other futures can
/// be called as well.
async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
import_handle: &mut BoxBlockImport<B>,
///
/// When verification concurrency is set to value higher than 1, block verification will happen in
/// parallel to block import, reducing overall time required.
async fn import_many_blocks_with_verification_concurrency<B: BlockT, V: Verifier<B>>(
import_handle: &BoxBlockImport<B>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: &V,
metrics: Option<Metrics>,
metrics: Option<&Metrics>,
verification_concurrency: NonZeroUsize,
) -> ImportManyBlocksResult<B> {
let count = blocks.len();

Expand All @@ -403,46 +419,58 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(

trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range);

let mut imported = 0;
let mut results = vec![];
let mut has_error = false;
let mut blocks = blocks.into_iter();
let has_error = &AtomicBool::new(false);

let verify_block_task = |index, block: IncomingBlock<B>| {
async move {
let block_number = block.header.as_ref().map(|h| *h.number());
let block_hash = block.hash;

let result = if has_error.load(Ordering::Acquire) {
Err(BlockImportError::Cancelled)
} else {
verify_single_block_metered(
import_handle,
blocks_origin,
block,
verifier,
// Check parent for the first block, but skip for others since blocks are
// verified concurrently before being imported.
index != 0,
metrics,
)
.await
};

(block_number, block_hash, result)
}
};

// Blocks in the response/drain should be in ascending order.
loop {
// Is there any block left to import?
let block = match blocks.next() {
Some(b) => b,
None => {
// No block left to import, success!
return ImportManyBlocksResult { block_count: count, imported, results }
},
};
let mut blocks_to_verify = blocks.into_iter().enumerate();
let mut verified_blocks = blocks_to_verify
.by_ref()
.take(verification_concurrency.get())
.map(|(index, block)| verify_block_task(index, block))
.collect::<FuturesOrdered<_>>();

let mut imported = 0;
let mut results = vec![];

let block_number = block.header.as_ref().map(|h| *h.number());
let block_hash = block.hash;
let import_result = if has_error {
while let Some((block_number, block_hash, verification_result)) = verified_blocks.next().await {
let import_result = if has_error.load(Ordering::Acquire) {
Err(BlockImportError::Cancelled)
} else {
let verification_fut = verify_single_block_metered(
import_handle,
blocks_origin,
block,
verifier,
metrics.as_ref(),
);
match verification_fut.await {
// The actual import.
match verification_result {
Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => {
// The actual import.
import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
.await
},
Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) =>
import_single_block_metered(import_handle, import_parameters, metrics).await,
Err(e) => Err(e),
}
};

if let Some(metrics) = metrics.as_ref() {
if let Some(metrics) = metrics {
metrics.report_import::<B>(&import_result);
}

Expand All @@ -455,13 +483,21 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
);
imported += 1;
} else {
has_error = true;
has_error.store(true, Ordering::Release);
}

results.push((import_result, block_hash));

// Add more blocks into verification queue if there are any
if let Some((index, block)) = blocks_to_verify.next() {
verified_blocks.push_back(verify_block_task(index, block));
}

Yield::new().await
}

// No block left to import, success!
ImportManyBlocksResult { block_count: count, imported, results }
}

/// A future that will always `yield` on the first call of `poll` but schedules the
Expand Down

0 comments on commit d63b08e

Please sign in to comment.