
Commit dc2b2ac

Fixes the deadlock when any of the channels reaches capacity. (#5297)
The PVF host is designed to avoid spawning tasks itself, so as to minimize
its knowledge of the outer code: using `async_std::task::spawn` (or Tokio's
counterpart) was deemed unacceptable, and `SpawnNamed` undesirable. Instead,
a single task is returned, which the candidate-validation subsystem spawns,
and that root task polls the tasks of the sub-components.

However, the way the tasks were bundled was incorrect: a giant select polled
all of them. In particular, this implies that as soon as the body of one of
that select's arms awaits, the sub-tasks stop getting polled. This is a
recipe for a deadlock, which indeed happened here.

Specifically, the deadlock happened while sending messages to the execute
queue via
[`send_execute`](https://github.com/paritytech/polkadot/blob/818da57386900e9b3eb934eaf868161ed3eb6d70/node/core/pvf/src/host.rs#L601).
When the channel to the queue reaches capacity, control flow is suspended
until the queue consumes some of those messages. But since this code is
reached from [one of the select
arms](https://github.com/paritytech/polkadot/blob/818da57386900e9b3eb934eaf868161ed3eb6d70/node/core/pvf/src/host.rs#L371),
the queue is never given control, and thus no further progress can be made.
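
To make the failure mode concrete, here is a minimal, self-contained sketch
of the same pattern. The names (`to_queue_tx`, `run_queue`) and payloads are
hypothetical, not the actual host code; it assumes the `futures` crate and
`async-std` with its `attributes` feature:

```rust
use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt};

#[async_std::main]
async fn main() {
	// Tiny capacity, standing in for the bounded channel to the execute queue.
	let (mut to_queue_tx, to_queue_rx) = mpsc::channel::<u32>(1);
	let (mut request_tx, mut request_rx) = mpsc::channel::<u32>(16);
	request_tx.send(0).await.unwrap();

	// Stand-in for the queue's long-running task: it drains `to_queue_rx`,
	// but only while this future is actually being polled.
	let run_queue = to_queue_rx
		.for_each(|n| async move { println!("queue handled {n}") })
		.fuse();
	futures::pin_mut!(run_queue);

	loop {
		futures::select! {
			_ = run_queue => break,
			req = request_rx.next() => {
				let Some(req) = req else { break };
				// One request fans out into several queue messages. Once the
				// channel is full, this `.await` suspends the whole loop body,
				// and `run_queue`, the only future that could drain the
				// channel, is no longer polled: the program hangs here.
				for i in 0..4 {
					to_queue_tx.send(req + i).await.unwrap();
				}
			},
		}
	}
}
```

Running this hangs on the third `send`, mirroring the stalled `send_execute`
above.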

This problem is solved by bundling the tasks one level higher instead:
rather than polling the sub-tasks inside the host's loop, the host loop
itself and the long-running tasks become arms of a single top-level `select`.
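
Under the same toy assumptions as the sketch above, the shape of the fix
looks roughly like this; an `.await` inside the host loop now suspends only
`run_host`, while its sibling keeps getting polled:

```rust
use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt};

#[async_std::main]
async fn main() {
	let (mut to_queue_tx, to_queue_rx) = mpsc::channel::<u32>(1);
	let (mut request_tx, mut request_rx) = mpsc::channel::<u32>(16);
	request_tx.send(0).await.unwrap();
	drop(request_tx); // close the request channel so the demo terminates

	// The queue's long-running task, now a stand-alone future.
	let run_queue = to_queue_rx.for_each(|n| async move { println!("queue handled {n}") });

	// The host loop. Filling `to_queue_tx` to capacity suspends only this future.
	let run_host = async move {
		while let Some(req) = request_rx.next().await {
			for i in 0..4 {
				to_queue_tx.send(req + i).await.unwrap();
			}
		}
	};

	// Bundled one level higher: the arms of a single top-level select are
	// polled concurrently, so the queue keeps draining and progress is made.
	futures::select! {
		_ = run_host.fuse() => {},
		_ = run_queue.fuse() => {},
	};
}
```

This is the structure the diff below introduces with `run_host`,
`run_prepare_queue`, `run_prepare_pool`, `run_execute_queue`, and
`run_sweeper`.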

We also stop treating a return from one of those long-running tasks as an
error condition, since that can happen during legitimate shutdown.
pepyakin authored Apr 9, 2022
1 parent 818da57 commit dc2b2ac
Showing 2 changed files with 83 additions and 70 deletions.
node/core/pvf/src/host.rs (53 additions, 70 deletions)

@@ -201,33 +201,36 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<O
 	let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(100);
 	let run_sweeper = sweeper_task(to_sweeper_rx);
 
-	let run = async move {
+	let run_host = async move {
 		let artifacts = Artifacts::new(&config.cache_path).await;
 
-		futures::pin_mut!(run_prepare_queue, run_prepare_pool, run_execute_queue, run_sweeper);
-
-		run(
-			Inner {
-				cache_path: config.cache_path,
-				cleanup_pulse_interval: Duration::from_secs(3600),
-				artifact_ttl: Duration::from_secs(3600 * 24),
-				artifacts,
-				to_host_rx,
-				to_prepare_queue_tx,
-				from_prepare_queue_rx,
-				to_execute_queue_tx,
-				to_sweeper_tx,
-				awaiting_prepare: AwaitingPrepare::default(),
-			},
-			run_prepare_pool,
-			run_prepare_queue,
-			run_execute_queue,
-			run_sweeper,
-		)
+		run(Inner {
+			cache_path: config.cache_path,
+			cleanup_pulse_interval: Duration::from_secs(3600),
+			artifact_ttl: Duration::from_secs(3600 * 24),
+			artifacts,
+			to_host_rx,
+			to_prepare_queue_tx,
+			from_prepare_queue_rx,
+			to_execute_queue_tx,
+			to_sweeper_tx,
+			awaiting_prepare: AwaitingPrepare::default(),
+		})
 		.await
 	};
 
-	(validation_host, run)
+	let task = async move {
+		// Bundle the sub-components' tasks together into a single future.
+		futures::select! {
+			_ = run_host.fuse() => {},
+			_ = run_prepare_queue.fuse() => {},
+			_ = run_prepare_pool.fuse() => {},
+			_ = run_execute_queue.fuse() => {},
+			_ = run_sweeper.fuse() => {},
+		};
+	};
+
+	(validation_host, task)
 }
 
 /// An execution request that should execute the PVF (known in the context) and send the results
@@ -297,15 +300,18 @@ async fn run(
 		mut to_sweeper_tx,
 		mut awaiting_prepare,
 	}: Inner,
-	prepare_pool: impl Future<Output = ()> + Unpin,
-	prepare_queue: impl Future<Output = ()> + Unpin,
-	execute_queue: impl Future<Output = ()> + Unpin,
-	sweeper: impl Future<Output = ()> + Unpin,
 ) {
 	macro_rules! break_if_fatal {
 		($expr:expr) => {
 			match $expr {
-				Err(Fatal) => break,
+				Err(Fatal) => {
+					gum::error!(
+						target: LOG_TARGET,
+						"Fatal error occurred, terminating the host. Line: {}",
+						line!(),
+					);
+					break
+				},
 				Ok(v) => v,
 			}
 		};
@@ -317,31 +323,9 @@
 	let mut to_host_rx = to_host_rx.fuse();
 	let mut from_prepare_queue_rx = from_prepare_queue_rx.fuse();
 
-	// Make sure that the task-futures are fused.
-	let mut prepare_queue = prepare_queue.fuse();
-	let mut prepare_pool = prepare_pool.fuse();
-	let mut execute_queue = execute_queue.fuse();
-	let mut sweeper = sweeper.fuse();
-
 	loop {
 		// biased to make it behave deterministically for tests.
 		futures::select_biased! {
-			_ = prepare_queue => {
-				never!("prepare_pool: long-running task never concludes; qed");
-				break;
-			},
-			_ = prepare_pool => {
-				never!("prepare_pool: long-running task never concludes; qed");
-				break;
-			},
-			_ = execute_queue => {
-				never!("execute_queue: long-running task never concludes; qed");
-				break;
-			},
-			_ = sweeper => {
-				never!("sweeper: long-running task never concludes; qed");
-				break;
-			},
 			() = cleanup_pulse.select_next_some() => {
 				// `select_next_some` because we don't expect this to fail, but if it does, we
 				// still don't fail. The tradeoff is that the compiled cache will start growing
@@ -356,7 +340,14 @@
 				).await);
 			},
 			to_host = to_host_rx.next() => {
-				let to_host = break_if_fatal!(to_host.ok_or(Fatal));
+				let to_host = match to_host {
+					None => {
+						// The sending half of the channel has been closed, meaning the
+						// `ValidationHost` struct was dropped. Shutting down gracefully.
+						break;
+					},
+					Some(to_host) => to_host,
+				};
 
 				break_if_fatal!(handle_to_host(
 					&cache_path,
@@ -761,26 +752,18 @@ mod tests {
 		let (to_execute_queue_tx, to_execute_queue_rx) = mpsc::channel(10);
 		let (to_sweeper_tx, to_sweeper_rx) = mpsc::channel(10);
 
-		let mk_dummy_loop = || std::future::pending().boxed();
-
-		let run = run(
-			Inner {
-				cache_path,
-				cleanup_pulse_interval,
-				artifact_ttl,
-				artifacts,
-				to_host_rx,
-				to_prepare_queue_tx,
-				from_prepare_queue_rx,
-				to_execute_queue_tx,
-				to_sweeper_tx,
-				awaiting_prepare: AwaitingPrepare::default(),
-			},
-			mk_dummy_loop(),
-			mk_dummy_loop(),
-			mk_dummy_loop(),
-			mk_dummy_loop(),
-		)
+		let run = run(Inner {
+			cache_path,
+			cleanup_pulse_interval,
+			artifact_ttl,
+			artifacts,
+			to_host_rx,
+			to_prepare_queue_tx,
+			from_prepare_queue_rx,
+			to_execute_queue_tx,
+			to_sweeper_tx,
+			awaiting_prepare: AwaitingPrepare::default(),
+		})
 		.boxed();
 
 		Self {
node/core/pvf/tests/it/adder.rs (30 additions, 0 deletions)

@@ -112,3 +112,33 @@ async fn execute_bad_on_parent() {
 	.await
 	.unwrap_err();
 }
+
+#[async_std::test]
+async fn stress_spawn() {
+	let host = std::sync::Arc::new(TestHost::new());
+
+	async fn execute(host: std::sync::Arc<TestHost>) {
+		let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) };
+		let block_data = BlockData { state: 0, add: 512 };
+		let ret = host
+			.validate_candidate(
+				adder::wasm_binary_unwrap(),
+				ValidationParams {
+					parent_head: GenericHeadData(parent_head.encode()),
+					block_data: GenericBlockData(block_data.encode()),
+					relay_parent_number: 1,
+					relay_parent_storage_root: Default::default(),
+				},
+			)
+			.await
+			.unwrap();
+
+		let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
+
+		assert_eq!(new_head.number, 1);
+		assert_eq!(new_head.parent_hash, parent_head.hash());
+		assert_eq!(new_head.post_state, hash_state(512));
+	}
+
+	futures::future::join_all((0..100).map(|_| execute(host.clone()))).await;
+}
