diff --git a/Cargo.lock b/Cargo.lock
index 02846ea23cce..2d86b1a3e08e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -15560,6 +15560,7 @@ dependencies = [
  "sp-test-primitives",
  "substrate-prometheus-endpoint",
  "thiserror",
+ "tokio",
 ]
 
 [[package]]
diff --git a/substrate/client/consensus/common/Cargo.toml b/substrate/client/consensus/common/Cargo.toml
index b02bae6659ef..76e183958e9e 100644
--- a/substrate/client/consensus/common/Cargo.toml
+++ b/substrate/client/consensus/common/Cargo.toml
@@ -35,6 +35,7 @@ sp-consensus = { path = "../../../primitives/consensus/common" }
 sp-core = { path = "../../../primitives/core" }
 sp-runtime = { path = "../../../primitives/runtime" }
 sp-state-machine = { path = "../../../primitives/state-machine" }
+tokio = "1.32.0"
 
 [dev-dependencies]
 sp-test-primitives = { path = "../../../primitives/test-primitives" }
diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs
index 2e8455990a32..079bb1cd0c0c 100644
--- a/substrate/client/consensus/common/src/import_queue.rs
+++ b/substrate/client/consensus/common/src/import_queue.rs
@@ -294,7 +294,9 @@ where
     Block: BlockT,
     BI: BlockImport<Block>,
 {
-    match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
+    match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None)
+        .await?
+    {
         SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
         SingleBlockVerificationOutcome::Verified(import_parameters) =>
             import_single_block_metered(import_handle, import_parameters, None).await,
@@ -363,6 +365,7 @@ pub(crate) async fn verify_single_block_metered<Block, BI>(
     block_origin: BlockOrigin,
     block: IncomingBlock<Block>,
     verifier: &dyn Verifier<Block>,
+    allow_missing_parent: bool,
     metrics: Option<&Metrics>,
 ) -> Result<SingleBlockVerificationOutcome<Block>, BlockImportError>
 where
@@ -401,7 +404,7 @@ where
                 parent_hash,
                 allow_missing_state: block.allow_missing_state,
                 import_existing: block.import_existing,
-                allow_missing_parent: block.state.is_some(),
+                allow_missing_parent: allow_missing_parent || block.state.is_some(),
             })
             .await,
     )? {
diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs
index 87ab26db6c91..a60ce7bc6bfc 100644
--- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs
+++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs
@@ -17,6 +17,7 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 use futures::{
     prelude::*,
+    stream::FuturesOrdered,
     task::{Context, Poll},
 };
 use log::{debug, trace};
@@ -27,7 +28,14 @@ use sp_runtime::{
     traits::{Block as BlockT, Header as HeaderT, NumberFor},
     Justification, Justifications,
 };
-use std::pin::Pin;
+use std::{
+    pin::Pin,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+};
+use tokio::{runtime::Handle, task};
 
 use crate::{
     import_queue::{
@@ -223,11 +231,12 @@ mod worker_messages {
 /// Returns when `block_import` ended.
 async fn block_import_process<B: BlockT>(
     mut block_import: SharedBlockImport<B>,
-    mut verifier: impl Verifier<B>,
+    verifier: impl Verifier<B> + 'static,
     mut result_sender: BufferedLinkSender<B>,
     mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
     metrics: Option<Metrics>,
 ) {
+    let verifier: Arc<dyn Verifier<B>> = Arc::new(verifier);
     loop {
         let worker_messages::ImportBlocks(origin, blocks) =
             match block_import_receiver.next().await {
@@ -241,9 +250,18 @@ async fn block_import_process<B: BlockT>(
             },
         };
 
-        let res =
-            import_many_blocks(&mut block_import, origin, blocks, &mut verifier, metrics.clone())
-                .await;
+        let res = if verifier.supports_stateless_verification() {
+            import_many_blocks_with_stateless_verification(
+                &mut block_import,
+                origin,
+                blocks,
+                &verifier,
+                metrics.clone(),
+            )
+            .await
+        } else {
+            import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await
+        };
 
         result_sender.blocks_processed(res.imported, res.block_count, res.results);
     }
@@ -385,11 +403,14 @@ struct ImportManyBlocksResult<B: BlockT> {
 ///
 /// This will yield after each imported block once, to ensure that other futures can
 /// be called as well.
+///
+/// For verifiers that support stateless verification, use
+/// [`import_many_blocks_with_stateless_verification`] for better performance.
 async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
     import_handle: &mut SharedBlockImport<B>,
     blocks_origin: BlockOrigin,
     blocks: Vec<IncomingBlock<B>>,
-    verifier: &mut V,
+    verifier: &V,
     metrics: Option<Metrics>,
 ) -> ImportManyBlocksResult<B> {
     let count = blocks.len();
@@ -431,6 +452,7 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
             blocks_origin,
             block,
             verifier,
+            false,
             metrics.as_ref(),
         );
         match verification_fut.await {
@@ -466,6 +488,114 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
     }
 }
 
+/// The same as [`import_many_blocks()`], but for verifiers that support stateless verification of
+/// blocks (use [`Verifier::supports_stateless_verification()`]).
+async fn import_many_blocks_with_stateless_verification<B: BlockT>(
+    import_handle: &mut SharedBlockImport<B>,
+    blocks_origin: BlockOrigin,
+    blocks: Vec<IncomingBlock<B>>,
+    verifier: &Arc<dyn Verifier<B>>,
+    metrics: Option<Metrics>,
+) -> ImportManyBlocksResult<B> {
+    let count = blocks.len();
+
+    let blocks_range = match (
+        blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
+        blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
+    ) {
+        (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
+        (Some(first), Some(_)) => format!(" ({})", first),
+        _ => Default::default(),
+    };
+
+    trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range);
+
+    let has_error = Arc::new(AtomicBool::new(false));
+
+    // Blocks in the response/drain should be in ascending order.
+    let mut verified_blocks = blocks
+        .into_iter()
+        .enumerate()
+        .map(|(index, block)| {
+            let import_handle = import_handle.clone();
+            let verifier = Arc::clone(verifier);
+            let metrics = metrics.clone();
+            let has_error = Arc::clone(&has_error);
+
+            async move {
+                let block_number = block.header.as_ref().map(|h| *h.number());
+                let block_hash = block.hash;
+
+                let result = if has_error.load(Ordering::Acquire) {
+                    Err(BlockImportError::Cancelled)
+                } else {
+                    task::spawn_blocking(move || {
+                        Handle::current().block_on(verify_single_block_metered(
+                            &import_handle,
+                            blocks_origin,
+                            block,
+                            &verifier,
+                            // Check parent for the first block, but skip for others since blocks
+                            // are verified concurrently before being imported.
+                            index != 0,
+                            metrics.as_ref(),
+                        ))
+                    })
+                    .await
+                    .unwrap_or_else(|error| {
+                        Err(BlockImportError::Other(sp_consensus::Error::Other(
+                            format!("Failed to join on block verification: {error}").into(),
+                        )))
+                    })
+                };
+
+                (block_number, block_hash, result)
+            }
+        })
+        .collect::<FuturesOrdered<_>>();
+
+    let mut imported = 0;
+    let mut results = vec![];
+
+    while let Some((block_number, block_hash, verification_result)) = verified_blocks.next().await {
+        let import_result = if has_error.load(Ordering::Acquire) {
+            Err(BlockImportError::Cancelled)
+        } else {
+            // The actual import.
+            match verification_result {
+                Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
+                Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) =>
+                    import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
+                        .await,
+                Err(e) => Err(e),
+            }
+        };
+
+        if let Some(metrics) = metrics.as_ref() {
+            metrics.report_import::<B>(&import_result);
+        }
+
+        if import_result.is_ok() {
+            trace!(
+                target: LOG_TARGET,
+                "Block imported successfully {:?} ({})",
+                block_number,
+                block_hash,
+            );
+            imported += 1;
+        } else {
+            has_error.store(true, Ordering::Release);
+        }
+
+        results.push((import_result, block_hash));
+
+        Yield::new().await
+    }
+
+    // No block left to import, success!
+    ImportManyBlocksResult { block_count: count, imported, results }
+}
+
 /// A future that will always `yield` on the first call of `poll` but schedules the
 /// current task for re-execution.
 ///
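
The concurrency pattern introduced by `import_many_blocks_with_stateless_verification` is easier to see in isolation: each verification is offloaded to Tokio's blocking pool, the outcomes are drained through a `FuturesOrdered` so the still-sequential import step receives them in submission order, and a shared `AtomicBool` makes work that has not started yet bail out after the first failure. Below is a minimal standalone sketch of just that pattern, not sc-consensus code; `verify_stub` and the numeric "blocks" are invented for illustration, and it assumes the `futures` crate plus `tokio` with the `macros` and `rt-multi-thread` features.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use futures::{stream::FuturesOrdered, StreamExt};
use tokio::task;

// Stand-in for stateless verification: pure CPU work that needs no chain state.
fn verify_stub(n: u64) -> Result<u64, String> {
    if n == 3 {
        Err(format!("block {n} failed verification"))
    } else {
        Ok(n)
    }
}

#[tokio::main]
async fn main() {
    let has_error = Arc::new(AtomicBool::new(false));

    // Verify "blocks" 0..6 concurrently on the blocking pool, but collect the
    // outcomes in submission order so the import step below stays sequential.
    let mut verified = (0u64..6)
        .map(|n| {
            let has_error = Arc::clone(&has_error);
            async move {
                if has_error.load(Ordering::Acquire) {
                    return (n, Err("cancelled".to_string()));
                }
                let result = task::spawn_blocking(move || verify_stub(n))
                    .await
                    .unwrap_or_else(|e| Err(format!("join error: {e}")));
                (n, result)
            }
        })
        .collect::<FuturesOrdered<_>>();

    while let Some((n, result)) = verified.next().await {
        match result {
            Ok(_) => println!("imported block {n}"),
            Err(e) => {
                // The first failure flips the flag; futures polled afterwards bail out early.
                println!("block {n} failed: {e}");
                has_error.store(true, Ordering::Release);
            },
        }
    }
}

Because `FuturesOrdered` yields results in the order the futures were queued, the import loop can keep treating blocks as an ascending sequence even though verification itself runs in parallel.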