Skip to content

Commit

Permalink
Refactor BoxBlockImport type alias into sharable `SharedBlockImport…
Browse files Browse the repository at this point in the history
…` struct
  • Loading branch information
nazar-pc committed Jan 15, 2024
1 parent 40b0ca8 commit 6fa248c
Show file tree
Hide file tree
Showing 20 changed files with 158 additions and 71 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use schnellru::{ByLength, LruMap};

use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImport, BlockImportParams, ForkChoiceStrategy,
BlockImport, BlockImportParams, ForkChoiceStrategy, SharedBlockImport,
};
use sc_consensus_aura::standalone as aura_internal;
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
Expand Down Expand Up @@ -246,5 +246,5 @@ where
_phantom: std::marker::PhantomData,
};

BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)
BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry)
}
3 changes: 2 additions & 1 deletion cumulus/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sp_runtime::traits::Block as BlockT;
use sc_consensus::{
block_import::{BlockImport, BlockImportParams},
import_queue::{BasicQueue, Verifier},
SharedBlockImport,
};

use crate::ParachainBlockImportMarker;
Expand Down Expand Up @@ -72,5 +73,5 @@ where
+ Sync
+ 'static,
{
BasicQueue::new(VerifyNothing, Box::new(block_import), None, spawner, registry)
BasicQueue::new(VerifyNothing, SharedBlockImport::new(block_import), None, spawner, registry)
}
4 changes: 2 additions & 2 deletions cumulus/client/consensus/relay-chain/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use cumulus_client_consensus_common::ParachainBlockImportMarker;

use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImport, BlockImportParams,
BlockImport, BlockImportParams, SharedBlockImport,
};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
Expand Down Expand Up @@ -128,5 +128,5 @@ where
{
let verifier = Verifier::new(client, create_inherent_data_providers);

Ok(BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry))
Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry))
}
4 changes: 2 additions & 2 deletions cumulus/polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier;
use futures::{lock::Mutex, prelude::*};
use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImportParams, ImportQueue,
BlockImportParams, ImportQueue, SharedBlockImport,
};
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::{config::FullNetworkConfiguration, NetworkBlock};
Expand Down Expand Up @@ -1288,7 +1288,7 @@ where
let registry = config.prometheus_registry();
let spawner = task_manager.spawn_essential_handle();

Ok(BasicQueue::new(verifier, Box::new(block_import), None, &spawner, registry))
Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, &spawner, registry))
}

/// Start an aura powered parachain node. Asset Hub and Collectives use this.
Expand Down
3 changes: 2 additions & 1 deletion substrate/bin/minimal/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use futures::FutureExt;
use runtime::{self, interface::OpaqueBlock as Block, RuntimeApi};
use sc_client_api::backend::Backend;
use sc_consensus::SharedBlockImport;
use sc_executor::WasmExecutor;
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
Expand Down Expand Up @@ -86,7 +87,7 @@ pub fn new_partial(config: &Configuration) -> Result<Service, ServiceError> {
);

let import_queue = sc_consensus_manual_seal::import_queue(
Box::new(client.clone()),
SharedBlockImport::new(client.clone()),
&task_manager.spawn_essential_handle(),
config.prometheus_registry(),
);
Expand Down
9 changes: 8 additions & 1 deletion substrate/client/consensus/aura/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider};
use sc_consensus::{
block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
SharedBlockImport,
};
use sc_consensus_slots::{check_equivocation, CheckedHeader, InherentDataProviderExt};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
Expand Down Expand Up @@ -376,7 +377,13 @@ where
compatibility_mode,
});

Ok(BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry))
Ok(BasicQueue::new(
verifier,
SharedBlockImport::new(block_import),
justification_import,
spawner,
registry,
))
}

/// Parameters of [`build_verifier`].
Expand Down
9 changes: 8 additions & 1 deletion substrate/client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ use sc_consensus::{
StateAction,
},
import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
SharedBlockImport,
};
use sc_consensus_epochs::{
descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpochDescriptor,
Expand Down Expand Up @@ -1853,7 +1854,13 @@ where
spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());

Ok((
BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
BasicQueue::new(
verifier,
SharedBlockImport::new(block_import),
justification_import,
spawner,
registry,
),
BabeWorkerHandle(worker_tx),
))
}
Expand Down
29 changes: 14 additions & 15 deletions substrate/client/consensus/babe/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::*;
use authorship::claim_slot;
use sc_block_builder::{BlockBuilder, BlockBuilderBuilder};
use sc_client_api::{BlockchainEvents, Finalizer};
use sc_consensus::{BoxBlockImport, BoxJustificationImport};
use sc_consensus::{BoxJustificationImport, SharedBlockImport};
use sc_consensus_epochs::{EpochIdentifier, EpochIdentifierPosition};
use sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging;
use sc_network_test::{Block as TestBlock, *};
Expand Down Expand Up @@ -209,7 +209,7 @@ impl Verifier<TestBlock> for TestVerifier {

pub struct PeerData {
link: BabeLink<TestBlock>,
block_import: Mutex<Option<BoxBlockImport<TestBlock>>>,
block_import: SharedBlockImport<TestBlock>,
}

impl TestNetFactory for BabeTestNet {
Expand All @@ -233,8 +233,7 @@ impl TestNetFactory for BabeTestNet {

let block_import = PanickingBlockImport(block_import);

let data_block_import =
Mutex::new(Some(Box::new(block_import.clone()) as BoxBlockImport<_>));
let data_block_import = SharedBlockImport::new(block_import.clone());
(
BlockImportAdapter::new(block_import),
None,
Expand Down Expand Up @@ -375,7 +374,7 @@ async fn run_one_test(mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + '
let client_clone = client.clone();
babe_futures.push(
start_babe(BabeParams {
block_import: data.block_import.lock().take().expect("import set up during init"),
block_import: data.block_import.clone(),
select_chain,
client,
env: environ,
Expand Down Expand Up @@ -621,7 +620,7 @@ async fn propose_and_import_block(
parent: &TestHeader,
slot: Option<Slot>,
proposer_factory: &mut DummyFactory,
block_import: &mut BoxBlockImport<TestBlock>,
block_import: &mut SharedBlockImport<TestBlock>,
) -> Hash {
let mut proposer = proposer_factory.init(parent).await.unwrap();

Expand Down Expand Up @@ -691,7 +690,7 @@ async fn propose_and_import_block(
async fn propose_and_import_blocks(
client: &PeersFullClient,
proposer_factory: &mut DummyFactory,
block_import: &mut BoxBlockImport<TestBlock>,
block_import: &mut SharedBlockImport<TestBlock>,
parent_hash: Hash,
n: usize,
) -> Vec<Hash> {
Expand Down Expand Up @@ -722,7 +721,7 @@ async fn importing_block_one_sets_genesis_epoch() {
mutator: Arc::new(|_, _| ()),
};

let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut block_import = data.block_import.clone();

let genesis_header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();

Expand Down Expand Up @@ -756,7 +755,7 @@ async fn revert_prunes_epoch_changes_and_removes_weights() {

let client = peer.client().as_client();
let backend = peer.client().as_backend();
let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut block_import = data.block_import.clone();
let epoch_changes = data.link.epoch_changes.clone();

let mut proposer_factory = DummyFactory {
Expand Down Expand Up @@ -844,7 +843,7 @@ async fn revert_not_allowed_for_finalized() {

let client = peer.client().as_client();
let backend = peer.client().as_backend();
let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut block_import = data.block_import.clone();

let mut proposer_factory = DummyFactory {
client: client.clone(),
Expand Down Expand Up @@ -883,7 +882,7 @@ async fn importing_epoch_change_block_prunes_tree() {
let data = peer.data.as_ref().expect("babe link set up during initialization");

let client = peer.client().as_client();
let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut block_import = data.block_import.clone();
let epoch_changes = data.link.epoch_changes.clone();

let mut proposer_factory = DummyFactory {
Expand Down Expand Up @@ -982,7 +981,7 @@ async fn verify_slots_are_strictly_increasing() {
let data = peer.data.as_ref().expect("babe link set up during initialization");

let client = peer.client().as_client();
let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut block_import = data.block_import.clone();

let mut proposer_factory = DummyFactory {
client: client.clone(),
Expand Down Expand Up @@ -1029,7 +1028,7 @@ async fn obsolete_blocks_aux_data_cleanup() {
mutator: Arc::new(|_, _| ()),
};

let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut block_import = data.block_import.clone();

let aux_data_check = |hashes: &[Hash], expected: bool| {
hashes.iter().all(|hash| {
Expand Down Expand Up @@ -1106,7 +1105,7 @@ async fn allows_skipping_epochs() {
let data = peer.data.as_ref().expect("babe link set up during initialization");

let client = peer.client().as_client();
let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut block_import = data.block_import.clone();

let mut proposer_factory = DummyFactory {
client: client.clone(),
Expand Down Expand Up @@ -1235,7 +1234,7 @@ async fn allows_skipping_epochs_on_some_forks() {
let data = peer.data.as_ref().expect("babe link set up during initialization");

let client = peer.client().as_client();
let mut block_import = data.block_import.lock().take().expect("import set up during init");
let mut block_import = data.block_import.clone();

let mut proposer_factory = DummyFactory {
client: client.clone(),
Expand Down
1 change: 1 addition & 0 deletions substrate/client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
async-lock = "2.8.0"
async-trait = "0.1.74"
futures = { version = "0.3.21", features = ["thread-pool"] }
futures-timer = "3.0.1"
Expand Down
6 changes: 3 additions & 3 deletions substrate/client/consensus/common/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,20 +317,20 @@ pub trait BlockImport<B: BlockT> {
}

#[async_trait::async_trait]
impl<B: BlockT> BlockImport<B> for crate::import_queue::BoxBlockImport<B> {
impl<B: BlockT> BlockImport<B> for crate::import_queue::SharedBlockImport<B> {
type Error = sp_consensus::error::Error;

/// Check block preconditions.
async fn check_block(&self, block: BlockCheckParams<B>) -> Result<ImportResult, Self::Error> {
(**self).check_block(block).await
self.read().await.check_block(block).await
}

/// Import a block.
async fn import_block(
&mut self,
block: BlockImportParams<B>,
) -> Result<ImportResult, Self::Error> {
(**self).import_block(block).await
self.write().await.import_block(block).await
}
}

Expand Down
Loading

0 comments on commit 6fa248c

Please sign in to comment.