Skip to content

Commit

Permalink
Some simple refactorings to clean up the commit cert path (#14758)
Browse files Browse the repository at this point in the history
- remove dead code
- Move output key computation into InnerTemporaryStore method
- Combine commit_cert_and_notify with commit_certificate
  • Loading branch information
mystenmark authored Nov 10, 2023
1 parent 9c25bde commit 12b10da
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 212 deletions.
224 changes: 62 additions & 162 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ use typed_store::Map;

use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertTxGuard};
use crate::authority::authority_per_epoch_store_pruner::AuthorityPerEpochStorePruner;
use crate::authority::authority_store::{ExecutionLockReadGuard, InputKey, ObjectLockStatus};
use crate::authority::authority_store::{ExecutionLockReadGuard, ObjectLockStatus};
use crate::authority::authority_store_pruner::AuthorityStorePruner;
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
Expand Down Expand Up @@ -229,8 +229,6 @@ pub struct AuthorityMetrics {
post_processing_total_tx_had_event_processed: IntCounter,
post_processing_total_failures: IntCounter,

pending_notify_read: IntGauge,

/// Consensus handler metrics
pub consensus_handler_processed_bytes: IntCounter,
pub consensus_handler_processed: IntCounterVec,
Expand Down Expand Up @@ -526,12 +524,6 @@ impl AuthorityMetrics {
registry,
)
.unwrap(),
pending_notify_read: register_int_gauge_with_registry!(
"pending_notify_read",
"Pending notify read requests",
registry,
)
.unwrap(),
consensus_handler_processed_bytes: register_int_counter_with_registry!(
"consensus_handler_processed_bytes",
"Number of bytes processed by consensus_handler",
Expand Down Expand Up @@ -1166,7 +1158,7 @@ impl AuthorityState {

fail_point_async!("crash");

self.commit_cert_and_notify(
self.commit_certificate(
certificate,
inner_temporary_store,
&effects,
Expand Down Expand Up @@ -1212,7 +1204,7 @@ impl AuthorityState {
Ok((effects, execution_error_opt))
}

async fn commit_cert_and_notify(
async fn commit_certificate(
&self,
certificate: &VerifiedExecutableTransaction,
inner_temporary_store: InnerTemporaryStore,
Expand All @@ -1222,63 +1214,56 @@ impl AuthorityState {
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
let _scope: Option<mysten_metrics::MonitoredScopeGuard> =
monitored_scope("Execution::commit_cert_and_notify");
monitored_scope("Execution::commit_certificate");
let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

let tx_digest = certificate.digest();
let input_object_count = inner_temporary_store.input_objects.len();
let shared_object_count = effects.input_shared_objects().len();
let digest = *certificate.digest();

// If commit_certificate returns an error, tx_guard will be dropped and the certificate
// will be persisted in the log for later recovery.
let mut output_keys: Vec<_> = inner_temporary_store
.written
.iter()
.map(|(id, obj)| {
if obj.is_package() {
InputKey::Package { id: *id }
} else {
InputKey::VersionedObject {
id: *id,
version: obj.version(),
}
}
})
.collect();
let output_keys = inner_temporary_store.get_output_keys(effects);

let deleted: HashMap<_, _> = effects
.deleted()
.iter()
.map(|oref| (oref.0, oref.1))
.collect();
// Only need to sign effects if we are a validator.
let effects_sig = if self.is_validator(epoch_store) {
Some(AuthoritySignInfo::new(
epoch_store.epoch(),
effects,
Intent::sui_app(IntentScope::TransactionEffects),
self.name,
&*self.secret,
))
} else {
None
};

// add deleted shared objects to the outputkeys that then get sent to notify_commit
let deleted_output_keys = deleted
.iter()
.filter(|(id, _)| {
inner_temporary_store
.input_objects
.get(id)
.is_some_and(|obj| obj.is_shared())
})
.map(|(id, seq)| InputKey::VersionedObject {
id: *id,
version: *seq,
// index certificate
let _ = self
.post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
.await
.tap_err(|e| {
self.metrics.post_processing_total_failures.inc();
error!(?tx_digest, "tx post processing failed: {e}");
});
output_keys.extend(deleted_output_keys);

// For any previously deleted shared objects that appeared mutably in the transaction,
// synthesize a notification for the next version of the object.
let smeared_version = inner_temporary_store.lamport_version;
let deleted_accessed_objects = effects.deleted_mutably_accessed_shared_objects();
for object_id in deleted_accessed_objects.into_iter() {
let key = InputKey::VersionedObject {
id: object_id,
version: smeared_version,
};
output_keys.push(key);
}

self.commit_certificate(inner_temporary_store, certificate, effects, epoch_store)
// The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
// we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
epoch_store.insert_tx_cert_and_effects_signature(
tx_digest,
certificate.certificate_sig(),
effects_sig.as_ref(),
)?;

// Allow testing what happens if we crash here.
fail_point_async!("crash");

self.database
.update_state(
inner_temporary_store,
&certificate.clone().into_unsigned(),
effects,
epoch_store.epoch(),
)
.await?;

// commit_certificate finished, the tx is fully committed to the store.
Expand All @@ -1287,14 +1272,27 @@ impl AuthorityState {
// Notifies transaction manager about transaction and output objects committed.
// This provides necessary information to transaction manager to start executing
// additional ready transactions.
//
// REQUIRED: this must be called after commit_certificate() (above), which writes output
// objects into storage. Otherwise, the transaction manager may schedule a transaction
// before the output objects are actually available.
self.transaction_manager
.notify_commit(&digest, output_keys, epoch_store);

// Update metrics.
self.update_metrics(certificate, input_object_count, shared_object_count);

Ok(())
}

fn update_metrics(
&self,
certificate: &VerifiedExecutableTransaction,
input_object_count: usize,
shared_object_count: usize,
) {
// count signature by scheme, for zklogin and multisig
if certificate.has_zklogin_sig() {
self.metrics.zklogin_sig_count.inc();
} else if certificate.has_upgraded_multisig() {
self.metrics.multisig_sig_count.inc();
}

self.metrics.total_effects.inc();
self.metrics.total_certs.inc();

Expand All @@ -1320,8 +1318,6 @@ impl AuthorityState {
.kind()
.num_commands() as f64,
);

Ok(())
}

/// prepare_certificate validates the transaction input, and executes the certificate,
Expand Down Expand Up @@ -3419,27 +3415,6 @@ impl AuthorityState {
.await;
}

#[instrument(level = "trace", skip_all)]
pub fn get_certified_transaction(
&self,
tx_digest: &TransactionDigest,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult<Option<VerifiedCertificate>> {
let Some(cert_sig) = epoch_store.get_transaction_cert_sig(tx_digest)? else {
return Ok(None);
};
let Some(transaction) = self.database.get_transaction_block(tx_digest)? else {
return Ok(None);
};

Ok(Some(VerifiedCertificate::new_unchecked(
CertifiedTransaction::new_from_data_and_sig(
transaction.into_inner().into_data(),
cert_sig,
),
)))
}

/// Make a status response for a transaction
#[instrument(level = "trace", skip_all)]
pub fn get_transaction_status(
Expand Down Expand Up @@ -3628,81 +3603,6 @@ impl AuthorityState {
Some((input_coin_objects, written_coin_objects))
}

/// Commit effects of transaction execution to data store.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn commit_certificate(
&self,
inner_temporary_store: InnerTemporaryStore,
certificate: &VerifiedExecutableTransaction,
effects: &TransactionEffects,
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult {
let _metrics_guard = self.metrics.commit_certificate_latency.start_timer();

let tx_digest = certificate.digest();
// Only need to sign effects if we are a validator.
let effects_sig = if self.is_validator(epoch_store) {
Some(AuthoritySignInfo::new(
epoch_store.epoch(),
effects,
Intent::sui_app(IntentScope::TransactionEffects),
self.name,
&*self.secret,
))
} else {
None
};

// index certificate
let _ = self
.post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
.await
.tap_err(|e| {
self.metrics.post_processing_total_failures.inc();
error!(?tx_digest, "tx post processing failed: {e}");
});

// The insertion to epoch_store is not atomic with the insertion to the perpetual store. This is OK because
// we insert to the epoch store first. And during lookups we always look up in the perpetual store first.
epoch_store.insert_tx_cert_and_effects_signature(
tx_digest,
certificate.certificate_sig(),
effects_sig.as_ref(),
)?;

// Allow testing what happens if we crash here.
fail_point_async!("crash");

self.database
.update_state(
inner_temporary_store,
&certificate.clone().into_unsigned(),
effects,
epoch_store.epoch(),
)
.await
.tap_ok(|_| {
debug!(
effects_digest = ?effects.digest(),
"commit_certificate finished"
);
})?;

// todo - ideally move this metric in NotifyRead once we have metrics in AuthorityStore
self.metrics
.pending_notify_read
.set(self.database.executed_effects_notify_read.num_pending() as i64);

// count signature by scheme, for zklogin and multisig
if certificate.has_zklogin_sig() {
self.metrics.zklogin_sig_count.inc();
} else if certificate.has_upgraded_multisig() {
self.metrics.multisig_sig_count.inc();
}

Ok(())
}

/// Get the TransactionEnvelope that currently locks the given object, if any.
/// Since object locks are only valid for one epoch, we also need the epoch_id in the query.
/// Returns UserInputError::ObjectNotFound if no lock records for the given object can be found.
Expand Down
Loading

2 comments on commit 12b10da

@vercel
Copy link

@vercel vercel bot commented on 12b10da Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vercel
Copy link

@vercel vercel bot commented on 12b10da Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.