Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Retry failed PVF execution (AmbiguousWorkerDeath) #6235

Merged
merged 6 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/core/candidate-validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1.57"
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }

sp-maybe-compressed-blob = { package = "sp-maybe-compressed-blob", git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
64 changes: 44 additions & 20 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ mod tests;

const LOG_TARGET: &'static str = "parachain::candidate-validation";

/// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error.
#[cfg(not(test))]
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(1);
slumber marked this conversation as resolved.
Show resolved Hide resolved
eskimor marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(test)]
const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

/// Configuration for the candidate validation subsystem
#[derive(Clone)]
pub struct Config {
Expand Down Expand Up @@ -490,7 +496,7 @@ where
}

async fn validate_candidate_exhaustive(
mut validation_backend: impl ValidationBackend,
mut validation_backend: impl ValidationBackend + Send,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suddenly needed this due to the new default implementation of validate_candidate_with_retry on the trait. I feel like this bound shouldn't be there, but it looks like it gets introduced by the #[async_trait] proc macro.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it's helpful here, but there's a #[async_trait(?Send)] variant for that attribute.

persisted_validation_data: PersistedValidationData,
validation_code: ValidationCode,
candidate_receipt: CandidateReceipt,
Expand Down Expand Up @@ -551,7 +557,7 @@ async fn validate_candidate_exhaustive(
};

let result = validation_backend
.validate_candidate(raw_validation_code.to_vec(), timeout, params)
.validate_candidate_with_retry(raw_validation_code.to_vec(), timeout, params)
.await;

if let Err(ref error) = result {
Expand Down Expand Up @@ -604,45 +610,63 @@ async fn validate_candidate_exhaustive(
#[async_trait]
trait ValidationBackend {
async fn validate_candidate(
&mut self,
pvf: Pvf,
timeout: Duration,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError>;

async fn validate_candidate_with_retry(
&mut self,
raw_validation_code: Vec<u8>,
timeout: Duration,
params: ValidationParams,
) -> Result<WasmValidationResult, ValidationError>;
) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf = Pvf::from_code(raw_validation_code);

let validation_result =
self.validate_candidate(pvf.clone(), timeout, params.encode()).await;

// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient.
if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) =
validation_result
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;
// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
self.validate_candidate(pvf, timeout, params.encode()).await
} else {
validation_result
}
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>;
}

#[async_trait]
impl ValidationBackend for ValidationHost {
/// Tries executing a PVF a single time (no retries).
async fn validate_candidate(
&mut self,
raw_validation_code: Vec<u8>,
pvf: Pvf,
timeout: Duration,
params: ValidationParams,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
let priority = polkadot_node_core_pvf::Priority::Normal;

let (tx, rx) = oneshot::channel();
if let Err(err) = self
.execute_pvf(
Pvf::from_code(raw_validation_code),
timeout,
params.encode(),
polkadot_node_core_pvf::Priority::Normal,
tx,
)
.await
{
if let Err(err) = self.execute_pvf(pvf, timeout, encoded_params, priority, tx).await {
return Err(ValidationError::InternalError(format!(
"cannot send pvf to the validation host: {:?}",
err
)))
}

let validation_result = rx
.await
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?;

validation_result
rx.await
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> {
Expand Down
144 changes: 136 additions & 8 deletions node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,24 +345,36 @@ fn check_does_not_match() {
}

struct MockValidateCandidateBackend {
result: Result<WasmValidationResult, ValidationError>,
result_list: Vec<Result<WasmValidationResult, ValidationError>>,
num_times_called: usize,
}

impl MockValidateCandidateBackend {
fn with_hardcoded_result(result: Result<WasmValidationResult, ValidationError>) -> Self {
Self { result }
Self { result_list: vec![result], num_times_called: 0 }
}

fn with_hardcoded_result_list(
result_list: Vec<Result<WasmValidationResult, ValidationError>>,
) -> Self {
Self { result_list, num_times_called: 0 }
}
}

#[async_trait]
impl ValidationBackend for MockValidateCandidateBackend {
async fn validate_candidate(
&mut self,
_raw_validation_code: Vec<u8>,
_pvf: Pvf,
_timeout: Duration,
_params: ValidationParams,
_encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
self.result.clone()
// This is expected to panic if called more times than expected, indicating an error in the
// test.
let result = self.result_list[self.num_times_called].clone();
self.num_times_called += 1;

result
}

async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> {
Expand Down Expand Up @@ -468,7 +480,7 @@ fn candidate_validation_bad_return_is_invalid() {

let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result(Err(
ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath),
ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout),
)),
validation_data,
validation_code,
Expand All @@ -479,6 +491,122 @@ fn candidate_validation_bad_return_is_invalid() {
))
.unwrap();

assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::Timeout));
}

// Checks the retry path of candidate validation: the mock backend reports
// `AmbiguousWorkerDeath` on its first call and a successful result on its second, and the
// overall outcome is expected to be `ValidationResult::Valid`.
#[test]
fn candidate_validation_one_ambiguous_error_is_valid() {
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };

let pov = PoV { block_data: BlockData(vec![1; 32]) };
let head_data = HeadData(vec![1, 1, 1]);
let validation_code = ValidationCode(vec![2; 16]);

// The descriptor is built from the hashes of the fixtures above so that the basic checks
// below pass.
let descriptor = make_valid_candidate_descriptor(
ParaId::from(1_u32),
dummy_hash(),
validation_data.hash(),
pov.hash(),
validation_code.hash(),
head_data.hash(),
dummy_hash(),
Sr25519Keyring::Alice,
);

// Sanity check on the fixtures themselves before exercising the validation function.
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());

// The result the mock backend hands back on its second call.
let validation_result = WasmValidationResult {
head_data,
new_validation_code: Some(vec![2, 2, 2].into()),
upward_messages: Vec::new(),
horizontal_messages: Vec::new(),
processed_downward_messages: 0,
hrmp_watermark: 0,
};

// Commitments mirror `validation_result` field by field; their hash goes into the receipt.
let commitments = CandidateCommitments {
head_data: validation_result.head_data.clone(),
upward_messages: validation_result.upward_messages.clone(),
horizontal_messages: validation_result.horizontal_messages.clone(),
new_validation_code: validation_result.new_validation_code.clone(),
processed_downward_messages: validation_result.processed_downward_messages,
hrmp_watermark: validation_result.hrmp_watermark,
};

let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };

// The result list is consumed in order, one entry per backend call: an ambiguous failure
// first, then success.
let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
Ok(validation_result),
]),
validation_data.clone(),
validation_code,
candidate_receipt,
Arc::new(pov),
Duration::from_secs(0),
&Default::default(),
))
.unwrap();

// The outputs must match the successful result supplied to the mock, and the validation data
// handed back must equal what was passed in.
assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1]));
assert_eq!(outputs.upward_messages, Vec::<UpwardMessage>::new());
assert_eq!(outputs.horizontal_messages, Vec::new());
assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into()));
assert_eq!(outputs.hrmp_watermark, 0);
assert_eq!(used_validation_data, validation_data);
});
}

// Checks that the retry is not unbounded: the mock backend reports `AmbiguousWorkerDeath` on
// both of its calls, and the overall outcome is expected to be
// `ValidationResult::Invalid(InvalidCandidate::ExecutionError(_))`.
#[test]
fn candidate_validation_multiple_ambiguous_errors_is_invalid() {
let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() };

let pov = PoV { block_data: BlockData(vec![1; 32]) };
let validation_code = ValidationCode(vec![2; 16]);

// The descriptor is built from the hashes of the fixtures above; the remaining hash
// arguments are filled with `dummy_hash()`.
let descriptor = make_valid_candidate_descriptor(
ParaId::from(1_u32),
dummy_hash(),
validation_data.hash(),
pov.hash(),
validation_code.hash(),
dummy_hash(),
dummy_hash(),
Sr25519Keyring::Alice,
);

// Sanity check on the fixtures themselves before exercising the validation function.
let check = perform_basic_checks(
&descriptor,
validation_data.max_pov_size,
&pov,
&validation_code.hash(),
);
assert!(check.is_ok());

// NOTE(review): a zero commitments hash is used here, presumably because no successful
// execution result is ever produced in this test — confirm.
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };

// Exactly two results are supplied. The mock indexes into this list once per call, so a
// third backend call would panic and fail the test.
let v = executor::block_on(validate_candidate_exhaustive(
MockValidateCandidateBackend::with_hardcoded_result_list(vec![
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
]),
validation_data,
validation_code,
candidate_receipt,
Arc::new(pov),
Duration::from_secs(0),
&Default::default(),
))
.unwrap();

assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_)));
}

Expand Down Expand Up @@ -779,9 +907,9 @@ impl MockPreCheckBackend {
impl ValidationBackend for MockPreCheckBackend {
async fn validate_candidate(
&mut self,
_raw_validation_code: Vec<u8>,
_pvf: Pvf,
_timeout: Duration,
_params: ValidationParams,
_encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
unreachable!()
}
Expand Down
6 changes: 3 additions & 3 deletions node/core/pvf/src/execute/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ fn handle_job_finish(
"execute worker concluded",
);

// First we send the result. It may fail due the other end of the channel being dropped, that's
// legitimate and we don't treat that as an error.
// First we send the result. It may fail due to the other end of the channel being dropped,
// that's legitimate and we don't treat that as an error.
let _ = result_tx.send(result);

// Then, we should deal with the worker:
Expand Down Expand Up @@ -305,7 +305,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);

// Assume that the failure intermittent and retry after a delay.
// Assume that the failure is intermittent and retry after a delay.
Delay::new(Duration::from_secs(3)).await;
},
}
Expand Down