Skip to content

Commit

Permalink
Verify accumulator behavior in sui transactional tests (MystenLabs#11205)
Browse files Browse the repository at this point in the history

Unfortunately, the transactional tests don't use AuthorityStore, so we
have to duplicate some logic here, but I think this is well worth it for
the increased coverage.
  • Loading branch information
mystenmark authored Apr 26, 2023
1 parent 630f446 commit fe6ae84
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 134 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

327 changes: 195 additions & 132 deletions crates/sui-core/src/state_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use itertools::Itertools;
use mysten_metrics::monitored_scope;
use serde::Serialize;
use sui_protocol_config::ProtocolConfig;
use sui_types::base_types::{ObjectID, SequenceNumber};
use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber};
use sui_types::committee::EpochId;
use sui_types::digests::{ObjectDigest, TransactionDigest};
use sui_types::storage::ObjectKey;
use sui_types::in_memory_storage::InMemoryStorage;
use sui_types::object::Object;
use sui_types::storage::{ObjectKey, ObjectStore};
use tracing::debug;
use typed_store::Map;

Expand All @@ -30,18 +32,69 @@ pub struct StateAccumulator {
authority_store: Arc<AuthorityStore>,
}

/// Read-only object access required by `accumulate_effects`.
///
/// Abstracts over the production `AuthorityStore` and the transactional-test
/// `InMemoryStorage`, which otherwise share no common read interface.
pub trait AccumulatorReadStore {
    /// Batch-fetch objects by exact (id, version) key; each result slot is
    /// `None` when that exact version is not present in the store.
    fn multi_get_object_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>>;

    /// Return the `ObjectRef` recorded for `object_id` at a version strictly
    /// below `version`, if one exists.
    /// NOTE(review): semantics inferred from the `InMemoryStorage` impl (which
    /// asserts the returned version is < `version` and only reports wrapped
    /// tombstones) — confirm against `AuthorityStore::get_object_ref_prior_to_key`.
    fn get_object_ref_prior_to_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> SuiResult<Option<ObjectRef>>;
}

/// Production implementation: forwards to the inherent `AuthorityStore`
/// methods of the same names. Inherent methods take precedence over trait
/// methods in Rust method resolution, so these calls do not recurse.
impl AccumulatorReadStore for AuthorityStore {
    fn multi_get_object_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
        // Resolves to the inherent method, not this trait method.
        self.multi_get_object_by_key(object_keys)
    }

    fn get_object_ref_prior_to_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        // Resolves to the inherent method, not this trait method.
        self.get_object_ref_prior_to_key(object_id, version)
    }
}

/// Test implementation backed by `InMemoryStorage`, used by the sui
/// transactional tests (which do not run against an `AuthorityStore`).
impl AccumulatorReadStore for InMemoryStorage {
    fn multi_get_object_by_key(&self, object_keys: &[ObjectKey]) -> SuiResult<Vec<Option<Object>>> {
        // Look each key up individually; collecting the per-key results
        // short-circuits on the first error.
        object_keys
            .iter()
            .map(|key| self.get_object_by_key(&key.0, key.1))
            .collect()
    }

    fn get_object_ref_prior_to_key(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        // Only wrapped tombstones are tracked here; report one as the prior
        // ref when present, otherwise there is no prior version to return.
        let prior = self.get_wrapped(object_id).map(|wrapped_version| {
            assert!(wrapped_version < version);
            (
                *object_id,
                wrapped_version,
                ObjectDigest::OBJECT_DIGEST_WRAPPED,
            )
        });
        Ok(prior)
    }
}

/// Serializable representation of the ObjectRef of an
/// object that has been wrapped
/// TODO: This can be replaced with ObjectKey.
#[derive(Serialize)]
struct WrappedObject {
#[derive(Serialize, Debug)]
pub struct WrappedObject {
id: ObjectID,
wrapped_at: SequenceNumber,
digest: ObjectDigest,
}

impl WrappedObject {
fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
Self {
id,
wrapped_at,
Expand All @@ -50,6 +103,142 @@ impl WrappedObject {
}
}

/// Fold a batch of transaction effects into a single `Accumulator` delta.
///
/// Insertions: digests of created, unwrapped, and mutated objects, plus
/// wrapped tombstones (serialized as `WrappedObject` so they are unique per
/// id/version). Removals: the prior versions listed in
/// `modified_at_versions`, except objects unwrapped by the same transaction,
/// which are instead removed as the `WrappedObject` found in `store` just
/// prior to the unwrapped version.
///
/// Generic over the store so both `AuthorityStore` (production) and
/// `InMemoryStorage` (transactional tests) can be accumulated against.
///
/// Panics if a `modified_at_versions` object is missing from the store or if
/// a store read fails — both indicate an invariant violation.
pub fn accumulate_effects<T, S>(
    store: S,
    effects: Vec<TransactionEffects>,
    protocol_config: &ProtocolConfig,
) -> Accumulator
where
    S: std::ops::Deref<Target = T>,
    T: AccumulatorReadStore,
{
    let mut acc = Accumulator::default();

    // process insertions to the set
    acc.insert_all(
        effects
            .iter()
            .flat_map(|fx| {
                fx.created()
                    .iter()
                    .map(|(oref, _)| oref.2)
                    .chain(fx.unwrapped().iter().map(|(oref, _)| oref.2))
                    .chain(fx.mutated().iter().map(|(oref, _)| oref.2))
            })
            .collect::<Vec<ObjectDigest>>(),
    );

    // insert wrapped tombstones. We use a custom struct in order to contain the tombstone
    // against the object id and sequence number, as the tombstone by itself is not unique.
    acc.insert_all(
        effects
            .iter()
            .flat_map(|fx| {
                fx.wrapped().iter().map(|oref| {
                    // bcs::to_bytes already returns an owned Vec<u8>;
                    // no intermediate collect or extra copy is needed.
                    bcs::to_bytes(&WrappedObject::new(oref.0, oref.1)).unwrap()
                })
            })
            .collect::<Vec<Vec<u8>>>(),
    );

    // Every (tx, id, version) that became visible by unwrapping, whether it
    // survived (unwrapped) or was deleted in the same tx (unwrapped_then_deleted).
    let all_unwrapped = effects
        .iter()
        .flat_map(|fx| {
            fx.unwrapped()
                .iter()
                .map(|(oref, _owner)| (*fx.transaction_digest(), oref.0, oref.1))
        })
        .chain(effects.iter().flat_map(|fx| {
            fx.unwrapped_then_deleted()
                .iter()
                .map(|oref| (*fx.transaction_digest(), oref.0, oref.1))
        }))
        .collect::<Vec<(TransactionDigest, ObjectID, SequenceNumber)>>();

    // Group unwrapped object ids by transaction for the filter below.
    let unwrapped_ids: HashMap<TransactionDigest, HashSet<ObjectID>> = all_unwrapped
        .iter()
        .map(|(digest, id, _)| (*digest, *id))
        .into_group_map()
        .iter()
        .map(|(digest, ids)| (*digest, HashSet::from_iter(ids.iter().cloned())))
        .collect();

    // Collect keys from modified_at_versions to remove from the accumulator.
    // Filter all unwrapped objects (from unwrapped or unwrapped_then_deleted effects)
    // as these were inserted into the accumulator as a WrappedObject. Will handle these
    // separately.
    let modified_at_version_keys: Vec<ObjectKey> = effects
        .iter()
        .flat_map(|fx| {
            fx.modified_at_versions()
                .iter()
                .map(|(id, seq_num)| (*fx.transaction_digest(), *id, *seq_num))
        })
        .filter_map(|(tx_digest, id, seq_num)| {
            // unwrapped tx
            if let Some(ids) = unwrapped_ids.get(&tx_digest) {
                // object unwrapped in this tx. We handle it later
                if ids.contains(&id) {
                    return None;
                }
            }
            Some(ObjectKey(id, seq_num))
        })
        .collect();

    // The borrow for the multi-get ends before `zip` consumes the vec,
    // so no clone of modified_at_version_keys is needed.
    let modified_at_digests: Vec<_> = store
        .multi_get_object_by_key(&modified_at_version_keys)
        .expect("Failed to get modified_at_versions object from object table")
        .into_iter()
        .zip(modified_at_version_keys)
        .map(|(obj, key)| {
            obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
                .compute_object_reference()
                .2
        })
        .collect();
    acc.remove_all(modified_at_digests);

    // Process unwrapped and unwrapped_then_deleted effects, which need to be
    // removed as WrappedObject using the last sequence number it was tombstoned
    // against. Since this happened in a past transaction, and the child object may
    // have been modified since (and hence its sequence number incremented), we
    // seek the version prior to the unwrapped version from the objects table directly.
    // If the tombstone is not found, then assume this is a newly created wrapped object hence
    // we don't expect to find it in the table.
    let wrapped_objects_to_remove: Vec<WrappedObject> = all_unwrapped
        .iter()
        .filter_map(|(_tx_digest, id, seq_num)| {
            let objref = store
                .get_object_ref_prior_to_key(id, *seq_num)
                .expect("read cannot fail");

            objref.map(|(id, version, digest)| {
                assert!(
                    !protocol_config.loaded_child_objects_fixed() || digest.is_wrapped(),
                    "{:?}",
                    id
                );
                WrappedObject::new(id, version)
            })
        })
        .collect();

    acc.remove_all(
        wrapped_objects_to_remove
            .iter()
            // bcs::to_bytes returns Vec<u8>; the former `.to_vec()` was a copy.
            .map(|wrapped| bcs::to_bytes(wrapped).unwrap())
            .collect::<Vec<Vec<u8>>>(),
    );

    acc
}

impl StateAccumulator {
pub fn new(authority_store: Arc<AuthorityStore>) -> Self {
Self { authority_store }
Expand Down Expand Up @@ -85,133 +274,7 @@ impl StateAccumulator {
effects: Vec<TransactionEffects>,
protocol_config: &ProtocolConfig,
) -> Accumulator {
let mut acc = Accumulator::default();

// process insertions to the set
acc.insert_all(
effects
.iter()
.flat_map(|fx| {
fx.created()
.iter()
.map(|(oref, _)| oref.2)
.chain(fx.unwrapped().iter().map(|(oref, _)| oref.2))
.chain(fx.mutated().iter().map(|(oref, _)| oref.2))
})
.collect::<Vec<ObjectDigest>>(),
);

// insert wrapped tombstones. We use a custom struct in order to contain the tombstone
// against the object id and sequence number, as the tombstone by itself is not unique.
acc.insert_all(
effects
.iter()
.flat_map(|fx| {
fx.wrapped()
.iter()
.map(|oref| {
bcs::to_bytes(&WrappedObject::new(oref.0, oref.1))
.unwrap()
.to_vec()
})
.collect::<Vec<Vec<u8>>>()
})
.collect::<Vec<Vec<u8>>>(),
);

let all_unwrapped = effects
.iter()
.flat_map(|fx| {
fx.unwrapped()
.iter()
.map(|(oref, _owner)| (*fx.transaction_digest(), oref.0, oref.1))
})
.chain(effects.iter().flat_map(|fx| {
fx.unwrapped_then_deleted()
.iter()
.map(|oref| (*fx.transaction_digest(), oref.0, oref.1))
}))
.collect::<Vec<(TransactionDigest, ObjectID, SequenceNumber)>>();

let unwrapped_ids: HashMap<TransactionDigest, HashSet<ObjectID>> = all_unwrapped
.iter()
.map(|(digest, id, _)| (*digest, *id))
.into_group_map()
.iter()
.map(|(digest, ids)| (*digest, HashSet::from_iter(ids.iter().cloned())))
.collect();

// Collect keys from modified_at_versions to remove from the accumulator.
// Filter all unwrapped objects (from unwrapped or unwrapped_then_deleted effects)
// as these were inserted into the accumulator as a WrappedObject. Will handle these
// separately.
let modified_at_version_keys: Vec<ObjectKey> = effects
.iter()
.flat_map(|fx| {
fx.modified_at_versions()
.iter()
.map(|(id, seq_num)| (*fx.transaction_digest(), *id, *seq_num))
})
.filter_map(|(tx_digest, id, seq_num)| {
// unwrapped tx
if let Some(ids) = unwrapped_ids.get(&tx_digest) {
// object unwrapped in this tx. We handle it later
if ids.contains(&id) {
return None;
}
}
Some(ObjectKey(id, seq_num))
})
.collect();

let modified_at_digests: Vec<_> = self
.authority_store
.multi_get_object_by_key(&modified_at_version_keys.clone())
.expect("Failed to get modified_at_versions object from object table")
.into_iter()
.zip(modified_at_version_keys)
.map(|(obj, key)| {
obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
.compute_object_reference()
.2
})
.collect();
acc.remove_all(modified_at_digests);

// Process unwrapped and unwrapped_then_deleted effects, which need to be
// removed as WrappedObject using the last sequence number it was tombstoned
// against. Since this happened in a past transaction, and the child object may
// have been modified since (and hence its sequence number incremented), we
// seek the version prior to the unwrapped version from the objects table directly.
// If the tombstone is not found, then assume this is a newly created wrapped object hence
// we don't expect to find it in the table.
let wrapped_objects_to_remove: Vec<WrappedObject> = all_unwrapped
.iter()
.filter_map(|(_tx_digest, id, seq_num)| {
let objref = self
.authority_store
.get_object_ref_prior_to_key(id, *seq_num)
.expect("read cannot fail");

objref.map(|(id, version, digest)| {
assert!(
!protocol_config.loaded_child_objects_fixed() || digest.is_wrapped(),
"{:?}",
id
);
WrappedObject::new(id, version)
})
})
.collect();

acc.remove_all(
wrapped_objects_to_remove
.iter()
.map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
.collect::<Vec<Vec<u8>>>(),
);

acc
accumulate_effects(&*self.authority_store, effects, protocol_config)
}

/// Unions all checkpoint accumulators at the end of the epoch to generate the
Expand Down
1 change: 1 addition & 0 deletions crates/sui-transactional-test-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ once_cell = "1.16"
rand = "0.8.5"
prometheus = "0.13.3"

fastcrypto.workspace = true
move-binary-format.workspace = true
move-bytecode-utils.workspace = true
move-command-line-common.workspace = true
Expand Down
Loading

0 comments on commit fe6ae84

Please sign in to comment.