Make candidate validation bounded again #2125
Changes from 3 commits
@@ -55,11 +55,12 @@ use polkadot_primitives::{

 use parity_scale_codec::Encode;

-use futures::{channel::oneshot, prelude::*};
+use futures::{channel::oneshot, prelude::*, StreamExt};

 use std::{
 	path::PathBuf,
 	sync::Arc,
+	task::Poll,
 	time::{Duration, Instant},
 };
@@ -152,105 +153,109 @@ async fn run<Context>(
 	)
 	.await;
 	ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

-	loop {
-		match ctx.recv().await? {
-			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
-			FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
-			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
-			FromOrchestra::Communication { msg } => match msg {
-				CandidateValidationMessage::ValidateFromChainState(
-					candidate_receipt,
-					pov,
-					executor_params,
-					timeout,
-					response_sender,
-				) => {
-					let bg = {
-						let mut sender = ctx.sender().clone();
-						let metrics = metrics.clone();
-						let validation_host = validation_host.clone();
-
-						async move {
-							let _timer = metrics.time_validate_from_chain_state();
-							let res = validate_from_chain_state(
-								&mut sender,
-								validation_host,
-								candidate_receipt,
-								pov,
-								executor_params,
-								timeout,
-								&metrics,
-							)
-							.await;
-
-							metrics.on_validation_event(&res);
-							let _ = response_sender.send(res);
-						}
-					};
-
-					ctx.spawn("validate-from-chain-state", bg.boxed())?;
-				},
-				CandidateValidationMessage::ValidateFromExhaustive(
-					persisted_validation_data,
-					validation_code,
-					candidate_receipt,
-					pov,
-					executor_params,
-					timeout,
-					response_sender,
-				) => {
-					let bg = {
-						let metrics = metrics.clone();
-						let validation_host = validation_host.clone();
-
-						async move {
-							let _timer = metrics.time_validate_from_exhaustive();
-							let res = validate_candidate_exhaustive(
-								validation_host,
-								persisted_validation_data,
-								validation_code,
-								candidate_receipt,
-								pov,
-								executor_params,
-								timeout,
-								&metrics,
-							)
-							.await;
-
-							metrics.on_validation_event(&res);
-							let _ = response_sender.send(res);
-						}
-					};
-
-					ctx.spawn("validate-from-exhaustive", bg.boxed())?;
-				},
-				CandidateValidationMessage::PreCheck(
-					relay_parent,
-					validation_code_hash,
-					response_sender,
-				) => {
-					let bg = {
-						let mut sender = ctx.sender().clone();
-						let validation_host = validation_host.clone();
-
-						async move {
-							let precheck_result = precheck_pvf(
-								&mut sender,
-								validation_host,
-								relay_parent,
-								validation_code_hash,
-							)
-							.await;
-
-							let _ = response_sender.send(precheck_result);
-						}
-					};
-
-					ctx.spawn("candidate-validation-pre-check", bg.boxed())?;
-				},
-			},
-		}
-	}
+	let mut res = Ok(());
+	let sender = ctx.sender().to_owned();
+
+	let read_stream = stream::poll_fn(|c| loop {
+		match ctx.recv().poll_unpin(c) {
+			Poll::Ready(Ok(FromOrchestra::Signal(OverseerSignal::Conclude))) =>
+				return Poll::Ready(None),
+			Poll::Pending => return Poll::Pending,
+			Poll::Ready(Ok(FromOrchestra::Signal(_))) => continue,
+			Poll::Ready(Ok(FromOrchestra::Communication { msg })) => return Poll::Ready(Some(msg)),
+			Poll::Ready(Err(e)) => {
+				res = Err(e);
+				return Poll::Ready(None)
+			},
+		}
+	});
+
+	read_stream
+		// NB: Cloning `sender` inside the `async` block of `for_each_concurrent` renders the whole
+		// thing not `Send`, so we `zip` the message stream with a stream of `sender` clones here.
+		.zip(stream::repeat(sender))
+		// FIXME: The backlog size here is the same as in the PVF host queues. It should either use a
+		// common constant, or another appropriate value should be chosen.
+		.for_each_concurrent(30, |message_and_sender| {
Review thread anchored on the `for_each_concurrent` line:

- Can't we add the common constant now rather than the TODO?
- We can, but I'm not totally sure it makes sense. If the candidate validation messages were just retransmitted to the PVF host's queue, it would be fine. But some invariants (like …
- Another thing is that `ctx.spawn` gives us the opportunity of this possibly being run in parallel; not sure if it really matters, it might not.
- Speculatively, it has to be okay. What it actually does under the hood is send a payload to the PVF host and then wait for the answer. All the actual processing is done by the PVF host within its own threading model. But I'm not 100% sure I'm not missing something non-obvious here.
- Hmm, it also does code decompression, which can take a couple of ms: #599.
- Hmm, fair enough. And now we have a dilemma. Go back to the …
- Your call :D
- Benchmarked the decompression with the Moonbeam runtime (a large one): it takes 4.33 ms, which is not the end of the world, but still significant. As far as I understand the problem, moving the decompression to the PVF host side (which requires some effort by itself) is only meaningful if we run the PVF host on the blocking pool? And if we ever switch to non-blocking (which we want to do), we'll still have the same problem, with some executor threads being blocked in decompression. If that's true, it's better not to move it, but to spawn it as a blocking task on the subsystem side. I benchmarked that just in case; it doesn't introduce any overhead and is a future-proof solution.
- Sounds good to me. Or we can just use a regular spawn like before and see if #599 is worth taking a look at separately; maybe it's too much of a micro-optimisation that doesn't make any difference.
- Sounds good to me too. I do think the PVF host should be non-blocking, as it does no blocking work itself. It waits on the child processes, but that shouldn't block.

(A sketch of the blocking-task approach discussed here follows the diff below.)
+			handle_candidate_validation_message(
+				message_and_sender.1,
+				validation_host.clone(),
+				metrics.clone(),
+				message_and_sender.0,
+			)
+		})
+		.await;
+
+	res
 }
+
+async fn handle_candidate_validation_message<Sender>(
+	mut sender: Sender,
+	validation_host: ValidationHost,
+	metrics: Metrics,
+	msg: CandidateValidationMessage,
+) where
+	Sender: SubsystemSender<RuntimeApiMessage>,
+{
+	match msg {
+		CandidateValidationMessage::ValidateFromChainState(
+			candidate_receipt,
+			pov,
+			executor_params,
+			timeout,
+			response_sender,
+		) => {
+			let _timer = metrics.time_validate_from_chain_state();
+			let res = validate_from_chain_state(
+				&mut sender,
+				validation_host,
+				candidate_receipt,
+				pov,
+				executor_params,
+				timeout,
+				&metrics,
+			)
+			.await;
+
+			metrics.on_validation_event(&res);
+			let _ = response_sender.send(res);
+		},
+		CandidateValidationMessage::ValidateFromExhaustive(
+			persisted_validation_data,
+			validation_code,
+			candidate_receipt,
+			pov,
+			executor_params,
+			timeout,
+			response_sender,
+		) => {
+			let _timer = metrics.time_validate_from_exhaustive();
+			let res = validate_candidate_exhaustive(
+				validation_host,
+				persisted_validation_data,
+				validation_code,
+				candidate_receipt,
+				pov,
+				executor_params,
+				timeout,
+				&metrics,
+			)
+			.await;
+
+			metrics.on_validation_event(&res);
+			let _ = response_sender.send(res);
+		},
+		CandidateValidationMessage::PreCheck(
+			relay_parent,
+			validation_code_hash,
+			response_sender,
+		) => {
+			let precheck_result =
+				precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
+					.await;
+
+			let _ = response_sender.send(precheck_result);
+		},
+	}
+}
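For illustration, here is a minimal sketch of the blocking-task approach agreed in the thread above: offloading code decompression to the blocking pool from the subsystem side. It is a sketch under assumptions, not the PR's code: it uses Tokio's `spawn_blocking` directly to stay self-contained (in the subsystem this would more likely go through `ctx.spawn_blocking`, as the PVF host task does), and `decompress_code` and `prepare_validation_code` are hypothetical stand-ins for the actual Polkadot helpers.

use tokio::task;

// Hypothetical stand-in for the real code-blob decompression (the actual
// implementation would call something like `sp_maybe_compressed_blob::decompress`).
// Decompressing a large runtime (e.g. Moonbeam) was benchmarked above at ~4.33 ms,
// long enough that it should not run directly on an async executor thread.
fn decompress_code(compressed: Vec<u8>) -> Result<Vec<u8>, String> {
	Ok(compressed) // placeholder: pretend the blob was not compressed
}

// Offload the CPU-bound decompression to the blocking thread pool so the
// async executor threads stay responsive while it runs.
async fn prepare_validation_code(compressed: Vec<u8>) -> Result<Vec<u8>, String> {
	task::spawn_blocking(move || decompress_code(compressed))
		.await
		.map_err(|e| format!("blocking task panicked: {e}"))?
}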
A separate review suggestion: another possible way of implementing this would be to use an async_semaphore. Call `s.acquire()` before each spawn and move the resulting guard into the future.
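A sketch of that alternative, with Tokio's `Semaphore` swapped in for async_semaphore (the idea is the same: acquire a permit before spawning and move the guard into the spawned future, so the permit count bounds the number of in-flight tasks). `MAX_CONCURRENT_VALIDATIONS` and `handle_message` are illustrative names, not the actual code.

use std::sync::Arc;
use tokio::sync::Semaphore;

// Illustrative bound; ideally this would be the shared constant discussed
// in the review thread rather than a local magic number.
const MAX_CONCURRENT_VALIDATIONS: usize = 30;

// Hypothetical stand-in for `handle_candidate_validation_message`.
async fn handle_message(msg: u64) {
	println!("handled message {msg}");
}

async fn run_bounded(messages: Vec<u64>) {
	let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_VALIDATIONS));

	for msg in messages {
		// Acquire before spawning: once all permits are taken, this loop
		// stops pulling new messages, which gives natural backpressure.
		let permit = semaphore
			.clone()
			.acquire_owned()
			.await
			.expect("semaphore is never closed");

		tokio::spawn(async move {
			// Moving the guard into the future keeps the permit held for the
			// task's lifetime; it is released when `_permit` is dropped.
			let _permit = permit;
			handle_message(msg).await;
		});
	}
}

Compared to `for_each_concurrent`, the spawned tasks can run genuinely in parallel across executor threads, at the cost of losing the single-future structure of the stream-based version in the diff.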