Skip to content

Add transaction scheduler #22009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: xun/add-wait-functions-cache
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 16 additions & 109 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::congestion_tracker::CongestionTracker;
use crate::consensus_adapter::ConsensusOverloadChecker;
use crate::execution_cache::ExecutionCacheTraitPointers;
use crate::execution_cache::TransactionCacheRead;
use crate::execution_scheduler::ExecutionScheduler;
use crate::jsonrpc_index::CoinIndexKey2;
use crate::rpc_index::RpcIndexStore;
use crate::transaction_outputs::TransactionOutputs;
Expand Down Expand Up @@ -164,7 +165,6 @@ use crate::stake_aggregator::StakeAggregator;
use crate::state_accumulator::{AccumulatorStore, StateAccumulator, WrappedObject};
use crate::subscription_handler::SubscriptionHandler;
use crate::transaction_input_loader::TransactionInputLoader;
use crate::transaction_manager::TransactionManager;

#[cfg(msim)]
pub use crate::checkpoints::checkpoint_executor::utils::{
Expand Down Expand Up @@ -256,19 +256,8 @@ pub struct AuthorityMetrics {
commit_certificate_latency: Histogram,
db_checkpoint_latency: Histogram,

pub(crate) transaction_manager_num_enqueued_certificates: IntCounterVec,
pub(crate) transaction_manager_num_missing_objects: IntGauge,
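// TODO: Rename these metrics; they still carry the `transaction_manager_` prefix even though execution is now scheduled by ExecutionScheduler.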
pub(crate) transaction_manager_num_pending_certificates: IntGauge,
pub(crate) transaction_manager_num_executing_certificates: IntGauge,
pub(crate) transaction_manager_num_ready: IntGauge,
pub(crate) transaction_manager_object_cache_size: IntGauge,
pub(crate) transaction_manager_object_cache_hits: IntCounter,
pub(crate) transaction_manager_object_cache_misses: IntCounter,
pub(crate) transaction_manager_object_cache_evictions: IntCounter,
pub(crate) transaction_manager_package_cache_size: IntGauge,
pub(crate) transaction_manager_package_cache_hits: IntCounter,
pub(crate) transaction_manager_package_cache_misses: IntCounter,
pub(crate) transaction_manager_package_cache_evictions: IntCounter,
pub(crate) transaction_manager_transaction_queue_age_s: Histogram,

pub(crate) execution_driver_executed_transactions: IntCounter,
Expand Down Expand Up @@ -510,49 +499,12 @@ impl AuthorityMetrics {
LATENCY_SEC_BUCKETS.to_vec(),
registry,
).unwrap(),
transaction_manager_num_enqueued_certificates: register_int_counter_vec_with_registry!(
"transaction_manager_num_enqueued_certificates",
"Current number of certificates enqueued to TransactionManager",
&["result"],
registry,
)
.unwrap(),
transaction_manager_num_missing_objects: register_int_gauge_with_registry!(
"transaction_manager_num_missing_objects",
"Current number of missing objects in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_num_pending_certificates: register_int_gauge_with_registry!(
"transaction_manager_num_pending_certificates",
"Number of certificates pending in TransactionManager, with at least 1 missing input object",
registry,
)
.unwrap(),
transaction_manager_num_executing_certificates: register_int_gauge_with_registry!(
"transaction_manager_num_executing_certificates",
"Number of executing certificates, including queued and actually running certificates",
registry,
)
.unwrap(),
transaction_manager_num_ready: register_int_gauge_with_registry!(
"transaction_manager_num_ready",
"Number of ready transactions in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_object_cache_size: register_int_gauge_with_registry!(
"transaction_manager_object_cache_size",
"Current size of object-availability cache in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_object_cache_hits: register_int_counter_with_registry!(
"transaction_manager_object_cache_hits",
"Number of object-availability cache hits in TransactionManager",
registry,
)
.unwrap(),
authority_overload_status: register_int_gauge_with_registry!(
"authority_overload_status",
"Whether authority is current experiencing overload and enters load shedding mode.",
Expand All @@ -563,42 +515,6 @@ impl AuthorityMetrics {
"The percentage of transactions is shed when the authority is in load shedding mode.",
registry)
.unwrap(),
transaction_manager_object_cache_misses: register_int_counter_with_registry!(
"transaction_manager_object_cache_misses",
"Number of object-availability cache misses in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_object_cache_evictions: register_int_counter_with_registry!(
"transaction_manager_object_cache_evictions",
"Number of object-availability cache evictions in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_package_cache_size: register_int_gauge_with_registry!(
"transaction_manager_package_cache_size",
"Current size of package-availability cache in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_package_cache_hits: register_int_counter_with_registry!(
"transaction_manager_package_cache_hits",
"Number of package-availability cache hits in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_package_cache_misses: register_int_counter_with_registry!(
"transaction_manager_package_cache_misses",
"Number of package-availability cache misses in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_package_cache_evictions: register_int_counter_with_registry!(
"transaction_manager_package_cache_evictions",
"Number of package-availability cache evictions in TransactionManager",
registry,
)
.unwrap(),
transaction_manager_transaction_queue_age_s: register_histogram_with_registry!(
"transaction_manager_transaction_queue_age_s",
"Time spent in waiting for transaction in the queue",
Expand Down Expand Up @@ -832,8 +748,8 @@ pub struct AuthorityState {

committee_store: Arc<CommitteeStore>,

/// Manages pending certificates and their missing input objects.
transaction_manager: Arc<TransactionManager>,
/// Schedules transaction execution.
execution_scheduler: Arc<ExecutionScheduler>,

/// Shuts down the execution task. Used only in testing.
#[allow(unused)]
Expand Down Expand Up @@ -1117,7 +1033,7 @@ impl AuthorityState {
self.update_overload_metrics("execution_queue");
})?;
}
self.transaction_manager
self.execution_scheduler
.check_execution_overload(self.overload_config(), tx_data)
.tap_err(|_| {
self.update_overload_metrics("execution_pending");
Expand Down Expand Up @@ -1561,8 +1477,6 @@ impl AuthorityState {
let input_object_count = inner_temporary_store.input_objects.len();
let shared_object_count = effects.input_shared_objects().len();

let output_keys = inner_temporary_store.get_output_keys(effects);

// index certificate
let _ = self
.post_process_one_tx(certificate, effects, &inner_temporary_store, epoch_store)
Expand Down Expand Up @@ -1596,11 +1510,7 @@ impl AuthorityState {
// commit_certificate finished, the tx is fully committed to the store.
tx_guard.commit_tx();

// Notifies transaction manager about transaction and output objects committed.
// This provides necessary information to transaction manager to start executing
// additional ready transactions.
self.transaction_manager
.notify_commit(tx_digest, output_keys, epoch_store);
self.execution_scheduler.notify_commit(certificate);

self.update_metrics(certificate, input_object_count, shared_object_count);

Expand Down Expand Up @@ -2922,12 +2832,11 @@ impl AuthorityState {

let metrics = Arc::new(AuthorityMetrics::new(prometheus_registry));
let (tx_ready_certificates, rx_ready_certificates) = unbounded_channel();
let transaction_manager = Arc::new(TransactionManager::new(
let execution_scheduler = Arc::new(ExecutionScheduler::new(
execution_cache_trait_pointers.object_cache_reader.clone(),
execution_cache_trait_pointers
.transaction_cache_reader
.clone(),
&epoch_store,
tx_ready_certificates,
metrics.clone(),
));
Expand Down Expand Up @@ -2964,7 +2873,7 @@ impl AuthorityState {
subscription_handler: Arc::new(SubscriptionHandler::new(prometheus_registry)),
checkpoint_store,
committee_store,
transaction_manager,
execution_scheduler,
tx_execution_shutdown: Mutex::new(Some(tx_execution_shutdown)),
metrics,
_pruner,
Expand Down Expand Up @@ -3067,8 +2976,8 @@ impl AuthorityState {
.await
}

pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
&self.transaction_manager
pub fn execution_scheduler(&self) -> &Arc<ExecutionScheduler> {
&self.execution_scheduler
}

/// Adds transactions / certificates to the execution scheduler for ordered execution.
Expand All @@ -3077,23 +2986,24 @@ impl AuthorityState {
txns: Vec<VerifiedExecutableTransaction>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
self.transaction_manager.enqueue(txns, epoch_store)
self.execution_scheduler.enqueue(txns, epoch_store)
}

pub fn enqueue_certificates_for_execution(
&self,
certs: Vec<VerifiedCertificate>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
self.transaction_manager
self.execution_scheduler
.enqueue_certificates(certs, epoch_store)
}

pub fn enqueue_with_expected_effects_digest(
&self,
certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
epoch_store: &AuthorityPerEpochStore,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
self.transaction_manager
self.execution_scheduler
.enqueue_with_expected_effects_digest(certs, epoch_store)
}

Expand Down Expand Up @@ -3281,7 +3191,6 @@ impl AuthorityState {
)
.await?;
assert_eq!(new_epoch_store.epoch(), new_epoch);
self.transaction_manager.reconfigure(new_epoch);
*execution_lock = new_epoch;
// drop execution_lock after epoch store was updated
// see also assert in AuthorityState::process_certificate
Expand Down Expand Up @@ -3315,7 +3224,6 @@ impl AuthorityState {
.unwrap_or_default(),
);
let new_epoch = new_epoch_store.epoch();
self.transaction_manager.reconfigure(new_epoch);
self.epoch_store.store(new_epoch_store);
epoch_store.epoch_terminated().await;
*execution_lock = new_epoch;
Expand Down Expand Up @@ -5467,9 +5375,8 @@ impl RandomnessRoundReceiver {
.get_cache_commit()
.persist_transaction(&transaction);

// Send transaction to TransactionManager for execution.
self.authority_state
.transaction_manager()
.execution_scheduler()
.enqueue(vec![transaction], &epoch_store);

let authority_state = self.authority_state.clone();
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ pub async fn send_consensus(authority: &AuthorityState, cert: &VerifiedCertifica
.unwrap();

authority
.transaction_manager()
.execution_scheduler()
.enqueue(certs, &authority.epoch_store_for_testing());
}

Expand Down
17 changes: 9 additions & 8 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use tracing::{debug, info, instrument, warn};
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::backpressure::BackpressureManager;
use crate::authority::AuthorityState;
use crate::execution_scheduler::ExecutionScheduler;
use crate::state_accumulator::StateAccumulator;
use crate::transaction_manager::TransactionManager;
use crate::{
checkpoints::CheckpointStore,
execution_cache::{ObjectCacheRead, TransactionCacheRead},
Expand Down Expand Up @@ -120,7 +120,7 @@ pub struct CheckpointExecutor {
checkpoint_store: Arc<CheckpointStore>,
object_cache_reader: Arc<dyn ObjectCacheRead>,
transaction_cache_reader: Arc<dyn TransactionCacheRead>,
tx_manager: Arc<TransactionManager>,
execution_scheduler: Arc<ExecutionScheduler>,
accumulator: Arc<StateAccumulator>,
backpressure_manager: Arc<BackpressureManager>,
config: CheckpointExecutorConfig,
Expand All @@ -146,7 +146,7 @@ impl CheckpointExecutor {
checkpoint_store,
object_cache_reader: state.get_object_cache_reader().clone(),
transaction_cache_reader: state.get_transaction_cache_reader().clone(),
tx_manager: state.transaction_manager().clone(),
execution_scheduler: state.execution_scheduler().clone(),
accumulator,
backpressure_manager,
config,
Expand Down Expand Up @@ -802,7 +802,7 @@ impl CheckpointExecutor {
}

// Enqueue unexecuted transactions with their expected effects digests
self.tx_manager
self.execution_scheduler
.enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);

unexecuted_tx_digests
Expand Down Expand Up @@ -852,10 +852,11 @@ impl CheckpointExecutor {
change_epoch_tx.digest(),
change_epoch_fx.digest()
);
self.tx_manager.enqueue_with_expected_effects_digest(
vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
&self.epoch_store,
);
self.execution_scheduler
.enqueue_with_expected_effects_digest(
vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
&self.epoch_store,
);

self.transaction_cache_reader
.notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ impl CheckpointStore {
pub async fn reexecute_local_checkpoints(
&self,
state: &AuthorityState,
epoch_store: &AuthorityPerEpochStore,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
let epoch = epoch_store.epoch();
let highest_executed = self
Expand Down
Loading
Loading