Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions node/network/availability-distribution/src/requester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use futures::{
};

use polkadot_node_subsystem_util::runtime::{get_occupied_cores, RuntimeInfo};
use polkadot_primitives::v2::{CandidateHash, Hash, OccupiedCore};
use polkadot_primitives::v2::{CandidateHash, Hash, OccupiedCore, SessionIndex};
use polkadot_subsystem::{
messages::{AllMessages, ChainApiMessage},
ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext,
Expand Down Expand Up @@ -130,22 +130,13 @@ impl Requester {
Context: SubsystemContext,
{
let ActivatedLeaf { hash: leaf, .. } = new_head;
let ancestors_in_session = get_block_ancestors_in_same_session(
let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session(
ctx,
runtime,
leaf,
Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
)
.await
.unwrap_or_else(|err| {
tracing::debug!(
target: LOG_TARGET,
leaf = ?leaf,
"Failed to fetch leaf ancestors in the same session due to an error: {}",
err
);
Vec::new()
});
.await?;
// Also spawn or bump tasks for candidates in ancestry in the same session.
for hash in std::iter::once(leaf).chain(ancestors_in_session) {
let cores = get_occupied_cores(ctx, hash).await?;
Expand All @@ -161,7 +152,7 @@ impl Requester {
// The next time the subsystem receives leaf update, some of spawned task will be bumped
// to be live in fresh relay parent, while some might get dropped due to the current leaf
// being deactivated.
self.add_cores(ctx, runtime, leaf, cores).await?;
self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?;
}

Ok(())
Expand Down Expand Up @@ -189,6 +180,7 @@ impl Requester {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
leaf: Hash,
leaf_session_index: SessionIndex,
cores: impl IntoIterator<Item = OccupiedCore>,
) -> Result<()>
where
Expand All @@ -215,6 +207,7 @@ impl Requester {
// at session boundaries. At the same time, only leaves are guaranteed to
// be fetchable by the state trie.
leaf,
leaf_session_index,
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info),
)
.await
Expand Down Expand Up @@ -262,12 +255,14 @@ impl Stream for Requester {
}

/// Requests up to `limit` ancestor hashes of relay parent in the same session.
///
/// Also returns session index of the `head`.
async fn get_block_ancestors_in_same_session<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
head: Hash,
limit: usize,
) -> Result<Vec<Hash>>
) -> Result<(SessionIndex, Vec<Hash>)>
where
Context: SubsystemContext,
{
Expand All @@ -284,7 +279,7 @@ where
Some(parent) => runtime.get_session_index_for_child(ctx.sender(), *parent).await?,
None => {
// No first element, i.e. empty.
return Ok(ancestors)
return Ok((0, ancestors))
},
};

Expand All @@ -303,7 +298,7 @@ where
// Drop the rest.
ancestors.truncate(session_ancestry_len);

Ok(ancestors)
Ok((head_session_index, ancestors))
}

/// Request up to `limit` ancestor hashes of relay parent from the Chain API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,13 @@ impl SessionCache {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
parent: Hash,
session_index: SessionIndex,
with_info: F,
) -> Result<Option<R>>
where
Context: SubsystemContext,
F: FnOnce(&SessionInfo) -> R,
{
let session_index = runtime.get_session_index_for_child(ctx.sender(), parent).await?;

if let Some(o_info) = self.session_info_cache.get(&session_index) {
tracing::trace!(target: LOG_TARGET, session_index, "Got session from lru");
return Ok(Some(with_info(o_info)))
Expand Down
10 changes: 9 additions & 1 deletion node/network/availability-distribution/src/tests/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use polkadot_primitives::v2::{
};
use polkadot_subsystem::{
messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
},
ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal,
Expand Down Expand Up @@ -278,6 +278,14 @@ impl TestState {
},
}
},
AllMessages::ChainApi(ChainApiMessage::Ancestors { hash, k, response_channel }) => {
let chain = &self.relay_chain;
let maybe_block_position = chain.iter().position(|h| *h == hash);
let ancestors = maybe_block_position
.map(|idx| chain[..idx].iter().rev().take(k).copied().collect())
.unwrap_or_default();
response_channel.send(Ok(ancestors)).expect("Receiver is expected to be alive");
},
_ => {},
}
}
Expand Down