Skip to content

Commit

Permalink
DeltaApplicationError & latest predicted value fixes, fallbacks to se…
Browse files Browse the repository at this point in the history
…quential
  • Loading branch information
gelash committed Aug 26, 2024
1 parent f2853d3 commit 4a77084
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 66 deletions.
2 changes: 1 addition & 1 deletion aptos-move/aptos-aggregator/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<T: std::fmt::Debug> PanicOr<T> {

pub fn code_invariant_error<M: std::fmt::Debug>(message: M) -> PanicError {
let msg = format!(
"Delayed materialization code invariant broken (there is a bug in the code), {:?}",
"Code invariant broken (there is a bug in the code), {:?}",
message
);
error!("{}", msg);
Expand Down
97 changes: 70 additions & 27 deletions aptos-move/block-executor/src/captured_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,13 @@ pub(crate) struct CapturedReads<T: Transaction> {

delayed_field_reads: HashMap<T::Identifier, DelayedFieldRead>,

/// If there is a speculative failure (e.g. delta application failure, or an
/// observed inconsistency), the transaction output is irrelevant (must be
/// discarded and transaction re-executed). We have a global flag, as which
/// read observed the inconsistency is irrelevant (moreover, typically,
/// an error is returned to the VM to wrap up the ongoing execution).
speculative_failure: bool,
/// If there is a speculative failure (e.g. delta application failure, or an observed
/// inconsistency), the transaction output is irrelevant (must be discarded and transaction
/// re-executed). We have two global flags, one for speculative failures regarding
/// delayed fields, and the second for all other speculative failures, because these
/// require different validation behavior (delayed fields are validated commit-time).
delayed_field_speculative_failure: bool,
non_delayed_field_speculative_failure: bool,
/// Set if the invarint on CapturedReads intended use is violated. Leads to an alert
/// and sequential execution fallback.
incorrect_use: bool,
Expand Down Expand Up @@ -444,7 +445,7 @@ impl<T: Transaction> CapturedReads<T> {
},
UpdateResult::Inconsistency(m) => {
// Record speculative failure.
self.speculative_failure = true;
self.non_delayed_field_speculative_failure = true;
bail!(m);
},
UpdateResult::Updated | UpdateResult::Inserted => Ok(()),
Expand Down Expand Up @@ -521,7 +522,7 @@ impl<T: Transaction> CapturedReads<T> {
},
UpdateResult::Inconsistency(_) => {
// Record speculative failure.
self.speculative_failure = true;
self.delayed_field_speculative_failure = true;
Err(PanicOr::Or(DelayedFieldsSpeculativeError::InconsistentRead))
},
UpdateResult::Updated | UpdateResult::Inserted => Ok(()),
Expand All @@ -531,7 +532,7 @@ impl<T: Transaction> CapturedReads<T> {
pub(crate) fn capture_delayed_field_read_error<E: std::fmt::Debug>(&mut self, e: &PanicOr<E>) {
match e {
PanicOr::CodeInvariantError(_) => self.incorrect_use = true,
PanicOr::Or(_) => self.speculative_failure = true,
PanicOr::Or(_) => self.delayed_field_speculative_failure = true,
};
}

Expand All @@ -554,7 +555,7 @@ impl<T: Transaction> CapturedReads<T> {
data_map: &VersionedData<T::Key, T::Value>,
idx_to_validate: TxnIndex,
) -> bool {
if self.speculative_failure {
if self.non_delayed_field_speculative_failure {
return false;
}

Expand Down Expand Up @@ -590,7 +591,7 @@ impl<T: Transaction> CapturedReads<T> {
) -> bool {
use MVGroupError::*;

if self.speculative_failure {
if self.non_delayed_field_speculative_failure {
return false;
}

Expand Down Expand Up @@ -631,19 +632,19 @@ impl<T: Transaction> CapturedReads<T> {
}

// This validation needs to be called at commit time
// (as it internally uses read_latest_committed_value to get the current value).
// (as it internally uses read_latest_predicted_value to get the current value).
pub(crate) fn validate_delayed_field_reads(
&self,
delayed_fields: &dyn TVersionedDelayedFieldView<T::Identifier>,
idx_to_validate: TxnIndex,
) -> Result<bool, PanicError> {
if self.speculative_failure {
if self.delayed_field_speculative_failure {
return Ok(false);
}

use MVDelayedFieldsError::*;
for (id, read_value) in &self.delayed_field_reads {
match delayed_fields.read_latest_committed_value(
match delayed_fields.read_latest_predicted_value(
id,
idx_to_validate,
ReadPosition::BeforeCurrentTxn,
Expand Down Expand Up @@ -707,8 +708,12 @@ impl<T: Transaction> CapturedReads<T> {
ret
}

pub(crate) fn mark_failure(&mut self) {
self.speculative_failure = true;
pub(crate) fn mark_failure(&mut self, delayed_field_failure: bool) {
if delayed_field_failure {
self.delayed_field_speculative_failure = true;
} else {
self.non_delayed_field_speculative_failure = true;
}
}

pub(crate) fn mark_incorrect_use(&mut self) {
Expand Down Expand Up @@ -756,8 +761,10 @@ impl<T: Transaction> UnsyncReadSet<T> {
mod test {
use super::*;
use crate::proptest_types::types::{raw_metadata, KeyType, MockEvent, ValueType};
use aptos_mvhashmap::types::StorageVersion;
use claims::{assert_err, assert_gt, assert_matches, assert_none, assert_ok, assert_some_eq};
use aptos_mvhashmap::{types::StorageVersion, MVHashMap};
use claims::{
assert_err, assert_gt, assert_matches, assert_none, assert_ok, assert_ok_eq, assert_some_eq,
};
use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID;
use test_case::test_case;

Expand Down Expand Up @@ -1268,30 +1275,66 @@ mod test {
let deletion_metadata = DataRead::Metadata(None);
let exists = DataRead::Exists(true);

assert!(!captured_reads.speculative_failure);
assert!(!captured_reads.non_delayed_field_speculative_failure);
assert!(!captured_reads.delayed_field_speculative_failure);
let key = KeyType::<u32>(20, false);
assert_ok!(captured_reads.capture_read(key, use_tag.then_some(30), exists));
assert_err!(captured_reads.capture_read(
key,
use_tag.then_some(30),
deletion_metadata.clone()
));
assert!(captured_reads.speculative_failure);
assert!(captured_reads.non_delayed_field_speculative_failure);
assert!(!captured_reads.delayed_field_speculative_failure);

captured_reads.speculative_failure = false;
let mvhashmap = MVHashMap::new();
// MVHashMap<KeyType<u32>, u32, ValueType, MockExecutable, DelayedFieldID> =

captured_reads.non_delayed_field_speculative_failure = false;
captured_reads.delayed_field_speculative_failure = false;
let key = KeyType::<u32>(21, false);
assert_ok!(captured_reads.capture_read(key, use_tag.then_some(30), deletion_metadata));
assert_err!(captured_reads.capture_read(key, use_tag.then_some(30), resolved));
assert!(captured_reads.speculative_failure);
assert!(captured_reads.non_delayed_field_speculative_failure);
assert!(!captured_reads.validate_data_reads(mvhashmap.data(), 0));
assert!(!captured_reads.validate_group_reads(mvhashmap.group_data(), 0));
assert!(!captured_reads.delayed_field_speculative_failure);
assert_ok_eq!(
captured_reads.validate_delayed_field_reads(mvhashmap.delayed_fields(), 0),
true
);

captured_reads.speculative_failure = false;
captured_reads.non_delayed_field_speculative_failure = false;
captured_reads.delayed_field_speculative_failure = false;
let key = KeyType::<u32>(22, false);
assert_ok!(captured_reads.capture_read(key, use_tag.then_some(30), metadata));
assert_err!(captured_reads.capture_read(key, use_tag.then_some(30), versioned_legacy));
assert!(captured_reads.speculative_failure);
assert!(captured_reads.non_delayed_field_speculative_failure);
assert!(!captured_reads.delayed_field_speculative_failure);

let mut captured_reads = CapturedReads::<TestTransactionType>::new();
captured_reads.non_delayed_field_speculative_failure = false;
captured_reads.delayed_field_speculative_failure = false;
captured_reads.mark_failure(true);
assert!(!captured_reads.non_delayed_field_speculative_failure);
assert!(captured_reads.validate_data_reads(mvhashmap.data(), 0));
assert!(captured_reads.validate_group_reads(mvhashmap.group_data(), 0));
assert!(captured_reads.delayed_field_speculative_failure);
assert_ok_eq!(
captured_reads.validate_delayed_field_reads(mvhashmap.delayed_fields(), 0),
false
);

captured_reads.speculative_failure = false;
captured_reads.mark_failure();
assert!(captured_reads.speculative_failure);
captured_reads.mark_failure(true);
assert!(!captured_reads.non_delayed_field_speculative_failure);
assert!(captured_reads.delayed_field_speculative_failure);

captured_reads.delayed_field_speculative_failure = false;
captured_reads.mark_failure(false);
assert!(captured_reads.non_delayed_field_speculative_failure);
assert!(!captured_reads.delayed_field_speculative_failure);
captured_reads.mark_failure(true);
assert!(captured_reads.non_delayed_field_speculative_failure);
assert!(captured_reads.delayed_field_speculative_failure);
}
}
27 changes: 25 additions & 2 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ where
&& block_limit_processor.should_end_block_parallel()
{
// Set the execution output status to be SkipRest, to skip the rest of the txns.
last_input_output.update_to_skip_rest(txn_idx);
last_input_output.update_to_skip_rest(txn_idx)?;
}
}

Expand Down Expand Up @@ -854,7 +854,7 @@ where
}

let mut final_results = final_results.acquire();
match last_input_output.take_output(txn_idx) {
match last_input_output.take_output(txn_idx)? {
ExecutionStatus::Success(t) | ExecutionStatus::SkipRest(t) => {
final_results[txn_idx as usize] = t;
},
Expand Down Expand Up @@ -884,6 +884,7 @@ where
params: &ParallelExecutionParams,
) -> Result<(), PanicOr<ParallelBlockExecutionError>> {
// Make executor for each task. TODO: fast concurrent executor.
let num_txns = block.len();
let init_timer = VM_INIT_SECONDS.start_timer();
let vm_init_view: VMInitView<'_, T, S> = VMInitView::new(base_view, versioned_cache);
let executor = E::init(env.clone(), &vm_init_view);
Expand All @@ -910,6 +911,19 @@ where
};

loop {
if let SchedulerTask::ValidationTask(_, incarnation, _) = &scheduler_task {
if *incarnation as usize > num_txns + 10 {
// Something is wrong if we observe high incarnations (e.g. a bug
// might manifest as an execution-invalidation cycle). Break out with
// an error to fallback to sequential execution.
return Err(code_invariant_error(format!(
"BlockSTM: too high incarnation {}, block len {}",
*incarnation, num_txns,
))
.into());
}
}

if params.worker_should_commit(worker_id) {
while scheduler.should_coordinate_commits() {
self.prepare_and_queue_commit_ready_txns(
Expand Down Expand Up @@ -1083,6 +1097,15 @@ where
});
drop(timer);

if !shared_maybe_error.load(Ordering::SeqCst) && scheduler.pop_from_commit_queue().is_ok() {
// No error is recorded, parallel execution workers are done, but there is
// still a commit task remaining. Commit tasks must be drained before workers
// exit, hence we log an error and fallback to sequential execution.
alert!("[BlockSTM] error: commit tasks not drained after parallel execution");

shared_maybe_error.store(true, Ordering::Relaxed);
}

counters::update_state_counters(versioned_cache.stats(), true);

// Explicit async drops.
Expand Down
25 changes: 17 additions & 8 deletions aptos-move/block-executor/src/txn_last_input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,18 +250,21 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone>
}
}

pub(crate) fn update_to_skip_rest(&self, txn_idx: TxnIndex) {
pub(crate) fn update_to_skip_rest(&self, txn_idx: TxnIndex) -> Result<(), PanicError> {
if self.block_skips_rest_at_idx(txn_idx) {
// Already skipping.
return;
return Ok(());
}

// check_execution_status_during_commit must be used for checks re:status.
// Hence, since the status is not SkipRest, it must be Success.
if let ExecutionStatus::Success(output) = self.take_output(txn_idx) {
if let ExecutionStatus::Success(output) = self.take_output(txn_idx)? {
self.outputs[txn_idx as usize].store(Some(Arc::new(ExecutionStatus::SkipRest(output))));
Ok(())
} else {
unreachable!("Unexpected status, must be Success");
Err(code_invariant_error(
"Unexpected status to change to SkipRest, must be Success",
))
}
}

Expand Down Expand Up @@ -446,12 +449,18 @@ impl<T: Transaction, O: TransactionOutput<Txn = T>, E: Debug + Send + Clone>

// Must be executed after parallel execution is done, grabs outputs. Will panic if
// other outstanding references to the recorded outputs exist.
pub(crate) fn take_output(&self, txn_idx: TxnIndex) -> ExecutionStatus<O, E> {
pub(crate) fn take_output(
&self,
txn_idx: TxnIndex,
) -> Result<ExecutionStatus<O, E>, PanicError> {
let owning_ptr = self.outputs[txn_idx as usize]
.swap(None)
.expect("[BlockSTM]: Output must be recorded after execution");
.ok_or(code_invariant_error(
"[BlockSTM]: Output must be recorded after execution",
))?;

Arc::try_unwrap(owning_ptr)
.expect("[BlockSTM]: Output should be uniquely owned after execution")
Arc::try_unwrap(owning_ptr).map_err(|_| {
code_invariant_error("[BlockSTM]: Output must be recorded after execution")
})
}
}
2 changes: 1 addition & 1 deletion aptos-move/block-executor/src/value_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>> ValueToIdentifierMapping
ViewState::Sync(state) => state
.versioned_map
.delayed_fields()
.read_latest_committed_value(
.read_latest_predicted_value(
&identifier,
self.txn_idx,
ReadPosition::AfterCurrentTxn,
Expand Down
12 changes: 6 additions & 6 deletions aptos-move/block-executor/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ fn delayed_field_try_add_delta_outcome_impl<T: Transaction>(
.into());
}

let last_committed_value = loop {
match versioned_delayed_fields.read_latest_committed_value(
let predicted_value = loop {
match versioned_delayed_fields.read_latest_predicted_value(
id,
txn_idx,
ReadPosition::BeforeCurrentTxn,
Expand All @@ -390,7 +390,7 @@ fn delayed_field_try_add_delta_outcome_impl<T: Transaction>(
compute_delayed_field_try_add_delta_outcome_first_time(
delta,
max_value,
last_committed_value,
predicted_value,
)?;

captured_reads
Expand Down Expand Up @@ -660,7 +660,7 @@ impl<'a, T: Transaction> ResourceState<T> for ParallelState<'a, T> {
));
},
Ok(false) => {
self.captured_reads.borrow_mut().mark_failure();
self.captured_reads.borrow_mut().mark_failure(false);
return ReadResult::HaltSpeculativeExecution(
"Interrupted as block execution was halted".to_string(),
);
Expand All @@ -672,7 +672,7 @@ impl<'a, T: Transaction> ResourceState<T> for ParallelState<'a, T> {
},
Err(DeltaApplicationFailure) => {
// AggregatorV1 may have delta application failure due to speculation.
self.captured_reads.borrow_mut().mark_failure();
self.captured_reads.borrow_mut().mark_failure(false);
return ReadResult::HaltSpeculativeExecution(
"Delta application failure (must be speculative)".to_string(),
);
Expand Down Expand Up @@ -2067,7 +2067,7 @@ mod test {
.ok_or(PanicOr::Or(MVDelayedFieldsError::NotFound))
}

fn read_latest_committed_value(
fn read_latest_predicted_value(
&self,
id: &DelayedFieldID,
_current_txn_idx: TxnIndex,
Expand Down
Loading

0 comments on commit 4a77084

Please sign in to comment.