Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

make SelectChain async #9128

Merged
2 commits merged into from
Jun 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 30 additions & 21 deletions client/consensus/babe/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,14 @@ impl<B: BlockT, C, SC> BabeRpcHandler<B, C, SC> {
}

impl<B, C, SC> BabeApi for BabeRpcHandler<B, C, SC>
where
B: BlockT,
C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error=BlockChainError> + 'static,
C::Api: BabeRuntimeApi<B>,
SC: SelectChain<B> + Clone + 'static,
where
B: BlockT,
C: ProvideRuntimeApi<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = BlockChainError>
+ 'static,
C::Api: BabeRuntimeApi<B>,
SC: SelectChain<B> + Clone + 'static,
{
fn epoch_authorship(&self) -> FutureResult<HashMap<AuthorityId, EpochAuthorship>> {
if let Err(err) = self.deny_unsafe.check_if_safe() {
Expand All @@ -118,28 +121,33 @@ impl<B, C, SC> BabeApi for BabeRpcHandler<B, C, SC>
self.select_chain.clone(),
);
let future = async move {
let header = select_chain.best_chain().map_err(Error::Consensus)?;
let epoch_start = client.runtime_api()
let header = select_chain.best_chain().map_err(Error::Consensus).await?;
let epoch_start = client
.runtime_api()
.current_epoch_start(&BlockId::Hash(header.hash()))
.map_err(|err| {
Error::StringError(format!("{:?}", err))
})?;
.map_err(|err| Error::StringError(format!("{:?}", err)))?;
let epoch = epoch_data(
&shared_epoch,
&client,
&babe_config,
*epoch_start,
&select_chain,
)?;
)
.await?;
let (epoch_start, epoch_end) = (epoch.start_slot(), epoch.end_slot());

let mut claims: HashMap<AuthorityId, EpochAuthorship> = HashMap::new();

let keys = {
epoch.authorities.iter()
epoch
.authorities
.iter()
.enumerate()
.filter_map(|(i, a)| {
if SyncCryptoStore::has_keys(&*keystore, &[(a.0.to_raw_vec(), AuthorityId::ID)]) {
if SyncCryptoStore::has_keys(
&*keystore,
&[(a.0.to_raw_vec(), AuthorityId::ID)],
) {
Some((a.0.clone(), i))
} else {
None
Expand Down Expand Up @@ -167,7 +175,8 @@ impl<B, C, SC> BabeApi for BabeRpcHandler<B, C, SC>
}

Ok(claims)
}.boxed();
}
.boxed();

Box::new(future.compat())
}
Expand Down Expand Up @@ -203,20 +212,20 @@ impl From<Error> for jsonrpc_core::Error {
}
}

/// fetches the epoch data for a given slot.
fn epoch_data<B, C, SC>(
/// Fetches the epoch data for a given slot.
async fn epoch_data<B, C, SC>(
epoch_changes: &SharedEpochChanges<B, Epoch>,
client: &Arc<C>,
babe_config: &Config,
slot: u64,
select_chain: &SC,
) -> Result<Epoch, Error>
where
B: BlockT,
C: HeaderBackend<B> + HeaderMetadata<B, Error=BlockChainError> + 'static,
SC: SelectChain<B>,
where
B: BlockT,
C: HeaderBackend<B> + HeaderMetadata<B, Error = BlockChainError> + 'static,
SC: SelectChain<B>,
{
let parent = select_chain.best_chain()?;
let parent = select_chain.best_chain().await?;
epoch_changes.shared_data().epoch_data_for_child_of(
descendent_query(&**client),
&parent.hash(),
Expand Down
24 changes: 19 additions & 5 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ where
Ok(())
}

fn check_and_report_equivocation(
async fn check_and_report_equivocation(
&self,
slot_now: Slot,
slot: Slot,
Expand Down Expand Up @@ -1039,6 +1039,7 @@ where
let best_id = self
.select_chain
.best_chain()
.await
.map(|h| BlockId::Hash(h.hash()))
.map_err(|e| Error::Client(e.into()))?;

Expand Down Expand Up @@ -1085,13 +1086,26 @@ where
}
}

type BlockVerificationResult<Block> = Result<
(
BlockImportParams<Block, ()>,
Option<Vec<(CacheKeyId, Vec<u8>)>>,
),
String,
>;

#[async_trait::async_trait]
impl<Block, Client, SelectChain, CAW, CIDP> Verifier<Block>
for BabeVerifier<Block, Client, SelectChain, CAW, CIDP>
where
Block: BlockT,
Client: HeaderMetadata<Block, Error = sp_blockchain::Error> + HeaderBackend<Block> + ProvideRuntimeApi<Block>
+ Send + Sync + AuxStore + ProvideCache<Block>,
Client: HeaderMetadata<Block, Error = sp_blockchain::Error>
+ HeaderBackend<Block>
+ ProvideRuntimeApi<Block>
+ Send
+ Sync
+ AuxStore
+ ProvideCache<Block>,
Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
SelectChain: sp_consensus::SelectChain<Block>,
CAW: CanAuthorWith<Block> + Send + Sync,
Expand All @@ -1104,7 +1118,7 @@ where
header: Block::Header,
justifications: Option<Justifications>,
mut body: Option<Vec<Block::Extrinsic>>,
) -> Result<(BlockImportParams<Block, ()>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
) -> BlockVerificationResult<Block> {
trace!(
target: "babe",
"Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
Expand Down Expand Up @@ -1173,7 +1187,7 @@ where
&header,
&verified_info.author,
&origin,
) {
).await {
warn!(target: "babe", "Error checking/reporting BABE equivocation: {:?}", err);
}

Expand Down
1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ readme = "README.md"
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
async-trait = "0.1"
sc-client-api = { version = "3.0.0", path = "../../api" }
sp-blockchain = { version = "3.0.0", path = "../../../primitives/blockchain" }
sp-runtime = { version = "3.0.0", path = "../../../primitives/runtime" }
Expand Down
32 changes: 16 additions & 16 deletions client/consensus/common/src/longest_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ impl<B, Block> Clone for LongestChain<B, Block> {
}

impl<B, Block> LongestChain<B, Block>
where
B: backend::Backend<Block>,
Block: BlockT,
where
B: backend::Backend<Block>,
Block: BlockT,
{
/// Instantiate a new LongestChain for Backend B
pub fn new(backend: Arc<B>) -> Self {
LongestChain {
backend,
_phantom: Default::default()
_phantom: Default::default(),
}
}

Expand All @@ -75,30 +75,30 @@ impl<B, Block> LongestChain<B, Block>
}
}

#[async_trait::async_trait]
impl<B, Block> SelectChain<Block> for LongestChain<B, Block>
where
B: backend::Backend<Block>,
Block: BlockT,
where
B: backend::Backend<Block>,
Block: BlockT,
{

fn leaves(&self) -> Result<Vec<<Block as BlockT>::Hash>, ConsensusError> {
LongestChain::leaves(self)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()).into())
async fn leaves(&self) -> Result<Vec<<Block as BlockT>::Hash>, ConsensusError> {
LongestChain::leaves(self).map_err(|e| ConsensusError::ChainLookup(e.to_string()).into())
}

fn best_chain(&self) -> Result<<Block as BlockT>::Header, ConsensusError>
{
async fn best_chain(&self) -> Result<<Block as BlockT>::Header, ConsensusError> {
LongestChain::best_block_header(&self)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()).into())
}

fn finality_target(
async fn finality_target(
&self,
target_hash: Block::Hash,
maybe_max_number: Option<NumberFor<Block>>
maybe_max_number: Option<NumberFor<Block>>,
) -> Result<Option<Block::Hash>, ConsensusError> {
let import_lock = self.backend.get_import_lock();
self.backend.blockchain().best_containing(target_hash, maybe_max_number, import_lock)
self.backend
.blockchain()
.best_containing(target_hash, maybe_max_number, import_lock)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()).into())
}
}
52 changes: 27 additions & 25 deletions client/consensus/manual-seal/src/seal_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,45 +80,47 @@ pub async fn seal_block<B, BI, SC, C, E, P, CIDP>(
create_inherent_data_providers,
consensus_data_provider: digest_provider,
mut sender,
}: SealBlockParams<'_, B, BI, SC, C, E, P, CIDP>
)
where
B: BlockT,
BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
+ Send + Sync + 'static,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
E: Environment<B>,
E::Proposer: Proposer<B, Transaction = TransactionFor<C, B>>,
P: txpool::ChainApi<Block=B>,
SC: SelectChain<B>,
TransactionFor<C, B>: 'static,
CIDP: CreateInherentDataProviders<B, ()>,
}: SealBlockParams<'_, B, BI, SC, C, E, P, CIDP>,
) where
B: BlockT,
BI: BlockImport<B, Error = sp_consensus::Error, Transaction = sp_api::TransactionFor<C, B>>
+ Send
+ Sync
+ 'static,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
E: Environment<B>,
E::Proposer: Proposer<B, Transaction = TransactionFor<C, B>>,
P: txpool::ChainApi<Block = B>,
SC: SelectChain<B>,
TransactionFor<C, B>: 'static,
CIDP: CreateInherentDataProviders<B, ()>,
{
let future = async {
if pool.validated_pool().status().ready == 0 && !create_empty {
return Err(Error::EmptyTransactionPool)
return Err(Error::EmptyTransactionPool);
}

// get the header to build this new block on.
// use the parent_hash supplied via `EngineCommand`
// or fetch the best_block.
let parent = match parent_hash {
Some(hash) => {
client.header(BlockId::Hash(hash))?.ok_or_else(|| Error::BlockNotFound(format!("{}", hash)))?
}
None => select_chain.best_chain()?
Some(hash) => client
.header(BlockId::Hash(hash))?
.ok_or_else(|| Error::BlockNotFound(format!("{}", hash)))?,
None => select_chain.best_chain().await?,
};

let inherent_data_providers =
create_inherent_data_providers
.create_inherent_data_providers(parent.hash(), ())
.await
.map_err(|e| Error::Other(e))?;
let inherent_data_providers = create_inherent_data_providers
.create_inherent_data_providers(parent.hash(), ())
.await
.map_err(|e| Error::Other(e))?;

let inherent_data = inherent_data_providers.create_inherent_data()?;

let proposer = env.init(&parent)
.map_err(|err| Error::StringError(format!("{:?}", err))).await?;
let proposer = env
.init(&parent)
.map_err(|err| Error::StringError(format!("{:?}", err)))
.await?;
let inherents_len = inherent_data.len();

let digest = if let Some(digest_provider) = digest_provider {
Expand Down
12 changes: 8 additions & 4 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,10 @@ where
mut block: BlockImportParams<B, Self::Transaction>,
new_cache: HashMap<CacheKeyId, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
let best_header = self.select_chain.best_chain()
let best_header = self
.select_chain
.best_chain()
.await
.map_err(|e| format!("Fetch best chain failed via select chain: {:?}", e))?;
let best_hash = best_header.hash();

Expand Down Expand Up @@ -543,7 +546,8 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
impl Future<Output = ()>,
) where
)
where
Block: BlockT,
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
S: SelectChain<Block> + 'static,
Expand Down Expand Up @@ -578,7 +582,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
return;
}

let best_header = match select_chain.best_chain() {
let best_header = match select_chain.best_chain().await {
Ok(x) => x,
Err(err) => {
warn!(
Expand All @@ -588,7 +592,7 @@ pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
err
);
return;
},
}
};
let best_hash = best_header.hash();

Expand Down
2 changes: 1 addition & 1 deletion client/consensus/slots/src/slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ where

let ends_at = Instant::now() + ends_in;

let chain_head = match self.client.best_chain() {
let chain_head = match self.client.best_chain().await {
Ok(x) => x,
Err(e) => {
log::warn!(
Expand Down
Loading