Skip to content

Commit

Permalink
More clean ups related to shared object handling
Browse files Browse the repository at this point in the history
  • Loading branch information
George Danezis committed Mar 11, 2022
1 parent 8eaad4a commit 8658396
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 140 deletions.
137 changes: 77 additions & 60 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use move_core_types::{
};
use move_vm_runtime::native_functions::NativeFunctionTable;
use std::{
collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
pin::Pin,
sync::Arc,
};
Expand Down Expand Up @@ -119,7 +119,7 @@ impl AuthorityState {
transaction: &Transaction,
object_kind: InputObjectKind,
object: &Object,
mutable_object_addresses: &HashSet<SuiAddress>,
owned_object_authenticators: &HashSet<SuiAddress>,
) -> SuiResult {
match object_kind {
InputObjectKind::MovePackage(package_id) => {
Expand Down Expand Up @@ -184,7 +184,7 @@ impl AuthorityState {
Owner::ObjectOwner(owner) => {
// Check that the object owner is another mutable object in the input.
fp_ensure!(
mutable_object_addresses.contains(&owner),
owned_object_authenticators.contains(&owner),
SuiError::IncorrectSigner
);
}
Expand All @@ -193,7 +193,10 @@ impl AuthorityState {
}
};
}
InputObjectKind::SharedMoveObject(..) => (),
InputObjectKind::SharedMoveObject(..) => {
// When someone locks an object as shared it must be shared already.
fp_ensure!(object.is_shared(), SuiError::NotSharedObjectError);
}
};
Ok(())
}
Expand All @@ -205,18 +208,22 @@ impl AuthorityState {
transaction: &Transaction,
) -> Result<Vec<(InputObjectKind, Object)>, SuiError> {
let input_objects = transaction.input_objects();
let mut all_objects = Vec::with_capacity(input_objects.len());

// These IDs act as authenticators that can own other objects.
let objects = self.fetch_objects(&input_objects).await?;
let mutable_object_addresses: HashSet<_> = objects
let owned_object_authenticators: HashSet<_> = objects
.iter()
.flat_map(|opt_obj| match opt_obj {
Some(obj) if !obj.is_read_only() => Some(obj.id().into()),
_ => None,
})
.collect();

// Gather all objects and errors.
let mut all_objects = Vec::with_capacity(input_objects.len());
let mut errors = Vec::new();
for (object_kind, object) in input_objects.into_iter().zip(objects) {
// All objects must exist in the DB.
let object = match object {
Some(object) => object,
None => {
Expand All @@ -225,8 +232,14 @@ impl AuthorityState {
}
};

match self.check_one_lock(transaction, object_kind, &object, &mutable_object_addresses)
{
// Check if the object contents match the type of lock we need for
// this object.
match self.check_one_lock(
transaction,
object_kind,
&object,
&owned_object_authenticators,
) {
Ok(()) => all_objects.push((object_kind, object)),
Err(e) => {
errors.push(e);
Expand Down Expand Up @@ -274,7 +287,7 @@ impl AuthorityState {
transaction.check_signature()?;
let transaction_digest = transaction.digest();

let mutable_objects: Vec<_> = self
let owned_objects: Vec<_> = self
.check_locks(&transaction)
.instrument(tracing::trace_span!("tx_check_locks"))
.await?
Expand All @@ -293,7 +306,7 @@ impl AuthorityState {
.collect();

debug!(
num_mutable_objects = mutable_objects.len(),
num_mutable_objects = owned_objects.len(),
"Checked locks and found mutable objects"
);

Expand All @@ -303,7 +316,7 @@ impl AuthorityState {
// The call to self.set_transaction_lock checks the lock is not conflicting,
// and returns ConflictingTransaction error in case there is a lock on a different
// existing transaction.
self.set_transaction_lock(&mutable_objects, signed_transaction)
self.set_transaction_lock(&owned_objects, signed_transaction)
.instrument(tracing::trace_span!("db_set_transaction_lock"))
.await?;

Expand Down Expand Up @@ -336,59 +349,61 @@ impl AuthorityState {
&self,
transaction_digest: TransactionDigest,
transaction: &Transaction,
) -> Result<Vec<Object>, SuiError> {
inputs: &[(InputObjectKind, Object)],
) -> Result<(), SuiError> {
// If the transaction contains shared objects, we need to ensure they have been scheduled
// for processing by the consensus protocol.
if transaction.contains_shared_object() {
debug!("Validating shared object sequence numbers from consensus...");
let mut lock_errors = Vec::new();

let shared_ids = transaction.shared_input_objects();
let shared_locks = self._database.sequenced(transaction_digest, shared_ids)?;
let shared_objects = self.get_objects(shared_ids).await?;

let mut inputs = Vec::with_capacity(shared_ids.len());

for (lock, object) in shared_locks.into_iter().zip(shared_objects.into_iter()) {
// Check whether the shared objects have already been assigned a sequence number by
// the consensus. Bail if the transaction contains even one shared object that either:
// (i) was not assigned a sequence number, or
// (ii) has a different sequence number than the current one.
//
// Note that if the shared object is not in storage (it has been destroyed), we keep
// processing the transaction to unlock all single-writer objects. The execution engine
// will simply execute no-op.

match (lock, object) {
(Some(seq), Some(object)) => {
if object.version() != seq {
warn!(object_version =? object.version(),
locked_version =? seq,
"Unexpected version number in locked shared object");
// TODO: Send a more informative error message here, stating
// that we are waiting for the execution.
lock_errors.push(SuiError::InvalidSequenceNumber);
continue;
}

inputs.push(object);

// Collect the version we have for each shared object
let shared_ids: HashSet<_> = inputs
.iter()
.filter_map(|(kind, obj)| match kind {
InputObjectKind::SharedMoveObject(..) if obj.owner.is_shared_mutable() => {
Some((obj.id(), obj.version()))
}
_ => {
lock_errors.push(SuiError::InvalidSequenceNumber);
_ => None,
})
.collect();
// Internal consistency check
debug_assert!(
!shared_ids.is_empty(),
"we just checked that there are share objects yet none found?"
);

// Read the
let shared_locks: HashMap<_, _> = self
._database
.all_shared_locks(transaction_digest)?
.into_iter()
.collect();

let lock_errors: Vec<_> = shared_ids
.iter()
.filter_map(|(object_id, version)| {
if !shared_locks.contains_key(object_id) {
Some(SuiError::SharedObjectLockNotSetObject)
} else if shared_locks[object_id] != *version {
Some(SuiError::UnexpectedSequenceNumber {
object_id: *object_id,
expected_sequence: shared_locks[object_id],
})
} else {
None
}
}
}
})
.collect();

fp_ensure!(
lock_errors.is_empty(),
SuiError::LockErrors {
errors: lock_errors
}
);

return Ok(inputs);
}

Ok(Vec::new())
Ok(())
}

async fn process_certificate(
Expand All @@ -399,23 +414,25 @@ impl AuthorityState {
let transaction = certificate.transaction.clone();
let transaction_digest = transaction.digest();

let mut inputs: Vec<_> = self
.check_locks(&transaction)
.await?
.into_iter()
.map(|(_, object)| object)
.collect();
let objects_by_kind: Vec<_> = self.check_locks(&transaction).await?;

let shared_objects = self
.check_shared_locks(transaction_digest, &transaction)
// At this point we need to check if any shared objects need locks,
// and whether they have them.
let _shared_objects = self
.check_shared_locks(transaction_digest, &transaction, &objects_by_kind)
.await?;
inputs.extend(shared_objects);
// inputs.extend(shared_objects);

debug!(
num_inputs = inputs.len(),
num_inputs = objects_by_kind.len(),
"Read inputs for transaction from DB"
);

let inputs: Vec<_> = objects_by_kind
.into_iter()
.map(|(_, object)| object)
.collect();

let mut transaction_dependencies: BTreeSet<_> = inputs
.iter()
.map(|object| object.previous_transaction)
Expand Down
14 changes: 14 additions & 0 deletions sui_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,20 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
self.sequenced.multi_get(&keys[..]).map_err(SuiError::from)
}

/// Read a lock for a specific (transaction, shared object) pair.
pub fn all_shared_locks(
&self,
transaction_digest: TransactionDigest,
) -> Result<Vec<(ObjectID, SequenceNumber)>, SuiError> {
Ok(self
.sequenced
.iter()
.skip_to(&(transaction_digest, ObjectID::ZERO))?
.take_while(|((tx, _objid), _ver)| *tx == transaction_digest)
.map(|((_tx, objid), ver)| (objid, ver))
.collect())
}

// Methods to mutate the store

/// Insert an object
Expand Down
Loading

0 comments on commit 8658396

Please sign in to comment.