diff --git a/aptos-move/aptos-aggregator/src/types.rs b/aptos-move/aptos-aggregator/src/types.rs index 81b99d6a2445b..32e1ffca5bf5c 100644 --- a/aptos-move/aptos-aggregator/src/types.rs +++ b/aptos-move/aptos-aggregator/src/types.rs @@ -41,7 +41,7 @@ impl PanicOr { pub fn code_invariant_error(message: M) -> PanicError { let msg = format!( - "Delayed materialization code invariant broken (there is a bug in the code), {:?}", + "Code invariant broken (there is a bug in the code), {:?}", message ); error!("{}", msg); diff --git a/aptos-move/block-executor/src/captured_reads.rs b/aptos-move/block-executor/src/captured_reads.rs index aaec491043d24..226d5e13c6c85 100644 --- a/aptos-move/block-executor/src/captured_reads.rs +++ b/aptos-move/block-executor/src/captured_reads.rs @@ -304,12 +304,13 @@ pub(crate) struct CapturedReads { delayed_field_reads: HashMap, - /// If there is a speculative failure (e.g. delta application failure, or an - /// observed inconsistency), the transaction output is irrelevant (must be - /// discarded and transaction re-executed). We have a global flag, as which - /// read observed the inconsistency is irrelevant (moreover, typically, - /// an error is returned to the VM to wrap up the ongoing execution). - speculative_failure: bool, + /// If there is a speculative failure (e.g. delta application failure, or an observed + /// inconsistency), the transaction output is irrelevant (must be discarded and transaction + /// re-executed). We have two global flags, one for speculative failures regarding + /// delayed fields, and the second for all other speculative failures, because these + /// require different validation behavior (delayed fields are validated commit-time). + delayed_field_speculative_failure: bool, + non_delayed_field_speculative_failure: bool, /// Set if the invarint on CapturedReads intended use is violated. Leads to an alert /// and sequential execution fallback. incorrect_use: bool, @@ -444,7 +445,7 @@ impl CapturedReads { }, UpdateResult::Inconsistency(m) => { // Record speculative failure. - self.speculative_failure = true; + self.non_delayed_field_speculative_failure = true; bail!(m); }, UpdateResult::Updated | UpdateResult::Inserted => Ok(()), @@ -521,7 +522,7 @@ impl CapturedReads { }, UpdateResult::Inconsistency(_) => { // Record speculative failure. - self.speculative_failure = true; + self.delayed_field_speculative_failure = true; Err(PanicOr::Or(DelayedFieldsSpeculativeError::InconsistentRead)) }, UpdateResult::Updated | UpdateResult::Inserted => Ok(()), @@ -531,7 +532,7 @@ impl CapturedReads { pub(crate) fn capture_delayed_field_read_error(&mut self, e: &PanicOr) { match e { PanicOr::CodeInvariantError(_) => self.incorrect_use = true, - PanicOr::Or(_) => self.speculative_failure = true, + PanicOr::Or(_) => self.delayed_field_speculative_failure = true, }; } @@ -554,7 +555,7 @@ impl CapturedReads { data_map: &VersionedData, idx_to_validate: TxnIndex, ) -> bool { - if self.speculative_failure { + if self.non_delayed_field_speculative_failure { return false; } @@ -590,7 +591,7 @@ impl CapturedReads { ) -> bool { use MVGroupError::*; - if self.speculative_failure { + if self.non_delayed_field_speculative_failure { return false; } @@ -631,19 +632,19 @@ impl CapturedReads { } // This validation needs to be called at commit time - // (as it internally uses read_latest_committed_value to get the current value). + // (as it internally uses read_latest_predicted_value to get the current value). pub(crate) fn validate_delayed_field_reads( &self, delayed_fields: &dyn TVersionedDelayedFieldView, idx_to_validate: TxnIndex, ) -> Result { - if self.speculative_failure { + if self.delayed_field_speculative_failure { return Ok(false); } use MVDelayedFieldsError::*; for (id, read_value) in &self.delayed_field_reads { - match delayed_fields.read_latest_committed_value( + match delayed_fields.read_latest_predicted_value( id, idx_to_validate, ReadPosition::BeforeCurrentTxn, @@ -707,8 +708,12 @@ impl CapturedReads { ret } - pub(crate) fn mark_failure(&mut self) { - self.speculative_failure = true; + pub(crate) fn mark_failure(&mut self, delayed_field_failure: bool) { + if delayed_field_failure { + self.delayed_field_speculative_failure = true; + } else { + self.non_delayed_field_speculative_failure = true; + } } pub(crate) fn mark_incorrect_use(&mut self) { @@ -756,8 +761,10 @@ impl UnsyncReadSet { mod test { use super::*; use crate::proptest_types::types::{raw_metadata, KeyType, MockEvent, ValueType}; - use aptos_mvhashmap::types::StorageVersion; - use claims::{assert_err, assert_gt, assert_matches, assert_none, assert_ok, assert_some_eq}; + use aptos_mvhashmap::{types::StorageVersion, MVHashMap}; + use claims::{ + assert_err, assert_gt, assert_matches, assert_none, assert_ok, assert_ok_eq, assert_some_eq, + }; use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID; use test_case::test_case; @@ -1268,7 +1275,8 @@ mod test { let deletion_metadata = DataRead::Metadata(None); let exists = DataRead::Exists(true); - assert!(!captured_reads.speculative_failure); + assert!(!captured_reads.non_delayed_field_speculative_failure); + assert!(!captured_reads.delayed_field_speculative_failure); let key = KeyType::(20, false); assert_ok!(captured_reads.capture_read(key, use_tag.then_some(30), exists)); assert_err!(captured_reads.capture_read( @@ -1276,22 +1284,57 @@ mod test { use_tag.then_some(30), deletion_metadata.clone() )); - assert!(captured_reads.speculative_failure); + assert!(captured_reads.non_delayed_field_speculative_failure); + assert!(!captured_reads.delayed_field_speculative_failure); - captured_reads.speculative_failure = false; + let mvhashmap = MVHashMap::new(); + // MVHashMap, u32, ValueType, MockExecutable, DelayedFieldID> = + + captured_reads.non_delayed_field_speculative_failure = false; + captured_reads.delayed_field_speculative_failure = false; let key = KeyType::(21, false); assert_ok!(captured_reads.capture_read(key, use_tag.then_some(30), deletion_metadata)); assert_err!(captured_reads.capture_read(key, use_tag.then_some(30), resolved)); - assert!(captured_reads.speculative_failure); + assert!(captured_reads.non_delayed_field_speculative_failure); + assert!(!captured_reads.validate_data_reads(mvhashmap.data(), 0)); + assert!(!captured_reads.validate_group_reads(mvhashmap.group_data(), 0)); + assert!(!captured_reads.delayed_field_speculative_failure); + assert_ok_eq!( + captured_reads.validate_delayed_field_reads(mvhashmap.delayed_fields(), 0), + true + ); - captured_reads.speculative_failure = false; + captured_reads.non_delayed_field_speculative_failure = false; + captured_reads.delayed_field_speculative_failure = false; let key = KeyType::(22, false); assert_ok!(captured_reads.capture_read(key, use_tag.then_some(30), metadata)); assert_err!(captured_reads.capture_read(key, use_tag.then_some(30), versioned_legacy)); - assert!(captured_reads.speculative_failure); + assert!(captured_reads.non_delayed_field_speculative_failure); + assert!(!captured_reads.delayed_field_speculative_failure); + + let mut captured_reads = CapturedReads::::new(); + captured_reads.non_delayed_field_speculative_failure = false; + captured_reads.delayed_field_speculative_failure = false; + captured_reads.mark_failure(true); + assert!(!captured_reads.non_delayed_field_speculative_failure); + assert!(captured_reads.validate_data_reads(mvhashmap.data(), 0)); + assert!(captured_reads.validate_group_reads(mvhashmap.group_data(), 0)); + assert!(captured_reads.delayed_field_speculative_failure); + assert_ok_eq!( + captured_reads.validate_delayed_field_reads(mvhashmap.delayed_fields(), 0), + false + ); - captured_reads.speculative_failure = false; - captured_reads.mark_failure(); - assert!(captured_reads.speculative_failure); + captured_reads.mark_failure(true); + assert!(!captured_reads.non_delayed_field_speculative_failure); + assert!(captured_reads.delayed_field_speculative_failure); + + captured_reads.delayed_field_speculative_failure = false; + captured_reads.mark_failure(false); + assert!(captured_reads.non_delayed_field_speculative_failure); + assert!(!captured_reads.delayed_field_speculative_failure); + captured_reads.mark_failure(true); + assert!(captured_reads.non_delayed_field_speculative_failure); + assert!(captured_reads.delayed_field_speculative_failure); } } diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index 2eee749a0e9d4..3dd0962917d83 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -657,7 +657,7 @@ where && block_limit_processor.should_end_block_parallel() { // Set the execution output status to be SkipRest, to skip the rest of the txns. - last_input_output.update_to_skip_rest(txn_idx); + last_input_output.update_to_skip_rest(txn_idx)?; } } @@ -854,7 +854,7 @@ where } let mut final_results = final_results.acquire(); - match last_input_output.take_output(txn_idx) { + match last_input_output.take_output(txn_idx)? { ExecutionStatus::Success(t) | ExecutionStatus::SkipRest(t) => { final_results[txn_idx as usize] = t; }, @@ -884,6 +884,7 @@ where params: &ParallelExecutionParams, ) -> Result<(), PanicOr> { // Make executor for each task. TODO: fast concurrent executor. + let num_txns = block.len(); let init_timer = VM_INIT_SECONDS.start_timer(); let vm_init_view: VMInitView<'_, T, S> = VMInitView::new(base_view, versioned_cache); let executor = E::init(env.clone(), &vm_init_view); @@ -910,6 +911,19 @@ where }; loop { + if let SchedulerTask::ValidationTask(_, incarnation, _) = &scheduler_task { + if *incarnation as usize > num_txns + 10 { + // Something is wrong if we observe high incarnations (e.g. a bug + // might manifest as an execution-invalidation cycle). Break out with + // an error to fallback to sequential execution. + return Err(code_invariant_error(format!( + "BlockSTM: too high incarnation {}, block len {}", + *incarnation, num_txns, + )) + .into()); + } + } + if params.worker_should_commit(worker_id) { while scheduler.should_coordinate_commits() { self.prepare_and_queue_commit_ready_txns( @@ -1083,6 +1097,15 @@ where }); drop(timer); + if !shared_maybe_error.load(Ordering::SeqCst) && scheduler.pop_from_commit_queue().is_ok() { + // No error is recorded, parallel execution workers are done, but there is + // still a commit task remaining. Commit tasks must be drained before workers + // exit, hence we log an error and fallback to sequential execution. + alert!("[BlockSTM] error: commit tasks not drained after parallel execution"); + + shared_maybe_error.store(true, Ordering::Relaxed); + } + counters::update_state_counters(versioned_cache.stats(), true); // Explicit async drops. diff --git a/aptos-move/block-executor/src/txn_last_input_output.rs b/aptos-move/block-executor/src/txn_last_input_output.rs index c928a88005bac..9960b2560a421 100644 --- a/aptos-move/block-executor/src/txn_last_input_output.rs +++ b/aptos-move/block-executor/src/txn_last_input_output.rs @@ -250,18 +250,21 @@ impl, E: Debug + Send + Clone> } } - pub(crate) fn update_to_skip_rest(&self, txn_idx: TxnIndex) { + pub(crate) fn update_to_skip_rest(&self, txn_idx: TxnIndex) -> Result<(), PanicError> { if self.block_skips_rest_at_idx(txn_idx) { // Already skipping. - return; + return Ok(()); } // check_execution_status_during_commit must be used for checks re:status. // Hence, since the status is not SkipRest, it must be Success. - if let ExecutionStatus::Success(output) = self.take_output(txn_idx) { + if let ExecutionStatus::Success(output) = self.take_output(txn_idx)? { self.outputs[txn_idx as usize].store(Some(Arc::new(ExecutionStatus::SkipRest(output)))); + Ok(()) } else { - unreachable!("Unexpected status, must be Success"); + Err(code_invariant_error( + "Unexpected status to change to SkipRest, must be Success", + )) } } @@ -446,12 +449,18 @@ impl, E: Debug + Send + Clone> // Must be executed after parallel execution is done, grabs outputs. Will panic if // other outstanding references to the recorded outputs exist. - pub(crate) fn take_output(&self, txn_idx: TxnIndex) -> ExecutionStatus { + pub(crate) fn take_output( + &self, + txn_idx: TxnIndex, + ) -> Result, PanicError> { let owning_ptr = self.outputs[txn_idx as usize] .swap(None) - .expect("[BlockSTM]: Output must be recorded after execution"); + .ok_or(code_invariant_error( + "[BlockSTM]: Output must be recorded after execution", + ))?; - Arc::try_unwrap(owning_ptr) - .expect("[BlockSTM]: Output should be uniquely owned after execution") + Arc::try_unwrap(owning_ptr).map_err(|_| { + code_invariant_error("[BlockSTM]: Output must be recorded after execution") + }) } } diff --git a/aptos-move/block-executor/src/value_exchange.rs b/aptos-move/block-executor/src/value_exchange.rs index 65d839112775f..30f797dc35059 100644 --- a/aptos-move/block-executor/src/value_exchange.rs +++ b/aptos-move/block-executor/src/value_exchange.rs @@ -85,7 +85,7 @@ impl<'a, T: Transaction, S: TStateView> ValueToIdentifierMapping ViewState::Sync(state) => state .versioned_map .delayed_fields() - .read_latest_committed_value( + .read_latest_predicted_value( &identifier, self.txn_idx, ReadPosition::AfterCurrentTxn, diff --git a/aptos-move/block-executor/src/view.rs b/aptos-move/block-executor/src/view.rs index 1dc3d95512260..5dfc355290028 100644 --- a/aptos-move/block-executor/src/view.rs +++ b/aptos-move/block-executor/src/view.rs @@ -364,8 +364,8 @@ fn delayed_field_try_add_delta_outcome_impl( .into()); } - let last_committed_value = loop { - match versioned_delayed_fields.read_latest_committed_value( + let predicted_value = loop { + match versioned_delayed_fields.read_latest_predicted_value( id, txn_idx, ReadPosition::BeforeCurrentTxn, @@ -390,7 +390,7 @@ fn delayed_field_try_add_delta_outcome_impl( compute_delayed_field_try_add_delta_outcome_first_time( delta, max_value, - last_committed_value, + predicted_value, )?; captured_reads @@ -660,7 +660,7 @@ impl<'a, T: Transaction> ResourceState for ParallelState<'a, T> { )); }, Ok(false) => { - self.captured_reads.borrow_mut().mark_failure(); + self.captured_reads.borrow_mut().mark_failure(false); return ReadResult::HaltSpeculativeExecution( "Interrupted as block execution was halted".to_string(), ); @@ -672,7 +672,7 @@ impl<'a, T: Transaction> ResourceState for ParallelState<'a, T> { }, Err(DeltaApplicationFailure) => { // AggregatorV1 may have delta application failure due to speculation. - self.captured_reads.borrow_mut().mark_failure(); + self.captured_reads.borrow_mut().mark_failure(false); return ReadResult::HaltSpeculativeExecution( "Delta application failure (must be speculative)".to_string(), ); @@ -2067,7 +2067,7 @@ mod test { .ok_or(PanicOr::Or(MVDelayedFieldsError::NotFound)) } - fn read_latest_committed_value( + fn read_latest_predicted_value( &self, id: &DelayedFieldID, _current_txn_idx: TxnIndex, diff --git a/aptos-move/mvhashmap/src/versioned_delayed_fields.rs b/aptos-move/mvhashmap/src/versioned_delayed_fields.rs index 5a5de5f44f09f..3341a0ab37d53 100644 --- a/aptos-move/mvhashmap/src/versioned_delayed_fields.rs +++ b/aptos-move/mvhashmap/src/versioned_delayed_fields.rs @@ -201,8 +201,12 @@ impl VersionedValue { } // Given a transaction index which should be committed next, returns the latest value - // below this version, or an error if such a value does not exist. - fn read_latest_committed_value( + // below this version, or if no such value exists, then the delayed field must have been + // created in the same block. In this case predict the value in the first (lowest) entry, + // or an error if such an entry cannot be found (must be due to speculation). The lowest + // entry is picked without regards to the indices, as it's for optimistic prediction + // purposes only (better to have some value than error). + fn read_latest_predicted_value( &self, next_idx_to_commit: TxnIndex, ) -> Result { @@ -212,10 +216,15 @@ impl VersionedValue { .range(0..next_idx_to_commit) .next_back() .map_or_else( - || { - self.base_value - .clone() - .ok_or(MVDelayedFieldsError::NotFound) + || match &self.base_value { + Some(value) => Ok(value.clone()), + None => match self.versioned_map.first_key_value() { + Some((_, entry)) => match entry.as_ref().deref() { + Value(v, _) => Ok(v.clone()), + Apply(_) | Estimate(_) => Err(MVDelayedFieldsError::NotFound), + }, + None => Err(MVDelayedFieldsError::NotFound), + }, }, |(_, entry)| match entry.as_ref().deref() { Value(v, _) => Ok(v.clone()), @@ -347,10 +356,12 @@ pub trait TVersionedDelayedFieldView { txn_idx: TxnIndex, ) -> Result>; - /// Returns the committed value from largest transaction index that is - /// smaller than the given current_txn_idx (read_position defined whether - /// inclusively or exclusively from the current transaction itself). - fn read_latest_committed_value( + /// Returns the committed value from largest transaction index that is smaller than the + /// given current_txn_idx (read_position defined whether inclusively or exclusively from + /// the current transaction itself). If such a value does not exist, the value might + /// be created in the current block, and the value from the first (lowest) entry is taken + /// as the prediction. + fn read_latest_predicted_value( &self, id: &K, current_txn_idx: TxnIndex, @@ -536,7 +547,7 @@ impl VersionedDelayedFields { // remove delta in the commit VersionEntry::Value(v, Some(_)) => Some(v.clone()), VersionEntry::Apply(AggregatorDelta { delta }) => { - let prev_value = versioned_value.read_latest_committed_value(idx_to_commit) + let prev_value = versioned_value.read_latest_predicted_value(idx_to_commit) .map_err(|e| CommitError::CodeInvariantError(format!("Cannot read latest committed value for Apply(AggregatorDelta) during commit: {:?}", e)))?; if let DelayedFieldValue::Aggregator(base) = prev_value { let new_value = delta.apply_to(base).map_err(|e| { @@ -584,7 +595,7 @@ impl VersionedDelayedFields { let prev_value = self.values .get_mut(&base_aggregator) .ok_or_else(|| CommitError::CodeInvariantError("Cannot find base_aggregator for Apply(SnapshotDelta) during commit".to_string()))? - .read_latest_committed_value(idx_to_commit) + .read_latest_predicted_value(idx_to_commit) .map_err(|e| CommitError::CodeInvariantError(format!("Cannot read latest committed value for base aggregator for ApplySnapshotDelta) during commit: {:?}", e)))?; if let DelayedFieldValue::Aggregator(base) = prev_value { @@ -615,7 +626,7 @@ impl VersionedDelayedFields { .get_mut(&base_snapshot) .ok_or_else(|| CommitError::CodeInvariantError("Cannot find base_aggregator for Apply(SnapshotDelta) during commit".to_string()))? // Read values committed in this commit - .read_latest_committed_value(idx_to_commit + 1) + .read_latest_predicted_value(idx_to_commit + 1) .map_err(|e| CommitError::CodeInvariantError(format!("Cannot read latest committed value for base aggregator for ApplySnapshotDelta) during commit: {:?}", e)))?; if let DelayedFieldValue::Snapshot(base) = prev_value { @@ -705,7 +716,7 @@ impl TVersionedDelayedFieldView /// Returns the committed value from largest transaction index that is /// smaller than the given current_txn_idx (read_position defined whether /// inclusively or exclusively from the current transaction itself). - fn read_latest_committed_value( + fn read_latest_predicted_value( &self, id: &K, current_txn_idx: TxnIndex, @@ -715,7 +726,7 @@ impl TVersionedDelayedFieldView .get_mut(id) .ok_or(MVDelayedFieldsError::NotFound) .and_then(|v| { - v.read_latest_committed_value( + v.read_latest_predicted_value( match read_position { ReadPosition::BeforeCurrentTxn => current_txn_idx, ReadPosition::AfterCurrentTxn => current_txn_idx + 1, @@ -1194,7 +1205,50 @@ mod test { if let Some(entry) = aggregator_entry(type_index) { v.insert_speculative_value(10, entry).unwrap(); } - let _ = v.read_latest_committed_value(11); + let _ = v.read_latest_predicted_value(11); + } + + #[test_case(APPLY_AGGREGATOR)] + #[test_case(APPLY_SNAPSHOT)] + #[test_case(APPLY_DERIVED)] + fn read_first_entry_not_value(type_index: usize) { + let mut v = VersionedValue::new(None); + assert_matches!( + v.read_latest_predicted_value(11), + Err(MVDelayedFieldsError::NotFound) + ); + + if let Some(entry) = aggregator_entry(type_index) { + v.insert_speculative_value(12, entry).unwrap(); + } + assert_matches!( + v.read_latest_predicted_value(11), + Err(MVDelayedFieldsError::NotFound) + ); + } + + #[test] + fn read_first_entry_value() { + let mut v = VersionedValue::new(None); + v.insert_speculative_value(13, aggregator_entry(APPLY_AGGREGATOR).unwrap()) + .unwrap(); + v.insert_speculative_value(12, aggregator_entry(VALUE_AGGREGATOR).unwrap()) + .unwrap(); + + assert_matches!( + v.read_latest_predicted_value(11), + Ok(DelayedFieldValue::Aggregator(10)) + ); + + v.insert_speculative_value( + 9, + VersionEntry::Value(DelayedFieldValue::Aggregator(9), None), + ) + .unwrap(); + assert_matches!( + v.read_latest_predicted_value(11), + Ok(DelayedFieldValue::Aggregator(9)) + ); } #[should_panic] @@ -1204,11 +1258,11 @@ mod test { v.insert_speculative_value(3, aggregator_entry(VALUE_AGGREGATOR).unwrap()) .unwrap(); v.mark_estimate(3); - let _ = v.read_latest_committed_value(11); + let _ = v.read_latest_predicted_value(11); } #[test] - fn read_latest_committed_value() { + fn read_latest_predicted_value() { let mut v = VersionedValue::new(Some(DelayedFieldValue::Aggregator(5))); v.insert_speculative_value(2, aggregator_entry(VALUE_AGGREGATOR).unwrap()) .unwrap(); @@ -1219,19 +1273,21 @@ mod test { .unwrap(); assert_ok_eq!( - v.read_latest_committed_value(5), + v.read_latest_predicted_value(5), DelayedFieldValue::Aggregator(15) ); assert_ok_eq!( - v.read_latest_committed_value(4), + v.read_latest_predicted_value(4), DelayedFieldValue::Aggregator(10) ); assert_ok_eq!( - v.read_latest_committed_value(2), + v.read_latest_predicted_value(2), DelayedFieldValue::Aggregator(5) ); } + // TODO: test for latest predicted with the first entry (nothing committed). + #[test] fn read_delta_chain() { let mut v = VersionedValue::new(Some(DelayedFieldValue::Aggregator(5)));