Make candidate validation bounded again #2125

Merged: 19 commits, Jan 21, 2024

Changes from 3 commits
201 changes: 103 additions & 98 deletions polkadot/node/core/candidate-validation/src/lib.rs
@@ -55,11 +55,12 @@ use polkadot_primitives::{

use parity_scale_codec::Encode;

use futures::{channel::oneshot, prelude::*};
use futures::{channel::oneshot, prelude::*, StreamExt};

use std::{
path::PathBuf,
sync::Arc,
task::Poll,
time::{Duration, Instant},
};

@@ -152,105 +153,109 @@ async fn run<Context>(
)
.await;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

loop {
match ctx.recv().await? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => match msg {
CandidateValidationMessage::ValidateFromChainState(
candidate_receipt,
pov,
executor_params,
timeout,
response_sender,
) => {
let bg = {
let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
timeout,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-chain-state", bg.boxed())?;
},
CandidateValidationMessage::ValidateFromExhaustive(
persisted_validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
timeout,
response_sender,
) => {
let bg = {
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
persisted_validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
timeout,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-exhaustive", bg.boxed())?;
},
CandidateValidationMessage::PreCheck(
relay_parent,
validation_code_hash,
response_sender,
) => {
let bg = {
let mut sender = ctx.sender().clone();
let validation_host = validation_host.clone();

async move {
let precheck_result = precheck_pvf(
&mut sender,
validation_host,
relay_parent,
validation_code_hash,
)
.await;

let _ = response_sender.send(precheck_result);
}
};

ctx.spawn("candidate-validation-pre-check", bg.boxed())?;
},
let mut res = Ok(());
let sender = ctx.sender().to_owned();

let read_stream = stream::poll_fn(|c| loop {
Contributor:

Another possible way of implementing this would be to use an async_semaphore:

let s = Semaphore::new(MAX_CONCURRENT_REQUESTS);

Acquire with s.acquire() before each spawn and move the resulting guard into the spawned future.
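(For reference, a minimal standalone sketch of that pattern. It uses tokio::sync::Semaphore rather than the async_semaphore crate, and MAX_CONCURRENT_REQUESTS, the mpsc channel and handle_message are placeholders, not the subsystem's actual API.)

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::{mpsc, Semaphore};

// Placeholder bound; the PR discussion suggests sharing a constant with the PVF host queue.
const MAX_CONCURRENT_REQUESTS: usize = 30;

async fn run_bounded(mut messages: mpsc::Receiver<String>) {
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
    while let Some(msg) = messages.recv().await {
        // Acquire a permit before spawning; this suspends the loop once the bound is hit.
        let permit = semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is never closed");
        tokio::spawn(async move {
            handle_message(msg).await;
            // Releasing the permit (on drop) frees a slot for the next message.
            drop(permit);
        });
    }
}

async fn handle_message(msg: String) {
    // Stand-in for the actual validation work.
    println!("handled {msg}");
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    for i in 0..10 {
        tx.send(format!("request {i}")).await.unwrap();
    }
    drop(tx);
    run_bounded(rx).await;
    // Give spawned handlers a moment to finish before the runtime shuts down (sketch only).
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```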

match ctx.recv().poll_unpin(c) {
Poll::Ready(Ok(FromOrchestra::Signal(OverseerSignal::Conclude))) =>
return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(FromOrchestra::Signal(_))) => continue,
Poll::Ready(Ok(FromOrchestra::Communication { msg })) => return Poll::Ready(Some(msg)),
Poll::Ready(Err(e)) => {
res = Err(e);
return Poll::Ready(None)
},
}
});

read_stream
// NB: Cloning `sender` inside `async` block of `for_each_concurrent` renders the whole
// thing not `Send`, so we `zip` the message stream with the stream of `sender` clones here.
.zip(stream::repeat(sender))
// FIXME: The backlog size here is the same as in PVF host queues. It should either use a
// common constant, or another appropriate value should be chosen
.for_each_concurrent(30, |message_and_sender| {
Contributor:

Can't we add the common constant now rather than the TODO?

Contributor Author:

We can, but I'm not totally sure it makes sense. If the candidate validation messages were just retransmitted to the PVF host's queue, it would be fine. But some variants (like ValidateFromChainState) require the candidate validation subsystem to do additional work involving runtime requests that it awaits. So if 30 ValidateFromChainState messages arrive, they are processed concurrently while the PVF host queue stays empty. If a ValidateFromExhaustive arrives at that moment, it has to wait until a free slot appears, even though it could fall straight through into the queue, since it doesn't require any additional processing. That's why I think the limit here could be higher than the queue limit.

On the other hand, 30 concurrent ValidateFromChainStates is an unlikely situation. The real message stream is a mix of Validate* requests with a handful of pre-checking requests. So I just cannot decide which value is appropriate here.

Contributor:

Another thing is that for_each_concurrent is not equivalent to calling ctx.spawn several times, because

the closures are run concurrently (but not in parallel: this combinator does not introduce any threads).

ctx.spawn gives us the possibility of this being run in parallel; not sure if it really matters, it might not.
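(A minimal illustration of that difference, assuming the tokio and futures crates; the blocking std::thread::sleep only simulates CPU-bound work such as code decompression.)

```rust
use futures::{stream, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    // Concurrent, but on a single task: the blocking call in one closure stalls
    // every other closure, so the items effectively run one after another.
    let start = Instant::now();
    stream::iter(0..4)
        .for_each_concurrent(4, |i| async move {
            std::thread::sleep(Duration::from_millis(100)); // simulated CPU-bound work
            println!("for_each_concurrent item {i}");
        })
        .await;
    println!("for_each_concurrent took {:?}", start.elapsed()); // roughly 400 ms

    // Spawned tasks: the same work can run in parallel on the runtime's worker threads.
    let start = Instant::now();
    let handles: Vec<_> = (0..4)
        .map(|i| {
            tokio::spawn(async move {
                std::thread::sleep(Duration::from_millis(100));
                println!("spawned task {i}");
            })
        })
        .collect();
    for handle in handles {
        handle.await.unwrap();
    }
    println!("spawned tasks took {:?}", start.elapsed()); // roughly 100 ms with enough workers
}
```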

Contributor Author:

Speculatively, it should be okay. What it actually does under the hood is send a payload to the PVF host and then wait for the answer. All the actual processing is done by the PVF host within its own threading model. But I'm not 100% sure I'm not missing something non-obvious here.

Contributor (@alindima, Nov 6, 2023):

Hmm, it also does code decompression, which can take a couple of ms: #599.
So that means one future will block all the others from making progress, since they are not spawned as new tasks.

Contributor Author:

Hmm, fair enough. And now we have a dilemma: go back to FuturesUnordered and add a semaphore, or move the decompression to the validation host (something we want to do anyway) and leave this PR as-is.

Contributor:

Your call :D

Contributor Author:

I benchmarked the decompression with the Moonbeam runtime (a large one): it takes 4.33 ms. Not the end of the world, but still significant.

As far as I understand the problem, moving the decompression to the PVF host side (which requires some effort by itself) is only meaningful if we run the PVF host on the blocking pool? And if we ever switch to non-blocking (which we want to do), we'll still have the same problem of some executor threads being blocked on decompression?

If that's true, it's better not to move it but to spawn it as a blocking task on the subsystem side. I benchmarked that just in case; it doesn't introduce any overhead and is a future-proof solution.
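(A rough sketch of that last option, using tokio::task::spawn_blocking for illustration rather than the subsystem's own blocking spawner; decompress_code is a placeholder for the real decompression routine.)

```rust
// Offload the CPU-bound decompression to the blocking pool so that the async
// executor threads driving the message loop are never stalled by it.
async fn decompress_off_thread(raw_code: Vec<u8>) -> Result<Vec<u8>, String> {
    tokio::task::spawn_blocking(move || decompress_code(&raw_code))
        .await
        .map_err(|e| format!("decompression task panicked or was cancelled: {e}"))?
}

// Placeholder for the real (bomb-limited) decompression of the validation code.
fn decompress_code(raw: &[u8]) -> Result<Vec<u8>, String> {
    Ok(raw.to_vec())
}
```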

Contributor:

Sounds good to me. Or we can just use a regular spawn like before and see whether #599 is worth taking a look at separately.

Maybe it's too much of a micro-optimisation that doesn't make any difference.

Contributor:

> And if we ever switch to non-blocking (which we want to do), we'll still have the same problem of some executor threads being blocked on decompression?
>
> If that's true, it's better not to move it but to spawn it as a blocking task on the subsystem side.

Sounds good to me too. I do think the PVF host should be non-blocking, as it does no blocking work itself. It waits on the child processes, but that shouldn't block.

handle_candidate_validation_message(
message_and_sender.1,
validation_host.clone(),
metrics.clone(),
message_and_sender.0,
)
})
.await;

res
}

async fn handle_candidate_validation_message<Sender>(
mut sender: Sender,
validation_host: ValidationHost,
metrics: Metrics,
msg: CandidateValidationMessage,
) where
Sender: SubsystemSender<RuntimeApiMessage>,
{
match msg {
CandidateValidationMessage::ValidateFromChainState(
candidate_receipt,
pov,
executor_params,
timeout,
response_sender,
) => {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
timeout,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
},
CandidateValidationMessage::ValidateFromExhaustive(
persisted_validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
timeout,
response_sender,
) => {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
persisted_validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
timeout,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
},
CandidateValidationMessage::PreCheck(
relay_parent,
validation_code_hash,
response_sender,
) => {
let precheck_result =
precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
.await;

let _ = response_sender.send(precheck_result);
},
}
}
