[loader-v2] Fixing global cache reads & read-before-write on publish #15285
base: main
@@ -297,7 +297,7 @@ impl DelayedFieldRead {
     /// from [SyncCodeCache] it can first check the read-set here.
     enum ModuleRead<DC, VC, S> {
         /// Read from the cross-block module cache.
-        GlobalCache,
+        GlobalCache(Arc<ModuleCode<DC, VC, S>>),
         /// Read from per-block cache ([SyncCodeCache]) used by parallel execution.
         PerBlockCache(Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>),
     }
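The hunk above is the heart of the PR: a global-cache read now captures the module value itself, so a repeated read by the same transaction can be served from the capture instead of being a panic error. The following is a hedged, self-contained sketch of that behavior with simplified names (`u32` keys, `String` modules stand in for the PR's generic `K` and `ModuleCode<DC, VC, S>`), not the PR's actual types:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-in for the PR's ModuleRead: global-cache hits now carry the value.
enum ModuleRead<M> {
    GlobalCache(Arc<M>),
    // Option<u32> stands in for Option<TxnIndex>; None means the storage version.
    PerBlockCache(Option<(Arc<M>, Option<u32>)>),
}

enum CacheRead<T> {
    Hit(T),
    Miss,
}

struct CapturedReads<M> {
    module_reads: HashMap<u32, ModuleRead<M>>,
}

impl<M> CapturedReads<M> {
    fn new() -> Self {
        Self { module_reads: HashMap::new() }
    }

    fn capture_global_cache_read(&mut self, key: u32, read: Arc<M>) {
        self.module_reads.insert(key, ModuleRead::GlobalCache(read));
    }

    // Global-cache hits are returned with None as the version (a storage version),
    // instead of the pre-PR behavior of returning a PanicError.
    fn get_module_read(&self, key: &u32) -> CacheRead<Option<(Arc<M>, Option<u32>)>> {
        match self.module_reads.get(key) {
            Some(ModuleRead::GlobalCache(read)) => CacheRead::Hit(Some((read.clone(), None))),
            Some(ModuleRead::PerBlockCache(read)) => CacheRead::Hit(read.clone()),
            None => CacheRead::Miss,
        }
    }
}

fn main() {
    let mut reads = CapturedReads::<String>::new();
    reads.capture_global_cache_read(0, Arc::new("module_a".to_string()));

    // A repeated read of key 0 now hits the capture and carries the storage version.
    match reads.get_module_read(&0) {
        CacheRead::Hit(Some((module, version))) => {
            assert_eq!(*module, "module_a");
            assert_eq!(version, None);
        },
        _ => panic!("expected a captured hit"),
    }
    assert!(matches!(reads.get_module_read(&1), CacheRead::Miss));
}
```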
@@ -615,8 +615,8 @@ where
     }

     /// Records the read to global cache that spans across multiple blocks.
-    pub(crate) fn capture_global_cache_read(&mut self, key: K) {
-        self.module_reads.insert(key, ModuleRead::GlobalCache);
+    pub(crate) fn capture_global_cache_read(&mut self, key: K, read: Arc<ModuleCode<DC, VC, S>>) {
+        self.module_reads.insert(key, ModuleRead::GlobalCache(read));
     }

     /// Records the read to per-block level cache.
@@ -629,22 +629,19 @@ where
             .insert(key, ModuleRead::PerBlockCache(read));
     }

-    /// If the module has been previously read from [SyncCodeCache], returns it. Returns a panic
-    /// error if the read was cached for the global cross-module cache (we do not capture values
-    /// for those).
+    /// If the module has been previously read, returns it.
     pub(crate) fn get_module_read(
         &self,
         key: &K,
-    ) -> Result<CacheRead<Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>>, PanicError> {
-        Ok(match self.module_reads.get(key) {
+    ) -> CacheRead<Option<(Arc<ModuleCode<DC, VC, S>>, Option<TxnIndex>)>> {
+        match self.module_reads.get(key) {
             Some(ModuleRead::PerBlockCache(read)) => CacheRead::Hit(read.clone()),
-            Some(ModuleRead::GlobalCache) => {
-                return Err(PanicError::CodeInvariantError(
-                    "Global module cache reads do not capture values".to_string(),
-                ));
+            Some(ModuleRead::GlobalCache(read)) => {
+                // From global cache, we return a storage version.
+                CacheRead::Hit(Some((read.clone(), None)))
             },
             None => CacheRead::Miss,
-        })
+        }
     }

     /// For every module read that was captured, checks if the reads are still the same:
@@ -661,7 +658,7 @@ where
     }

         self.module_reads.iter().all(|(key, read)| match read {
-            ModuleRead::GlobalCache => global_module_cache.contains_valid(key),
+            ModuleRead::GlobalCache(_) => global_module_cache.contains_valid(key),
Review thread:
- Comment: should this whole match be equivalent to:
- Comment: why do we need to update GlobalCache at all while executing a block?
- Reply: We do if we read from it first (to know whether an entry is overridden or not). An alternative is to check the lower-level cache first, but that means a performance penalty due to locking.
- Reply: The code can be somewhat equivalent, but it causes a prefetch of the storage version by default, so we would need to special-case validation to not do it. And we also end up locking the cache (a shard, in the worst case) instead of checking an atomic bool.
- Reply: This is because we may publish a module that invalidates the global cache that's being read, I think.
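The "atomic bool instead of a lock" argument in this thread can be sketched as follows. This is an illustrative, simplified model (plain `HashMap`, `u32` keys), not the PR's actual `GlobalModuleCache`: each cross-block entry carries an atomic validity flag, so the common read path (module not republished) only performs an atomic load, while a publish at rolling commit flips the flag in place.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// One cross-block cache entry: the module plus an atomic validity flag.
struct Entry<M> {
    valid: AtomicBool,
    module: Arc<M>,
}

struct GlobalModuleCache<M> {
    entries: HashMap<u32, Entry<M>>,
}

impl<M> GlobalModuleCache<M> {
    fn empty() -> Self {
        Self { entries: HashMap::new() }
    }

    fn insert(&mut self, key: u32, module: Arc<M>) {
        self.entries.insert(key, Entry { valid: AtomicBool::new(true), module });
    }

    // Cheap atomic load instead of locking a shard of the per-block cache.
    fn get_valid(&self, key: &u32) -> Option<Arc<M>> {
        let entry = self.entries.get(key)?;
        entry.valid.load(Ordering::Acquire).then(|| entry.module.clone())
    }

    // Used by validation of captured GlobalCache reads.
    fn contains_valid(&self, key: &u32) -> bool {
        self.entries
            .get(key)
            .map_or(false, |e| e.valid.load(Ordering::Acquire))
    }

    // Called when a transaction republishes the module at rolling commit:
    // invalidate in place, so later reads miss and fall back to the per-block cache.
    fn mark_invalid(&self, key: &u32) {
        if let Some(entry) = self.entries.get(key) {
            entry.valid.store(false, Ordering::Release);
        }
    }
}

fn main() {
    let mut cache = GlobalModuleCache::empty();
    cache.insert(0, Arc::new("aptos_framework_module"));
    assert!(cache.contains_valid(&0));

    // A publish invalidates the entry; validation via contains_valid now fails,
    // which is exactly what the `ModuleRead::GlobalCache(_)` arm above checks.
    cache.mark_invalid(&0);
    assert!(!cache.contains_valid(&0));
    assert!(cache.get_valid(&0).is_none());
}
```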
             ModuleRead::PerBlockCache(previous) => {
                 let current_version = per_block_module_cache.get_module_version(key);
                 let previous_version = previous.as_ref().map(|(_, version)| *version);
@@ -1537,20 +1534,6 @@ mod test {
         );
     }

-    #[test]
-    fn test_global_cache_module_reads_are_not_recorded() {
-        let mut captured_reads = CapturedReads::<
-            TestTransactionType,
-            u32,
-            MockDeserializedCode,
-            MockVerifiedCode,
-            MockExtension,
-        >::new();
-
-        captured_reads.capture_global_cache_read(0);
-        assert!(captured_reads.get_module_read(&0).is_err())
-    }
-
     #[test]
     fn test_global_cache_module_reads() {
         let mut captured_reads = CapturedReads::<
@@ -1563,11 +1546,13 @@ mod test {
         let mut global_module_cache = GlobalModuleCache::empty();
         let per_block_module_cache = SyncModuleCache::empty();

-        global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8)));
-        captured_reads.capture_global_cache_read(0);
+        let module_0 = mock_verified_code(0, MockExtension::new(8));
+        global_module_cache.insert(0, module_0.clone());
+        captured_reads.capture_global_cache_read(0, module_0);

-        global_module_cache.insert(1, mock_verified_code(1, MockExtension::new(8)));
-        captured_reads.capture_global_cache_read(1);
+        let module_1 = mock_verified_code(1, MockExtension::new(8));
+        global_module_cache.insert(1, module_1.clone());
+        captured_reads.capture_global_cache_read(1, module_1);

         assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache));
@@ -1613,18 +1598,18 @@ mod test {
         captured_reads.capture_per_block_cache_read(0, Some((a, Some(2))));
         assert!(matches!(
             captured_reads.get_module_read(&0),
-            Ok(CacheRead::Hit(Some(_)))
+            CacheRead::Hit(Some(_))
         ));

         captured_reads.capture_per_block_cache_read(1, None);
         assert!(matches!(
             captured_reads.get_module_read(&1),
-            Ok(CacheRead::Hit(None))
+            CacheRead::Hit(None)
         ));

         assert!(matches!(
             captured_reads.get_module_read(&2),
-            Ok(CacheRead::Miss)
+            CacheRead::Miss
         ));
     }
@@ -1701,8 +1686,9 @@ mod test {
         let per_block_module_cache = SyncModuleCache::empty();

         // Module exists in global cache.
-        global_module_cache.insert(0, mock_verified_code(0, MockExtension::new(8)));
-        captured_reads.capture_global_cache_read(0);
+        let m = mock_verified_code(0, MockExtension::new(8));
+        global_module_cache.insert(0, m.clone());
+        captured_reads.capture_global_cache_read(0, m);
         assert!(captured_reads.validate_module_reads(&global_module_cache, &per_block_module_cache));

         // Assume we republish this module: validation must fail.
@@ -136,44 +136,39 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> ModuleCache
             Self::Version,
         )>,
     > {
-        // First, look up the module in the cross-block global module cache. Record the read for
-        // later validation in case the read module is republished.
-        if let Some(module) = self.global_module_cache.get_valid(key) {
-            match &self.latest_view {
-                ViewState::Sync(state) => state
-                    .captured_reads
-                    .borrow_mut()
-                    .capture_global_cache_read(key.clone()),
-                ViewState::Unsync(state) => {
-                    state.read_set.borrow_mut().capture_module_read(key.clone())
-                },
-            }
-            return Ok(Some((module, Self::Version::default())));
-        }
-
-        // Global cache miss: check module cache in versioned/unsync maps.
         match &self.latest_view {
             ViewState::Sync(state) => {
                 // Check the transaction-level cache with already read modules first.
-                let cache_read = state.captured_reads.borrow().get_module_read(key)?;
-                match cache_read {
-                    CacheRead::Hit(read) => Ok(read),
-                    CacheRead::Miss => {
-                        // If the module has not been accessed by this transaction, go to the
-                        // module cache and record the read.
-                        let read = state
-                            .versioned_map
-                            .module_cache()
-                            .get_module_or_build_with(key, builder)?;
-                        state
-                            .captured_reads
-                            .borrow_mut()
-                            .capture_per_block_cache_read(key.clone(), read.clone());
-                        Ok(read)
-                    },
+                if let CacheRead::Hit(read) = state.captured_reads.borrow().get_module_read(key) {
+                    return Ok(read);
+                }
+
+                // Otherwise, it is a miss. Check global cache.
Review thread:
- Comment: why do we check the global cache before checking state.versioned_map.module_cache? On rolling commit, are we updating GlobalCache itself?
- Reply: We update the global cache at rolling commit: if published keys exist in the global cache, we mark them as invalid. Reads to them then result in a cache miss, and we fall back to the MVHashMap where we have placed the write at commit time.
- Reply: You can check versioned before, but then you end up acquiring a lock for a potentially non-republished module (publishing is rare). If 32 threads do this for the aptos-framework, this is bad.
- Reply: So instead, we look up the global cache first, but check an atomic bool flag there (better than a lock), so we optimize for the read case.
- Reply: I see. Then I would rename PerBlockCache to UnfinalizedBlockCache or something like that, to make it clear it only ever refers to things before rolling commit, while GlobalCache is global and updated within the block itself. (You can do that in a separate PR, of course. :) )
+                if let Some(module) = self.global_module_cache.get_valid(key) {
Review thread:
- Comment: why do we reverse the order of checking now? (I was wondering about the order for the previous PR too.)
- Reply: Now, we always check the local cache first. If it is not there, we check, as before: 1) the global cache first, if valid, and 2) the per-block cache next. In both cases, we clone the module into captured reads (the local cache), so the next read always reads the same thing. Does this make sense?
- Reply: I was asking more about why we checked the global cache in the previous PR. Is this an orthogonal change, or do we need to reverse the order now?
- Reply: We still check the global cache first. What is added here is the check in captured reads, meaning whether this same transaction has already read it; if it did, we do not read it again.
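The lookup order this thread settles on can be sketched as a three-tier read path. This is an illustrative model under simplified assumptions (plain `HashMap`s, no invalidation or versioning), with hypothetical names, not the PR's API: captured reads of the current transaction first, then the cross-block global cache, then the per-block cache, recording every hit so subsequent reads by the same transaction are served from tier one.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical loader holding the three tiers discussed above.
struct Loader {
    captured_reads: HashMap<u32, Arc<String>>,  // per-transaction, checked first
    global_cache: HashMap<u32, Arc<String>>,    // cross-block, valid entries only
    per_block_cache: HashMap<u32, Arc<String>>, // stands in for the versioned map
}

impl Loader {
    fn get_module(&mut self, key: u32) -> Option<Arc<String>> {
        // 1. The same transaction already read it: return the same value.
        if let Some(read) = self.captured_reads.get(&key) {
            return Some(read.clone());
        }
        // 2. Global cache hit: record the read so the next access is tier 1.
        if let Some(module) = self.global_cache.get(&key).cloned() {
            self.captured_reads.insert(key, module.clone());
            return Some(module);
        }
        // 3. Per-block cache; the read is recorded as well.
        let module = self.per_block_cache.get(&key).cloned()?;
        self.captured_reads.insert(key, module.clone());
        Some(module)
    }
}

fn main() {
    let mut loader = Loader {
        captured_reads: HashMap::new(),
        global_cache: HashMap::from([(0, Arc::new("global".to_string()))]),
        per_block_cache: HashMap::from([(1, Arc::new("per_block".to_string()))]),
    };

    assert_eq!(*loader.get_module(0).unwrap(), "global");
    assert_eq!(*loader.get_module(1).unwrap(), "per_block");

    // Even if the global entry disappears afterwards, this transaction keeps
    // reading its captured value: "the next read always reads the same thing".
    loader.global_cache.remove(&0);
    assert_eq!(*loader.get_module(0).unwrap(), "global");
}
```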
+                    state
+                        .captured_reads
+                        .borrow_mut()
+                        .capture_global_cache_read(key.clone(), module.clone());
+                    return Ok(Some((module, Self::Version::default())));
+                }
+
+                // If not global cache, check per-block cache.
+                let read = state
+                    .versioned_map
+                    .module_cache()
+                    .get_module_or_build_with(key, builder)?;
+                state
+                    .captured_reads
+                    .borrow_mut()
+                    .capture_per_block_cache_read(key.clone(), read.clone());
+                Ok(read)
             },
             ViewState::Unsync(state) => {
+                if let Some(module) = self.global_module_cache.get_valid(key) {
+                    state.read_set.borrow_mut().capture_module_read(key.clone());
+                    return Ok(Some((module, Self::Version::default())));
+                }
+
                 let read = state
                     .unsync_map
                     .module_cache()
@@ -629,6 +629,7 @@ where
         Self::publish_module_writes(
             txn_idx,
             module_write_set,
+            base_view,
             global_module_cache,
             versioned_cache,
             scheduler,

@@ -671,6 +672,7 @@ where
         Self::publish_module_writes(
             txn_idx,
             module_write_set,
+            base_view,
             global_module_cache,
             versioned_cache,
             scheduler,
@@ -760,6 +762,7 @@ where
     fn publish_module_writes(
         txn_idx: TxnIndex,
         module_write_set: BTreeMap<T::Key, ModuleWrite<T::Value>>,
+        base_view: &S,
         global_module_cache: &GlobalModuleCache<
             ModuleId,
             CompiledModule,
@@ -773,11 +776,13 @@ where
         // Turn on the flag for module read validation.
         scheduler.validate_module_reads();

-        for (_, write) in module_write_set {
+        for (key, write) in module_write_set {
             Self::add_module_write_to_module_cache(
+                key,
                 write,
                 txn_idx,
                 runtime_environment,
+                base_view,
                 global_module_cache,
                 versioned_cache.module_cache(),
             )?;
@@ -1216,9 +1221,11 @@ where

     /// Converts module write into cached module representation, and adds it to the module cache.
     fn add_module_write_to_module_cache(
+        key: T::Key,
         write: ModuleWrite<T::Value>,
         txn_idx: TxnIndex,
         runtime_environment: &RuntimeEnvironment,
+        base_view: &S,
         global_module_cache: &GlobalModuleCache<
             ModuleId,
             CompiledModule,
@@ -1233,8 +1240,17 @@ where
             Version = Option<TxnIndex>,
         >,
     ) -> Result<(), PanicError> {
-        let (id, write_op) = write.unpack();
+        // Enforce read-before-write because storage (DB) relies on this assumption.
+        let _ = base_view.get_state_value(&key).map_err(|err| {
Review thread:
- Comment: we need some tests for this assumption 🤦♂️
- Reply: @msmouse Do you have a code pointer for how I can test this with a DB?
- Reply: I think he adds a panic here in #15283, so we just need to test some such publish scenarios (with a real DB).
- Reply: many tests fail with the panic... I need to debug today to see how to enforce the assumption better. Nonetheless, I think we need to trigger some module republishing in a smoke test.
- Reply: @msmouse does it need to be a smoke test? We can have an executor-benchmark lib unit test with publishing in 3 lines of code.
- Reply: it's hard to construct a complex scenario in a smoke test because most of the time a block only has one txn (blocks are produced too fast). It's easier to use the executor + DB alone to reproduce. I thought we already have module publishing in smoke tests.
- Reply: this doesn't call on them afterwards though.
- Reply: we need to repeatedly publish the same module, and the module must be in use so it resides in the cache. Is that simple to implement?
- Reply: Should be, let me see.
- Reply: here is the republish-and-call mixed workload - #15292, and it fails. You can use it to confirm/test this fix.
+            PanicError::CodeInvariantError(format!(
+                "Unexpected storage error for module {}::{} to enforce read-before-write: {:?}",
+                write.module_address(),
+                write.module_name(),
+                err
+            ))
+        })?;
+
+        let (id, write_op) = write.unpack();
         let state_value = write_op.as_state_value().ok_or_else(|| {
             PanicError::CodeInvariantError("Modules cannot be deleted".to_string())
         })?;
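The read-before-write fix above can be illustrated with a hedged, self-contained sketch. All types here are simplified stand-ins (a tiny `StateView` trait, `u32` keys, `Vec<u8>` module bytes), not the PR's code: before a module write is added to the cache, the key is read through the base view, so the storage layer observes a read for every key it will later be asked to write.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

// Minimal stand-in for the storage view the executor reads through.
trait StateView {
    fn get_state_value(&self, key: &u32) -> Result<Option<Vec<u8>>, String>;
}

// A view that records which keys were read, so we can check the invariant.
struct RecordingView {
    data: HashMap<u32, Vec<u8>>,
    reads: RefCell<Vec<u32>>,
}

impl StateView for RecordingView {
    fn get_state_value(&self, key: &u32) -> Result<Option<Vec<u8>>, String> {
        self.reads.borrow_mut().push(*key);
        Ok(self.data.get(key).cloned())
    }
}

fn add_module_write_to_cache(
    key: u32,
    write: Vec<u8>,
    base_view: &impl StateView,
    cache: &mut HashMap<u32, Vec<u8>>,
) -> Result<(), String> {
    // Enforce read-before-write because storage relies on this assumption;
    // a storage error is surfaced as an invariant violation, as in the PR.
    let _ = base_view
        .get_state_value(&key)
        .map_err(|err| format!("Unexpected storage error for module {key}: {err}"))?;
    cache.insert(key, write);
    Ok(())
}

fn main() {
    let view = RecordingView { data: HashMap::new(), reads: RefCell::new(Vec::new()) };
    let mut cache = HashMap::new();
    add_module_write_to_cache(7, vec![1, 2, 3], &view, &mut cache).unwrap();

    // The write reached the cache, and the base view observed a read of key 7.
    assert_eq!(cache.get(&7), Some(&vec![1, 2, 3]));
    assert_eq!(*view.reads.borrow(), vec![7]);
}
```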
@@ -1268,6 +1284,7 @@ where
     fn apply_output_sequential(
         txn_idx: TxnIndex,
         runtime_environment: &RuntimeEnvironment,
+        base_view: &S,
         global_module_cache: &GlobalModuleCache<
             ModuleId,
             CompiledModule,
@@ -1296,9 +1313,11 @@ where
         for (key, write) in output.module_write_set().into_iter() {
             if runtime_environment.vm_config().use_loader_v2 {
                 Self::add_module_write_to_module_cache(
+                    key,
                     write,
                     txn_idx,
                     runtime_environment,
+                    base_view,
                     global_module_cache,
                     unsync_map.module_cache(),
                 )?;
@@ -1580,6 +1599,7 @@ where
         Self::apply_output_sequential(
             idx as TxnIndex,
             runtime_environment,
+            base_view,
             module_cache_manager_guard.module_cache(),
             &unsync_map,
             &output,
Review thread:
- Comment: can you explain why we distinguish reads here based on where we got the data from? Also, what is Option<TxnIndex> in the PerBlockCache?
- Reply: Option - the module may not exist (even in StateView).
- Reply: Different reads - different validations. We need to check that global reads are still valid, and that per-block reads have the same version.
- Comment: stupid formatting, it didn't show that I was referring to TxnIndex.
- Reply: Ah - None is a storage version.
- Reply: I find it better than Result<TxnIndex, StorageVersion>.
- Comment: why the distinction between storage version and global cache?
- Reply: Different validation paths: for a global cache read we need to check whether the read is still valid in the cache. For per-block we go to the MVHashMap. Now, the question is about the storage read: we issue it only when there is a cache miss in the per-block cache, so it gets validated there.
- Reply: Basically a "storage version" can later be drained into the global cache, but otherwise exists only in per-block.
- Reply: So from a validation perspective there is no distinction; the distinction is ONLY there to make updating the global cache (i.e. draining into it) faster/cheaper by skipping things that are already there. Is that correct?
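The two validation paths this thread describes can be sketched as follows. This is a hedged, simplified model (validity flags and versions as plain `HashMap`s, a unit `GlobalCache` variant since validation only needs the key), not the PR's `validate_module_reads`: captured GlobalCache reads are checked against the cache's validity flags, while PerBlockCache reads are checked by comparing the captured version against the current one, where `None` stands for the storage version.

```rust
use std::collections::HashMap;

// Simplified captured read: which source it came from, and (for per-block
// reads) the version observed; Some(txn_idx) or None for the storage version.
enum ModuleRead {
    GlobalCache,
    PerBlockCache(Option<u32>),
}

fn validate_module_reads(
    reads: &HashMap<u32, ModuleRead>,
    global_valid: &HashMap<u32, bool>,
    per_block_versions: &HashMap<u32, Option<u32>>,
) -> bool {
    reads.iter().all(|(key, read)| match read {
        // Still valid in the cross-block cache (not invalidated by a publish)?
        ModuleRead::GlobalCache => *global_valid.get(key).unwrap_or(&false),
        // Does the per-block cache still hold the same version we read?
        ModuleRead::PerBlockCache(previous) => {
            per_block_versions.get(key).copied() == Some(*previous)
        },
    })
}

fn main() {
    let reads = HashMap::from([
        (0, ModuleRead::GlobalCache),
        (1, ModuleRead::PerBlockCache(Some(3))),
    ]);
    let mut global_valid = HashMap::from([(0, true)]);
    let per_block_versions = HashMap::from([(1, Some(3))]);
    assert!(validate_module_reads(&reads, &global_valid, &per_block_versions));

    // A republish invalidates the global entry: validation must now fail.
    global_valid.insert(0, false);
    assert!(!validate_module_reads(&reads, &global_valid, &per_block_versions));
}
```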