Skip to content

Commit

Permalink
Fix shared object retries (MystenLabs#4579)
Browse files Browse the repository at this point in the history
* Reproduce shared object retry failure

* Fix shared object transaction retry

Previously, transaction input checker loads the latest version of shared
objects. The fix is to instead load the sequenced versions, but only during
certificate processing.

Co-authored-by: Mark Logan <mark@marklgn.com>
  • Loading branch information
mystenmark and mlogan authored Sep 12, 2022
1 parent 0b88006 commit e64720f
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 39 deletions.
8 changes: 5 additions & 3 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,17 +749,19 @@ impl AuthorityState {
) -> SuiResult<(InnerTemporaryStore, SignedTransactionEffects)> {
let _metrics_guard = start_timer(self.metrics.prepare_certificate_latency.clone());
let (gas_status, input_objects) =
transaction_input_checker::check_transaction_input(&self.database, certificate).await?;
transaction_input_checker::check_certificate_input(&self.database, certificate).await?;

// At this point we need to check if any shared objects need locks,
// and whether they have them.
let shared_object_refs = input_objects.filter_shared_objects();
if !shared_object_refs.is_empty() && !certificate.signed_data.data.kind.is_system_tx() {
if !shared_object_refs.is_empty() && !certificate.signed_data.data.kind.is_change_epoch_tx()
{
// If the transaction contains shared objects, we need to ensure they have been scheduled
// for processing by the consensus protocol.
// There is no need to go through consensus for system transactions that can
// only be executed at a time when consensus is turned off.
// TODO: Add some assert here to make sure consensus is indeed off with is_system_tx.
// TODO: Add some assert here to make sure consensus is indeed off with
// is_change_epoch_tx.
self.check_shared_locks(&transaction_digest, &shared_object_refs)
.await?;
}
Expand Down
25 changes: 25 additions & 0 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,31 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
Ok(result)
}

/// Get many objects by their (id, version number) key.
pub fn get_sequenced_input_objects(
&self,
digest: &TransactionDigest,
objects: &[InputObjectKind],
) -> Result<Vec<Option<Object>>, SuiError> {
let shared_locks: HashMap<_, _> = self.all_shared_locks(digest)?.into_iter().collect();

let mut result = Vec::new();
for kind in objects {
let obj = match kind {
InputObjectKind::MovePackage(id) => self.get_object(id)?,
InputObjectKind::SharedMoveObject(id) => match shared_locks.get(id) {
Some(version) => self.get_object_by_key(id, *version)?,
None => None,
},
InputObjectKind::ImmOrOwnedMoveObject(objref) => {
self.get_object_by_key(&objref.0, objref.1)?
}
};
result.push(obj);
}
Ok(result)
}

/// Read a transaction envelope via lock or returns Err(TransactionLockDoesNotExist) if the lock does not exist.
pub async fn get_transaction_envelope(
&self,
Expand Down
66 changes: 47 additions & 19 deletions crates/sui-core/src/transaction_input_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,20 @@ use sui_types::{
fp_ensure,
gas::{self, SuiGasStatus},
messages::{
InputObjectKind, InputObjects, SingleTransactionKind, TransactionData, TransactionEnvelope,
CertifiedTransaction, InputObjectKind, InputObjects, SingleTransactionKind,
TransactionData, TransactionEnvelope,
},
object::{Object, Owner},
};
use tracing::instrument;

#[instrument(level = "trace", skip_all)]
pub async fn check_transaction_input<S, T>(
async fn get_gas_status<S, T>(
store: &SuiDataStore<S>,
transaction: &TransactionEnvelope<T>,
) -> Result<(SuiGasStatus<'static>, InputObjects), SuiError>
) -> SuiResult<SuiGasStatus<'static>>
where
S: Eq + Debug + Serialize + for<'de> Deserialize<'de>,
{
transaction.signed_data.data.kind.validity_check()?;

let mut gas_status = check_gas(
store,
transaction.gas_payment_object_ref().0,
Expand All @@ -37,14 +35,51 @@ where
)
.await?;

let input_objects = check_objects(store, &transaction.signed_data.data).await?;

if transaction.contains_shared_object() {
// It's important that we do this here to make sure there is enough
// gas to cover shared objects, before we lock all objects.
gas_status.charge_consensus()?;
}

Ok(gas_status)
}

#[instrument(level = "trace", skip_all)]
pub async fn check_transaction_input<S, T>(
store: &SuiDataStore<S>,
transaction: &TransactionEnvelope<T>,
) -> SuiResult<(SuiGasStatus<'static>, InputObjects)>
where
S: Eq + Debug + Serialize + for<'de> Deserialize<'de>,
{
transaction.signed_data.data.kind.validity_check()?;
let gas_status = get_gas_status(store, transaction).await?;
let input_objects = transaction.signed_data.data.input_objects()?;
let objects = store.get_input_objects(&input_objects)?;
let input_objects =
check_objects(&transaction.signed_data.data, input_objects, objects).await?;
Ok((gas_status, input_objects))
}

pub async fn check_certificate_input<S>(
store: &SuiDataStore<S>,
cert: &CertifiedTransaction,
) -> SuiResult<(SuiGasStatus<'static>, InputObjects)>
where
S: Eq + Debug + Serialize + for<'de> Deserialize<'de>,
{
let gas_status = get_gas_status(store, cert).await?;
let input_objects = cert.signed_data.data.input_objects()?;

let tx_data = &cert.signed_data.data;
let objects = if tx_data.kind.is_change_epoch_tx() {
// When changing the epoch, we update a the system object, which is shared, without going
// through sequencing, so we must bypass the sequence checks here.
store.get_input_objects(&input_objects)?
} else {
store.get_sequenced_input_objects(cert.digest(), &input_objects)?
};
let input_objects = check_objects(&cert.signed_data.data, input_objects, objects).await?;
Ok((gas_status, input_objects))
}

Expand Down Expand Up @@ -98,18 +133,11 @@ where
/// Check all the objects used in the transaction against the database, and ensure
/// that they are all the correct version and number.
#[instrument(level = "trace", skip_all)]
async fn check_objects<S>(
store: &SuiDataStore<S>,
async fn check_objects(
transaction: &TransactionData,
) -> Result<InputObjects, SuiError>
where
S: Eq + Debug + Serialize + for<'de> Deserialize<'de>,
{
let input_objects = transaction.input_objects()?;

// These IDs act as authenticators that can own other objects.
let objects = store.get_input_objects(&input_objects)?;

input_objects: Vec<InputObjectKind>,
objects: Vec<Option<Object>>,
) -> Result<InputObjects, SuiError> {
// Constructing the list of objects that could be used to authenticate other
// objects. Any mutable object (either shared or owned) can be used to
// authenticate other objects. Hence essentially we are building the list
Expand Down
43 changes: 26 additions & 17 deletions crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,6 @@ async fn test_handle_certificate_interrupted_retry() {
telemetry_subscribers::init_for_testing();

let (sender, sender_key): (_, AccountKeyPair) = get_key_pair();
let recipient = dbg_addr(2);
let gas_object_id = ObjectID::random();

// We repeatedly timeout certs after a variety of delays, using LimitedPoll to ensure that we
Expand All @@ -1098,30 +1097,40 @@ async fn test_handle_certificate_interrupted_retry() {

let authority_state = Arc::new(init_state_with_ids(objects.clone()).await);

let shared_object_id = ObjectID::random();
let shared_object = {
use sui_types::gas_coin::GasCoin;
use sui_types::object::MoveObject;

let content = GasCoin::new(shared_object_id, 10);
let obj = MoveObject::new_gas_coin(OBJECT_START_VERSION, content.to_bcs_bytes());
Object::new_move(obj, Owner::Shared, TransactionDigest::genesis())
};

authority_state.insert_genesis_object(shared_object).await;

let mut interrupted_count = 0;
for (limit, (_, object_id)) in delays.iter().zip(objects) {
for (limit, _) in delays.iter().zip(objects) {
info!("Testing with poll limit {}", limit);
let object = authority_state
.get_object(&object_id)
.await
.unwrap()
.unwrap();
let gas_object = authority_state
.get_object(&gas_object_id)
.await
.unwrap()
.unwrap();

let certified_transfer_transaction = init_certified_transfer_transaction(
sender,
let shared_object_cert = make_test_transaction(
&sender,
&sender_key,
recipient,
object.compute_object_reference(),
gas_object.compute_object_reference(),
&authority_state,
);
shared_object_id,
&gas_object.compute_object_reference(),
&[&authority_state],
16,
)
.await;

send_consensus(&authority_state, &shared_object_cert).await;

let clone1 = certified_transfer_transaction.clone();
let clone1 = shared_object_cert.clone();
let state1 = authority_state.clone();

let limited_fut = Box::pin(LimitedPoll::new(*limit, async move {
Expand All @@ -1137,7 +1146,7 @@ async fn test_handle_certificate_interrupted_retry() {

let g = authority_state
.database
.acquire_tx_guard(&certified_transfer_transaction)
.acquire_tx_guard(&shared_object_cert)
.await
.unwrap();

Expand All @@ -1147,7 +1156,7 @@ async fn test_handle_certificate_interrupted_retry() {

// Now run the tx to completion
let info = authority_state
.handle_certificate(certified_transfer_transaction.clone())
.handle_certificate(shared_object_cert.clone())
.await
.unwrap();

Expand Down
7 changes: 7 additions & 0 deletions crates/sui-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ impl TransactionKind {
)
}

pub fn is_change_epoch_tx(&self) -> bool {
matches!(
self,
TransactionKind::Single(SingleTransactionKind::ChangeEpoch(_))
)
}

pub fn validity_check(&self) -> SuiResult {
match self {
Self::Batch(b) => {
Expand Down

0 comments on commit e64720f

Please sign in to comment.