[loader-v2] Fixing global cache reads & read-before-write on publish #15285

Open · wants to merge 1 commit into base: main
60 changes: 23 additions & 37 deletions aptos-move/block-executor/src/captured_reads.rs
@@ -297,7 +297,7 @@ impl DelayedFieldRead {
/// from [SyncCodeCache] it can first check the read-set here.
enum ModuleRead<DC, VC, S> {
    /// Read from the cross-block module cache.
-    GlobalCache,
+    GlobalCache(Arc<ModuleCode<DC, VC, S>>),
    /// Read from per-block cache ([SyncCodeCache]) used by parallel execution.
    PerBlockCache(Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>),
Comment on lines 298 to 302

igor-aptos (Contributor), Nov 15, 2024:
can you explain why we distinguish reads here based on where we got the data from? also, what is the Option<TxnIndex> in PerBlockCache?

georgemitenkov (Author):
Option - module does not exist (in the StateView even).

georgemitenkov (Author):
Different reads - different validations. We need to check that global reads are still valid, and that per-block reads have the same version.

igor-aptos (Contributor):
stupid formatting, didn't show I was referring to TxnIndex

georgemitenkov (Author):
Ah - None is a storage version.

georgemitenkov (Author):
I find it better than Result<TxnIndex, StorageVersion>.

igor-aptos (Contributor):
why the distinction between storage version and global cache?

georgemitenkov (Author):
Different validation paths: for a global cache read, we need to check if the read is still valid in the cache. For per-block reads, we go to the MVHashMap. As for storage reads: we issue them only when there is a cache miss in the per-block cache, so they get validated there.

georgemitenkov (Author):
Basically, a "storage version" can later be drained into the global cache, but otherwise exists only in the per-block cache.

igor-aptos (Contributor):
so from a validation perspective there is no distinction - the distinction is ONLY there to make updating the global cache (i.e. draining to it) faster/cheaper, by skipping things that are already there. is that correct?

}
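Pulling the thread's conclusions together, a minimal sketch of the read/version semantics (the types are simplified stand-ins, not the PR's real generic definitions):

    use std::sync::Arc;

    struct ModuleCode; // stand-in for the PR's ModuleCode<DC, VC, S>
    type TxnIndex = u32;

    enum ModuleRead {
        // Read from the cross-block cache. Such entries are always at
        // "storage version", and the value is captured so that repeated
        // reads by the same transaction return the same module.
        GlobalCache(Arc<ModuleCode>),
        // Outer Option: None means the module does not exist at all (not
        // even in the StateView). Inner Option<TxnIndex>: None is the
        // storage version, Some(idx) means transaction `idx` published the
        // module within this block. Storage-versioned entries are the ones
        // that can later be drained into the global cache.
        PerBlockCache(Option<(Arc<ModuleCode>, Option<TxnIndex>)>),
    }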
@@ -615,8 +615,8 @@ where
}

    /// Records the read to global cache that spans across multiple blocks.
-    pub(crate) fn capture_global_cache_read(&mut self, key: K) {
-        self.module_reads.insert(key, ModuleRead::GlobalCache);
+    pub(crate) fn capture_global_cache_read(&mut self, key: K, read: Arc<ModuleCode<DC, VC, S>>) {
+        self.module_reads.insert(key, ModuleRead::GlobalCache(read));
    }

/// Records the read to per-block level cache.
@@ -629,22 +629,19 @@ where
            .insert(key, ModuleRead::PerBlockCache(read));
    }

-    /// If the module has been previously read from [SyncCodeCache], returns it. Returns a panic
-    /// error if the read was cached for the global cross-module cache (we do not capture values
-    /// for those).
+    /// If the module has been previously read, returns it.
    pub(crate) fn get_module_read(
        &self,
        key: &K,
-    ) -> Result<CacheRead<Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>>, PanicError> {
-        Ok(match self.module_reads.get(key) {
+    ) -> CacheRead<Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>> {
+        match self.module_reads.get(key) {
            Some(ModuleRead::PerBlockCache(read)) => CacheRead::Hit(read.clone()),
-            Some(ModuleRead::GlobalCache) => {
-                return Err(PanicError::CodeInvariantError(
-                    "Global module cache reads do not capture values".to_string(),
-                ));
+            Some(ModuleRead::GlobalCache(read)) => {
+                // From global cache, we return a storage version.
+                CacheRead::Hit(Some((read.clone(), None)))
            },
            None => CacheRead::Miss,
-        })
+        }
    }

    /// For every module read that was captured, checks if the reads are still the same:
@@ -661,7 +658,7 @@ where
        }

        self.module_reads.iter().all(|(key, read)| match read {
-            ModuleRead::GlobalCache => global_module_cache.contains_valid(key),
+            ModuleRead::GlobalCache(_) => global_module_cache.contains_valid(key),
Contributor:
should this whole match be equivalent to:

        self.module_reads.iter().all(|(key, read)| {
            let previous_version = match read {
                ModuleRead::GlobalCache(_) => None, // i.e. storage version
                ModuleRead::PerBlockCache(previous) => previous.as_ref().map(|(_, version)| *version),
            };
            let current_version = per_block_module_cache.get_module_version(key);
            current_version == previous_version
        })

why do we need to update GlobalCache at all while executing a block?

georgemitenkov (Author):
We do if we read from it first (to know whether the entry is overridden or not). An alternative is to check the lower-level cache first, but that means a performance penalty due to locking.

georgemitenkov (Author):
The code can be somewhat equivalent, but:

    let current_version = per_block_module_cache.get_module_version(key);

causes a prefetch of the storage version by default. We would need to special-case validation to not do it. And we also end up locking the cache (a shard, in the worst case) instead of checking an atomic bool.

Contributor:
this is because we may publish a module that invalidates the global cache that's being read, I think

ModuleRead::PerBlockCache(previous) => {
let current_version = per_block_module_cache.get_module_version(key);
let previous_version = previous.as_ref().map(|(_, version)| *version);
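A hedged sketch of the validation asymmetry described above (GlobalCacheEntry is an illustrative stand-in, not the PR's GlobalModuleCache API): a captured global-cache read is validated with a single atomic load, while a per-block read must compare versions against the MVHashMap.

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    // Republishing a module flips `valid` at rolling commit, so validating
    // a captured global-cache read is a lock-free load instead of taking a
    // per-block cache shard lock.
    struct GlobalCacheEntry<M> {
        module: Arc<M>,
        valid: AtomicBool,
    }

    impl<M> GlobalCacheEntry<M> {
        fn is_valid(&self) -> bool {
            self.valid.load(Ordering::Acquire)
        }

        // Called at rolling commit when a transaction republishes the module.
        fn invalidate(&self) {
            self.valid.store(false, Ordering::Release);
        }
    }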
@@ -1537,20 +1534,6 @@ mod test {
);
}

-    #[test]
-    fn test_global_cache_module_reads_are_not_recorded() {
-        let mut captured_reads = CapturedReads::<
-            TestTransactionType,
-            u32,
-            MockDeserializedCode,
-            MockVerifiedCode,
-            MockExtension,
-        >::new();
-
-        captured_reads.capture_global_cache_read(0);
-        assert!(captured_reads.get_module_read(&0).is_err())
-    }

#[test]
fn test_global_cache_module_reads() {
let mut captured_reads = CapturedReads::<
@@ -1563,11 +1546,13 @@
let mut global_module_cache = GlobalModuleCache::empty();
let per_block_module_cache = SyncModuleCache::empty();

-        global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8)));
-        captured_reads.capture_global_cache_read(0);
+        let module_0 = mock_verified_code(0, MockExtension::new(8));
+        global_module_cache.insert(0, module_0.clone());
+        captured_reads.capture_global_cache_read(0, module_0);

-        global_module_cache.insert(1, mock_verified_code(1, MockExtension::new(8)));
-        captured_reads.capture_global_cache_read(1);
+        let module_1 = mock_verified_code(1, MockExtension::new(8));
+        global_module_cache.insert(1, module_1.clone());
+        captured_reads.capture_global_cache_read(1, module_1);

assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache));

@@ -1613,18 +1598,18 @@
        captured_reads.capture_per_block_cache_read(0, Some((a, Some(2))));
        assert!(matches!(
            captured_reads.get_module_read(&0),
-            Ok(CacheRead::Hit(Some(_)))
+            CacheRead::Hit(Some(_))
        ));

        captured_reads.capture_per_block_cache_read(1, None);
        assert!(matches!(
            captured_reads.get_module_read(&1),
-            Ok(CacheRead::Hit(None))
+            CacheRead::Hit(None)
        ));

        assert!(matches!(
            captured_reads.get_module_read(&2),
-            Ok(CacheRead::Miss)
+            CacheRead::Miss
        ));
    }

@@ -1701,8 +1686,9 @@
let per_block_module_cache = SyncModuleCache::empty();

        // Module exists in global cache.
-        global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8)));
-        captured_reads.capture_global_cache_read(0);
+        let m = mock_verified_code(0, MockExtension::new(8));
+        global_module_cache.insert(0, m.clone());
+        captured_reads.capture_global_cache_read(0, m);
        assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache));

// Assume we republish this module: validation must fail.
59 changes: 27 additions & 32 deletions aptos-move/block-executor/src/code_cache.rs
@@ -136,44 +136,39 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> ModuleCache
Self::Version,
)>,
> {
-        // First, look up the module in the cross-block global module cache. Record the read for
-        // later validation in case the read module is republished.
-        if let Some(module) = self.global_module_cache.get_valid(key) {
-            match &self.latest_view {
-                ViewState::Sync(state) => state
-                    .captured_reads
-                    .borrow_mut()
-                    .capture_global_cache_read(key.clone()),
-                ViewState::Unsync(state) => {
-                    state.read_set.borrow_mut().capture_module_read(key.clone())
-                },
-            }
-            return Ok(Some((module, Self::Version::default())));
-        }
-
-        // Global cache miss: check module cache in versioned/unsync maps.
        match &self.latest_view {
            ViewState::Sync(state) => {
                // Check the transaction-level cache with already read modules first.
-                let cache_read = state.captured_reads.borrow().get_module_read(key)?;
-                match cache_read {
-                    CacheRead::Hit(read) => Ok(read),
-                    CacheRead::Miss => {
-                        // If the module has not been accessed by this transaction, go to the
-                        // module cache and record the read.
-                        let read = state
-                            .versioned_map
-                            .module_cache()
-                            .get_module_or_build_with(key, builder)?;
-                        state
-                            .captured_reads
-                            .borrow_mut()
-                            .capture_per_block_cache_read(key.clone(), read.clone());
-                        Ok(read)
-                    },
+                if let CacheRead::Hit(read) = state.captured_reads.borrow().get_module_read(key) {
+                    return Ok(read);
                }

+                // Otherwise, it is a miss. Check global cache.
Contributor:
why do we check the global cache before checking state.versioned_map.module_cache?

on rolling commit - are we updating GlobalCache itself?

georgemitenkov (Author):
We update the global cache at rolling commit - if published keys exist in the global cache, we mark them as invalid. So reads to them result in a cache miss, and we fall back to the MVHashMap, where we have placed the write at commit time.

georgemitenkov (Author):
You can check versioned first, but then you end up acquiring a lock for a potentially non-republished module (publishing is rare). If 32 threads do this for aptos-framework, this is bad.

georgemitenkov (Author):
So instead, we look up in global first, but check an atomic bool flag there (better than a lock), so we optimize for the read case.

Contributor:
I see. Then I would rename PerBlockCache to UnfinalizedBlockCache or something like that - to make it clear it only ever refers to things before rolling commit, and GlobalCache is global and updated within the block itself.

(you can do that in a separate PR of course :) )

+                if let Some(module) = self.global_module_cache.get_valid(key) {
Contributor:
why do we reverse the order of checking now? (I was wondering about the order for the previous PR too)

georgemitenkov (Author):
Now, we always check the local cache first. If it is not there, we check, as before, 1) global first, if valid, 2) per-block next. In both cases, we clone the module into captured reads (the local cache), so the next read always reads the same thing. Does this make sense?

Contributor:
I was asking more about why we checked the global cache first in the previous PR - is this an orthogonal change, or do we need to reverse the order now?

Contributor:
we still check the global cache first.

What is added here is the check in captured reads - meaning whether this same transaction has already read the module, and if it did - do not read it again
+                    state
+                        .captured_reads
+                        .borrow_mut()
+                        .capture_global_cache_read(key.clone(), module.clone());
+                    return Ok(Some((module, Self::Version::default())));
+                }

+                // If not global cache, check per-block cache.
+                let read = state
+                    .versioned_map
+                    .module_cache()
+                    .get_module_or_build_with(key, builder)?;
+                state
+                    .captured_reads
+                    .borrow_mut()
+                    .capture_per_block_cache_read(key.clone(), read.clone());
+                Ok(read)
            },
            ViewState::Unsync(state) => {
+                if let Some(module) = self.global_module_cache.get_valid(key) {
+                    state.read_set.borrow_mut().capture_module_read(key.clone());
+                    return Ok(Some((module, Self::Version::default())));
+                }

                let read = state
                    .unsync_map
                    .module_cache()
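Putting the answers above together, a self-contained sketch of the new lookup order: transaction-local captured reads, then the cross-block global cache, then the per-block cache. The HashMap-based caches and type aliases here are toy stand-ins for the PR's real structures:

    use std::collections::HashMap;
    use std::sync::Arc;

    type Key = u32;
    type Module = String; // toy stand-in for the cached module representation
    type Version = Option<u32>; // None = storage version, Some(i) = txn i

    struct Caches {
        // Transaction-local captured reads: a hit means this txn already
        // read the module and must see the same value again.
        captured: HashMap<Key, Option<(Arc<Module>, Version)>>,
        // Cross-block global cache; the bool models the validity flag.
        global: HashMap<Key, (Arc<Module>, bool)>,
        // Per-block cache (the MVHashMap in the PR); storage reads land here.
        per_block: HashMap<Key, (Arc<Module>, Version)>,
    }

    impl Caches {
        fn get_module(&mut self, key: Key) -> Option<(Arc<Module>, Version)> {
            // 1. Local captured reads first: no locking at all.
            if let Some(read) = self.captured.get(&key) {
                return read.clone();
            }
            // 2. Global cache next: only a validity check, no shard lock.
            if let Some((module, valid)) = self.global.get(&key) {
                if *valid {
                    let hit = (module.clone(), None); // storage version
                    self.captured.insert(key, Some(hit.clone()));
                    return Some(hit);
                }
            }
            // 3. Per-block cache last; capture whatever we find (even a
            //    miss) so later reads by this txn agree.
            let read = self.per_block.get(&key).cloned();
            self.captured.insert(key, read.clone());
            read
        }
    }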
24 changes: 22 additions & 2 deletions aptos-move/block-executor/src/executor.rs
@@ -629,6 +629,7 @@ where
                Self::publish_module_writes(
                    txn_idx,
                    module_write_set,
+                    base_view,
                    global_module_cache,
                    versioned_cache,
                    scheduler,
@@ -671,6 +672,7 @@ where
                Self::publish_module_writes(
                    txn_idx,
                    module_write_set,
+                    base_view,
                    global_module_cache,
                    versioned_cache,
                    scheduler,
@@ -760,6 +762,7 @@ where
    fn publish_module_writes(
        txn_idx: TxnIndex,
        module_write_set: BTreeMap<T::Key, ModuleWrite<T::Value>>,
+        base_view: &S,
        global_module_cache: &GlobalModuleCache<
            ModuleId,
            CompiledModule,
@@ -773,11 +776,13 @@ where
        // Turn on the flag for module read validation.
        scheduler.validate_module_reads();

-        for (_, write) in module_write_set {
+        for (key, write) in module_write_set {
            Self::add_module_write_to_module_cache(
+                key,
                write,
                txn_idx,
                runtime_environment,
+                base_view,
                global_module_cache,
                versioned_cache.module_cache(),
            )?;
@@ -1216,9 +1221,11 @@ where

    /// Converts module write into cached module representation, and adds it to the module cache.
    fn add_module_write_to_module_cache(
+        key: T::Key,
        write: ModuleWrite<T::Value>,
        txn_idx: TxnIndex,
        runtime_environment: &RuntimeEnvironment,
+        base_view: &S,
        global_module_cache: &GlobalModuleCache<
            ModuleId,
            CompiledModule,
@@ -1233,8 +1240,17 @@ where
            Version = Option<TxnIndex>,
        >,
    ) -> Result<(), PanicError> {
-        let (id, write_op) = write.unpack();
+        // Enforce read-before-write because storage (DB) relies on this assumption.
+        let _ = base_view.get_state_value(&key).map_err(|err| {
Contributor:
we need some tests for this assumption 🤦‍♂️

georgemitenkov (Author), Nov 15, 2024:
@msmouse Do you have a code pointer for how I can test this with a DB?

Contributor:
I think he adds a panic here (#15283), so we just need to test some publish scenarios (with a real DB).

Contributor:
many tests fail with the panic.. I need to debug today to see how to enforce the assumption better.

nonetheless, I think we need to trigger some module republishing in a smoke test

Contributor:
@msmouse does it need to be a smoke test? we can have an executor benchmark lib unit test with publishing in 3 lines of code

Contributor:
it's hard to construct a complex scenario in a smoke test because most of the time a block only has one txn (blocks are produced too fast). it's easier to reproduce with the executor + DB alone. I thought we already had module publishing in smoke tests

igor-aptos (Contributor), Nov 15, 2024:

    #[test]
    fn test_module_publish_txns() {
        AptosVM::set_num_shards_once(1);
        AptosVM::set_concurrency_level_once(4);
        AptosVM::set_processed_transactions_detailed_counters();
        test_generic_benchmark::<AptosVMBlockExecutor>(
            Some(TransactionTypeArg::PublishPackage),
            true,
        );
    }

this doesn't call them afterwards though

Contributor:
we need to repeatedly publish the same module, and the module must be in use so that it resides in the cache - is that simple to implement?

Contributor:
Should be, let me see

Contributor:
here is the republish-and-call mixed workload - #15292, and it fails.

you can use it to confirm/test this fix.
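For reference, a hypothetical sketch of the mixed-workload unit test discussed above, modeled on the snippet earlier in this thread (the PublishPackageAndCall variant name is an assumption; PR #15292 adds the real workload):

    // Sketch only: `TransactionTypeArg::PublishPackageAndCall` is an assumed
    // variant standing in for whatever workload PR #15292 actually adds.
    #[test]
    fn test_module_republish_and_call_txns() {
        AptosVM::set_num_shards_once(1);
        AptosVM::set_concurrency_level_once(4);
        AptosVM::set_processed_transactions_detailed_counters();
        test_generic_benchmark::<AptosVMBlockExecutor>(
            // Repeatedly republish a module that other txns in the block call,
            // so the module is resident in the global cache when republished.
            Some(TransactionTypeArg::PublishPackageAndCall), // assumed variant
            true,
        );
    }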

+            PanicError::CodeInvariantError(format!(
+                "Unexpected storage error for module {}::{} to enforce read-before-write: {:?}",
+                write.module_address(),
+                write.module_name(),
+                err
+            ))
+        })?;

+        let (id, write_op) = write.unpack();
        let state_value = write_op.as_state_value().ok_or_else(|| {
            PanicError::CodeInvariantError("Modules cannot be deleted".to_string())
        })?;
@@ -1268,6 +1284,7 @@ where
    fn apply_output_sequential(
        txn_idx: TxnIndex,
        runtime_environment: &RuntimeEnvironment,
+        base_view: &S,
        global_module_cache: &GlobalModuleCache<
            ModuleId,
            CompiledModule,
@@ -1296,9 +1313,11 @@ where
        for (key, write) in output.module_write_set().into_iter() {
            if runtime_environment.vm_config().use_loader_v2 {
                Self::add_module_write_to_module_cache(
+                    key,
                    write,
                    txn_idx,
                    runtime_environment,
+                    base_view,
                    global_module_cache,
                    unsync_map.module_cache(),
                )?;
@@ -1580,6 +1599,7 @@ where
                Self::apply_output_sequential(
                    idx as TxnIndex,
                    runtime_environment,
+                    base_view,
                    module_cache_manager_guard.module_cache(),
                    &unsync_map,
                    &output,