Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
fixins
Browse files Browse the repository at this point in the history
  • Loading branch information
drahnr committed Apr 22, 2022
1 parent 63347b3 commit b9900c5
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 125 deletions.
35 changes: 5 additions & 30 deletions node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,7 @@ impl ApprovalVotingSubsystem {

impl<Context> overseer::Subsystem<Context, SubsystemError> for ApprovalVotingSubsystem
where
Context: overseer::SubsystemContext<
Message = ApprovalVotingMessage,
OutgoingMessages = overseer::ApprovalVotingOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::ApprovalVotingContextTrait,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let backend = DbBackend::new(self.db.clone(), self.db_config);
Expand Down Expand Up @@ -700,12 +695,7 @@ async fn run<B, Context>(
mut backend: B,
) -> SubsystemResult<()>
where
Context: overseer::SubsystemContext<
Message = ApprovalVotingMessage,
OutgoingMessages = overseer::ApprovalVotingOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::ApprovalVotingContextTrait,
B: Backend,
{
let mut state = State {
Expand Down Expand Up @@ -844,12 +834,7 @@ where
//
// returns `true` if any of the actions was a `Conclude` command.
async fn handle_actions(
ctx: &mut impl overseer::SubsystemContext<
Message = ApprovalVotingMessage,
OutgoingMessages = overseer::ApprovalVotingOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
ctx: &mut impl overseer::ApprovalVotingContextTrait,
state: &mut State,
overlayed_db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
Expand Down Expand Up @@ -1086,12 +1071,7 @@ fn distribution_messages_for_activation(

// Handle an incoming signal from the overseer. Returns true if execution should conclude.
async fn handle_from_overseer(
ctx: &mut impl overseer::SubsystemContext<
Message = ApprovalVotingMessage,
OutgoingMessages = overseer::ApprovalVotingOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
ctx: &mut impl overseer::ApprovalVotingContextTrait,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
metrics: &Metrics,
Expand Down Expand Up @@ -2150,12 +2130,7 @@ fn process_wakeup(
// spawned. When the background work is no longer needed, the `AbortHandle` should be dropped
// to cancel the background work and any requests it has spawned.
async fn launch_approval(
ctx: &mut impl overseer::SubsystemContext<
Message = ApprovalVotingMessage,
OutgoingMessages = overseer::ApprovalVotingOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
ctx: &mut impl overseer::ApprovalVotingContextTrait,
metrics: Metrics,
session_index: SessionIndex,
candidate: CandidateReceipt,
Expand Down
42 changes: 6 additions & 36 deletions node/core/av-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,7 @@ impl KnownUnfinalizedBlocks {

impl<Context> overseer::Subsystem<Context, SubsystemError> for AvailabilityStoreSubsystem
where
Context: overseer::SubsystemContext<
Message = AvailabilityStoreMessage,
OutgoingMessages = overseer::AvailabilityStoreOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::AvailabilityStoreContextTrait,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(self, ctx).map(|_| Ok(())).boxed();
Expand All @@ -533,12 +528,7 @@ where

async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
where
Context: overseer::SubsystemContext<
Message = AvailabilityStoreMessage,
OutgoingMessages = overseer::AvailabilityStoreOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::AvailabilityStoreContextTrait,
{
let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

Expand Down Expand Up @@ -566,12 +556,7 @@ async fn run_iteration<Context>(
mut next_pruning: &mut future::Fuse<Delay>,
) -> Result<bool, Error>
where
Context: overseer::SubsystemContext<
Message = AvailabilityStoreMessage,
OutgoingMessages = overseer::AvailabilityStoreOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::AvailabilityStoreContextTrait,
{
select! {
incoming = ctx.recv().fuse() => {
Expand Down Expand Up @@ -622,12 +607,7 @@ async fn process_block_activated<Context>(
activated: Hash,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<
Message = AvailabilityStoreMessage,
OutgoingMessages = overseer::AvailabilityStoreOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::AvailabilityStoreContextTrait,
{
let now = subsystem.clock.now()?;

Expand Down Expand Up @@ -686,12 +666,7 @@ async fn process_new_head<Context>(
header: Header,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<
Message = AvailabilityStoreMessage,
OutgoingMessages = overseer::AvailabilityStoreOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::AvailabilityStoreContextTrait,
{
let candidate_events = util::request_candidate_events(hash, ctx.sender()).await.await??;

Expand Down Expand Up @@ -837,12 +812,7 @@ async fn process_block_finalized<Context>(
finalized_number: BlockNumber,
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<
Message = AvailabilityStoreMessage,
OutgoingMessages = overseer::AvailabilityStoreOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::AvailabilityStoreContextTrait,
{
let now = subsystem.clock.now()?;

Expand Down
14 changes: 2 additions & 12 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,7 @@ impl CandidateValidationSubsystem {

impl<Context> overseer::Subsystem<Context, SubsystemError> for CandidateValidationSubsystem
where
Context: overseer::SubsystemContext<
Message = CandidateValidationMessage,
OutgoingMessages = overseer::CandidateValidationOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::CandidateValidationContextTrait,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(
Expand All @@ -122,12 +117,7 @@ async fn run<Context>(
program_path: PathBuf,
) -> SubsystemResult<()>
where
Context: overseer::SubsystemContext<
Message = CandidateValidationMessage,
OutgoingMessages = overseer::CandidateValidationOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::CandidateValidationContextTrait,
{
let (validation_host, task) = polkadot_node_core_pvf::start(
polkadot_node_core_pvf::Config::new(cache_path, program_path),
Expand Down
14 changes: 2 additions & 12 deletions node/core/chain-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,7 @@ impl<Client> ChainApiSubsystem<Client> {
impl<Client, Context> overseer::Subsystem<Context, SubsystemError> for ChainApiSubsystem<Client>
where
Client: HeaderBackend<Block> + AuxStore + 'static,
Context: overseer::SubsystemContext<
Message = ChainApiMessage,
OutgoingMessages = overseer::ChainApiOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::ChainApiContextTrait,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run::<Client, Context>(ctx, self)
Expand All @@ -86,12 +81,7 @@ async fn run<Client, Context>(
) -> SubsystemResult<()>
where
Client: HeaderBackend<Block> + AuxStore,
Context: overseer::SubsystemContext<
Message = ChainApiMessage,
OutgoingMessages = overseer::ChainApiOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::ChainApiContextTrait,
{
loop {
match ctx.recv().await? {
Expand Down
55 changes: 20 additions & 35 deletions node/core/chain-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use polkadot_node_primitives::BlockWeight;
use polkadot_node_subsystem::{
errors::ChainApiError,
messages::{ChainApiMessage, ChainSelectionMessage},
overseer, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
overseer::{self, SubsystemSender}, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::database::Database;
use polkadot_primitives::v2::{BlockNumber, ConsensusLog, Hash, Header};
Expand Down Expand Up @@ -319,12 +319,7 @@ impl ChainSelectionSubsystem {

impl<Context> overseer::Subsystem<Context, SubsystemError> for ChainSelectionSubsystem
where
Context: overseer::SubsystemContext<
Message = ChainSelectionMessage,
OutgoingMessages = overseer::ChainSelectionOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::ChainSelectionContextTrait,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let backend = crate::db_backend::v1::DbBackend::new(
Expand All @@ -347,12 +342,7 @@ async fn run<Context, B>(
stagnant_check_interval: StagnantCheckInterval,
clock: Box<dyn Clock + Send + Sync>,
) where
Context: overseer::SubsystemContext<
Message = ChainSelectionMessage,
OutgoingMessages = overseer::ChainSelectionOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::ChainSelectionContextTrait,
B: Backend,
{
loop {
Expand Down Expand Up @@ -383,12 +373,7 @@ async fn run_until_error<Context, B>(
clock: &(dyn Clock + Sync),
) -> Result<(), Error>
where
Context: overseer::SubsystemContext<
Message = ChainSelectionMessage,
OutgoingMessages = overseer::ChainSelectionOutgoingMessages,
Signal = OverseerSignal,
Error = SubsystemError,
>,
Context: overseer::ChainSelectionContextTrait,
B: Backend,
{
let mut stagnant_check_stream = stagnant_check_interval.timeout_stream();
Expand All @@ -403,7 +388,7 @@ where
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
for leaf in update.activated {
let write_ops = handle_active_leaf(
ctx,
ctx.sender(),
&*backend,
clock.timestamp_now() + STAGNANT_TIMEOUT,
leaf.hash,
Expand All @@ -420,7 +405,7 @@ where
handle_approved_block(backend, hash)?
}
ChainSelectionMessage::Leaves(tx) => {
let leaves = load_leaves(ctx, &*backend).await?;
let leaves = load_leaves(ctx.sender(), &*backend).await?;
let _ = tx.send(leaves);
}
ChainSelectionMessage::BestLeafContaining(required, tx) => {
Expand All @@ -447,11 +432,11 @@ where
}

async fn fetch_finalized(
ctx: &mut impl SubsystemContext,
sender: &mut impl SubsystemSender<ChainApiMessage>,
) -> Result<Option<(Hash, BlockNumber)>, Error> {
let (number_tx, number_rx) = oneshot::channel();

ctx.send_message(ChainApiMessage::FinalizedBlockNumber(number_tx)).await;
sender.send_message(ChainApiMessage::FinalizedBlockNumber(number_tx)).await;

let number = match number_rx.await? {
Ok(number) => number,
Expand All @@ -463,7 +448,7 @@ async fn fetch_finalized(

let (hash_tx, hash_rx) = oneshot::channel();

ctx.send_message(ChainApiMessage::FinalizedBlockHash(number, hash_tx)).await;
sender.send_message(ChainApiMessage::FinalizedBlockHash(number, hash_tx)).await;

match hash_rx.await? {
Err(err) => {
Expand All @@ -479,11 +464,11 @@ async fn fetch_finalized(
}

async fn fetch_header(
ctx: &mut impl SubsystemContext,
sender: &mut impl SubsystemSender<ChainApiMessage>,
hash: Hash,
) -> Result<Option<Header>, Error> {
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockHeader(hash, tx)).await;
sender.send_message(ChainApiMessage::BlockHeader(hash, tx)).await;

Ok(rx.await?.unwrap_or_else(|err| {
gum::warn!(target: LOG_TARGET, ?hash, ?err, "Missing hash for finalized block number");
Expand All @@ -492,11 +477,11 @@ async fn fetch_header(
}

async fn fetch_block_weight(
ctx: &mut impl SubsystemContext,
sender: &mut impl overseer::SubsystemSender<ChainApiMessage>,
hash: Hash,
) -> Result<Option<BlockWeight>, Error> {
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockWeight(hash, tx)).await;
sender.send_message(ChainApiMessage::BlockWeight(hash, tx)).await;

let res = rx.await?;

Expand All @@ -508,7 +493,7 @@ async fn fetch_block_weight(

// Handle a new active leaf.
async fn handle_active_leaf(
ctx: &mut impl SubsystemContext,
sender: &mut impl overseer::ChainSelectionSenderTrait,
backend: &impl Backend,
stagnant_at: Timestamp,
hash: Hash,
Expand All @@ -520,10 +505,10 @@ async fn handle_active_leaf(
// tree.
l.saturating_sub(1)
},
None => fetch_finalized(ctx).await?.map_or(1, |(_, n)| n),
None => fetch_finalized(sender).await?.map_or(1, |(_, n)| n),
};

let header = match fetch_header(ctx, hash).await? {
let header = match fetch_header(sender, hash).await? {
None => {
gum::warn!(target: LOG_TARGET, ?hash, "Missing header for new head");
return Ok(Vec::new())
Expand All @@ -532,7 +517,7 @@ async fn handle_active_leaf(
};

let new_blocks = polkadot_node_subsystem_util::determine_new_blocks(
ctx.sender(),
sender,
|h| backend.load_block_entry(h).map(|b| b.is_some()),
hash,
&header,
Expand All @@ -545,7 +530,7 @@ async fn handle_active_leaf(
// determine_new_blocks gives blocks in descending order.
// for this, we want ascending order.
for (hash, header) in new_blocks.into_iter().rev() {
let weight = match fetch_block_weight(ctx, hash).await? {
let weight = match fetch_block_weight(sender, hash).await? {
None => {
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -656,13 +641,13 @@ fn detect_stagnant(backend: &mut impl Backend, now: Timestamp) -> Result<(), Err
// Load the leaves from the backend. If there are no leaves, then return
// the finalized block.
async fn load_leaves(
ctx: &mut impl SubsystemContext,
sender: &mut impl overseer::SubsystemSender<ChainApiMessage>,
backend: &impl Backend,
) -> Result<Vec<Hash>, Error> {
let leaves: Vec<_> = backend.load_leaves()?.into_hashes_descending().collect();

if leaves.is_empty() {
Ok(fetch_finalized(ctx).await?.map_or(Vec::new(), |(h, _)| vec![h]))
Ok(fetch_finalized(sender).await?.map_or(Vec::new(), |(h, _)| vec![h]))
} else {
Ok(leaves)
}
Expand Down

0 comments on commit b9900c5

Please sign in to comment.