Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Use CPU clock timeout for PVF jobs (#6282)
Browse files Browse the repository at this point in the history
* Put in skeleton logic for CPU-time-preparation

Still needed:
- Flesh out logic
- Refactor some spots
- Tests

* Continue filling in logic for prepare worker CPU time changes

* Fix compiler errors

* Update lenience factor

* Fix some clippy lints for PVF module

* Fix compilation errors

* Address some review comments

* Add logging

* Add another log

* Address some review comments; change Mutex to AtomicBool

* Refactor handling response bytes

* Add CPU clock timeout logic for execute jobs

* Properly handle AtomicBool flag

* Use `Ordering::Relaxed`

* Refactor thread coordination logic

* Fix bug

* Add some timing information to execute tests

* Add section about the mitigation to the IG

* minor: Change more `Ordering`s to `Relaxed`

* candidate-validation: Fix build errors
  • Loading branch information
mrcnski authored Nov 30, 2022
1 parent 90effe7 commit ce003d6
Show file tree
Hide file tree
Showing 17 changed files with 536 additions and 170 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ trait ValidationBackend {
}
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>;
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>;
}

#[async_trait]
Expand All @@ -664,7 +664,7 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> {
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(_) = self.precheck_pvf(pvf, tx).await {
return Err(PrepareError::DidNotMakeIt)
Expand Down
12 changes: 6 additions & 6 deletions node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
result
}

async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
unreachable!()
}
}
Expand Down Expand Up @@ -894,11 +894,11 @@ fn pov_decompression_failure_is_invalid() {
}

struct MockPreCheckBackend {
result: Result<(), PrepareError>,
result: Result<Duration, PrepareError>,
}

impl MockPreCheckBackend {
fn with_hardcoded_result(result: Result<(), PrepareError>) -> Self {
fn with_hardcoded_result(result: Result<Duration, PrepareError>) -> Self {
Self { result }
}
}
Expand All @@ -914,7 +914,7 @@ impl ValidationBackend for MockPreCheckBackend {
unreachable!()
}

async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
self.result.clone()
}
}
Expand All @@ -931,7 +931,7 @@ fn precheck_works() {

let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(())),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
relay_parent,
validation_code_hash,
)
Expand Down Expand Up @@ -977,7 +977,7 @@ fn precheck_invalid_pvf_blob_compression() {

let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(())),
MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
relay_parent,
validation_code_hash,
)
Expand Down
4 changes: 4 additions & 0 deletions node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ always-assert = "0.1"
async-std = { version = "1.11.0", features = ["attributes"] }
async-process = "1.3.0"
assert_matches = "1.4.0"
cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
slotmap = "1.0"
Expand All @@ -21,10 +22,13 @@ pin-project = "1.0.9"
rand = "0.8.5"
tempfile = "3.3.0"
rayon = "1.5.1"

parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] }

polkadot-parachain = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
polkadot-node-metrics = { path = "../../metrics"}

sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
11 changes: 9 additions & 2 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ pub enum ArtifactState {
/// This is updated when we get the heads up for this artifact or when we just discover
/// this file.
last_time_needed: SystemTime,
/// The CPU time that was taken preparing this artifact.
cpu_time_elapsed: Duration,
},
/// A task to prepare this artifact is scheduled.
Preparing {
Expand Down Expand Up @@ -171,11 +173,16 @@ impl Artifacts {
/// This function must be used only for brand-new artifacts and should never be used for
/// replacing existing ones.
#[cfg(test)]
pub fn insert_prepared(&mut self, artifact_id: ArtifactId, last_time_needed: SystemTime) {
pub fn insert_prepared(
&mut self,
artifact_id: ArtifactId,
last_time_needed: SystemTime,
cpu_time_elapsed: Duration,
) {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Prepared { last_time_needed })
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed })
.is_none());
}

Expand Down
7 changes: 4 additions & 3 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use parity_scale_codec::{Decode, Encode};
use std::any::Any;
use std::{any::Any, time::Duration};

/// Result of PVF preparation performed by the validation host.
pub type PrepareResult = Result<(), PrepareError>;
/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
/// successful
pub type PrepareResult = Result<Duration, PrepareError>;

/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)]
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ mod queue;
mod worker;

pub use queue::{start, ToQueue};
pub use worker::worker_entrypoint;
pub use worker::{worker_entrypoint, Response as ExecuteResponse};
3 changes: 2 additions & 1 deletion node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ fn handle_job_finish(
result_tx: ResultSender,
) {
let (idle_worker, result) = match outcome {
Outcome::Ok { result_descriptor, duration_ms: _, idle_worker } => {
Outcome::Ok { result_descriptor, duration: _, idle_worker } => {
// TODO: propagate the soft timeout

(Some(idle_worker), Ok(result_descriptor))
},
Outcome::InvalidCandidate { err, idle_worker } => (
Expand Down
Loading

0 comments on commit ce003d6

Please sign in to comment.