Skip to content

Commit

Permalink
jsonrpc: use ExecuteTransactionRequestV3 when executing a transaction
Browse files Browse the repository at this point in the history
This patch deprecates the `WaitForLocalExecution` request type that was
previously relied upon to calculate object and balance changes. Instead
the new `ExecuteTransactionRequestV3` functionality is used to request
output object data directly from the validators so that we no longer
need to rely on waiting for local execution.

The `WaitForLocalExecution` feature still exists and will still properly
wait to return a response to a client until after the transaction is
executed locally in order to retain the Read-after-Write semantics that
some clients may presently rely on.
  • Loading branch information
bmwill authored Jul 31, 2024
1 parent be249a0 commit c9a6d67
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 218 deletions.
200 changes: 74 additions & 126 deletions crates/sui-core/src/transaction_orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use sui_types::effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffe
use sui_types::error::{SuiError, SuiResult};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::quorum_driver_types::{
ExecuteTransactionRequest, ExecuteTransactionRequestType, ExecuteTransactionRequestV3,
ExecuteTransactionResponse, ExecuteTransactionResponseV3, FinalizedEffects,
QuorumDriverEffectsQueueResult, QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
ExecuteTransactionRequestType, ExecuteTransactionRequestV3, ExecuteTransactionResponseV3,
FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
QuorumDriverError, QuorumDriverResponse, QuorumDriverResult,
};
use sui_types::sui_system_state::SuiSystemState;
use sui_types::transaction::VerifiedTransaction;
Expand Down Expand Up @@ -148,125 +148,60 @@ where
#[instrument(name = "tx_orchestrator_execute_transaction", level = "debug", skip_all,
fields(
tx_digest = ?request.transaction.digest(),
tx_type = ?request.transaction_type(),
tx_type = ?request_type,
),
err)]
pub async fn execute_transaction_block(
&self,
request: ExecuteTransactionRequest,
request: ExecuteTransactionRequestV3,
request_type: ExecuteTransactionRequestType,
client_addr: Option<SocketAddr>,
) -> Result<ExecuteTransactionResponse, QuorumDriverError> {
// TODO check if tx is already executed on this node.
// Note: since EffectsCert is not stored today, we need to gather that from validators
// (and maybe store it for caching purposes)
) -> Result<(ExecuteTransactionResponseV3, IsTransactionExecutedLocally), QuorumDriverError>
{
let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();

let transaction = epoch_store
.verify_transaction(request.transaction)
.map_err(QuorumDriverError::InvalidUserSignature)?;
let (_in_flight_metrics_guards, good_response_metrics) = self.update_metrics(&transaction);
let tx_digest = *transaction.digest();
debug!(?tx_digest, "TO Received transaction execution request.");
let (transaction, response) = self
.execute_transaction_impl(&epoch_store, request, client_addr)
.await?;

let (_e2e_latency_timer, _txn_finality_timer) = if transaction.contains_shared_object() {
(
self.metrics.request_latency_shared_obj.start_timer(),
self.metrics
.wait_for_finality_latency_shared_obj
.start_timer(),
let executed_locally = if matches!(
request_type,
ExecuteTransactionRequestType::WaitForLocalExecution
) {
let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
transaction,
response.effects_cert.executed_epoch(),
);
Self::execute_finalized_tx_locally_with_timeout(
&self.validator_state,
&epoch_store,
&executable_tx,
&response.effects_cert,
&self.metrics,
)
.await
.is_ok()
} else {
(
self.metrics.request_latency_single_writer.start_timer(),
self.metrics
.wait_for_finality_latency_single_writer
.start_timer(),
)
false
};

// TODO: refactor all the gauge and timer metrics with `monitored_scope`
let wait_for_finality_gauge = self.metrics.wait_for_finality_in_flight.clone();
wait_for_finality_gauge.inc();
let _wait_for_finality_gauge = scopeguard::guard(wait_for_finality_gauge, |in_flight| {
in_flight.dec();
});

let ticket = self
.submit(
transaction.clone(),
ExecuteTransactionRequestV3::new_v2(transaction.clone()),
client_addr,
)
.await
.map_err(|e| {
warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
QuorumDriverError::QuorumDriverInternalError(e)
})?;

let wait_for_local_execution = matches!(
request.request_type,
ExecuteTransactionRequestType::WaitForLocalExecution
);

let Ok(result) = timeout(WAIT_FOR_FINALITY_TIMEOUT, ticket).await else {
debug!(?tx_digest, "Timeout waiting for transaction finality.");
self.metrics.wait_for_finality_timeout.inc();
return Err(QuorumDriverError::TimeoutBeforeFinality);
let QuorumDriverResponse {
effects_cert,
events,
input_objects,
output_objects,
auxiliary_data,
} = response;

let response = ExecuteTransactionResponseV3 {
effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
events,
input_objects,
output_objects,
auxiliary_data,
};

drop(_txn_finality_timer);
drop(_wait_for_finality_gauge);
self.metrics.wait_for_finality_finished.inc();

match result {
Err(err) => {
warn!(?tx_digest, "QuorumDriverInternalError: {err:?}");
Err(QuorumDriverError::QuorumDriverInternalError(err))
}
Ok(Err(err)) => Err(err),
Ok(Ok(response)) => {
good_response_metrics.inc();
let QuorumDriverResponse { effects_cert, .. } = response;
if !wait_for_local_execution {
debug!(?tx_digest, ?wait_for_local_execution, "success");
return Ok(ExecuteTransactionResponse::EffectsCert(Box::new((
FinalizedEffects::new_from_effects_cert(effects_cert.into()),
response.events.unwrap_or_default(),
false,
))));
}

let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
transaction,
effects_cert.executed_epoch(),
);

match Self::execute_finalized_tx_locally_with_timeout(
&self.validator_state,
&epoch_store,
&executable_tx,
&effects_cert,
&self.metrics,
)
.await
{
Ok(_) => {
debug!(?tx_digest, ?wait_for_local_execution, "success");

Ok(ExecuteTransactionResponse::EffectsCert(Box::new((
FinalizedEffects::new_from_effects_cert(effects_cert.into()),
response.events.unwrap_or_default(),
true,
))))
}
Err(_) => Ok(ExecuteTransactionResponse::EffectsCert(Box::new((
FinalizedEffects::new_from_effects_cert(effects_cert.into()),
response.events.unwrap_or_default(),
false,
)))),
}
}
}
Ok((response, executed_locally))
}

// Utilize the handle_certificate_v3 validator api to request input/output objects
Expand All @@ -277,11 +212,37 @@ where
request: ExecuteTransactionRequestV3,
client_addr: Option<SocketAddr>,
) -> Result<ExecuteTransactionResponseV3, QuorumDriverError> {
// TODO check if tx is already executed on this node.
// Note: since EffectsCert is not stored today, we need to gather that from validators
// (and maybe store it for caching purposes)
let epoch_store = self.validator_state.load_epoch_store_one_call_per_task();

let QuorumDriverResponse {
effects_cert,
events,
input_objects,
output_objects,
auxiliary_data,
} = self
.execute_transaction_impl(&epoch_store, request, client_addr)
.await
.map(|(_, r)| r)?;

Ok(ExecuteTransactionResponseV3 {
effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
events,
input_objects,
output_objects,
auxiliary_data,
})
}

// TODO check if tx is already executed on this node.
// Note: since EffectsCert is not stored today, we need to gather that from validators
// (and maybe store it for caching purposes)
pub async fn execute_transaction_impl(
&self,
epoch_store: &AuthorityPerEpochStore,
request: ExecuteTransactionRequestV3,
client_addr: Option<SocketAddr>,
) -> Result<(VerifiedTransaction, QuorumDriverResponse), QuorumDriverError> {
let transaction = epoch_store
.verify_transaction(request.transaction.clone())
.map_err(QuorumDriverError::InvalidUserSignature)?;
Expand Down Expand Up @@ -313,7 +274,7 @@ where
});

let ticket = self
.submit(transaction, request, client_addr)
.submit(transaction.clone(), request, client_addr)
.await
.map_err(|e| {
warn!(?tx_digest, "QuorumDriverInternalError: {e:?}");
Expand All @@ -338,20 +299,7 @@ where
Ok(Err(err)) => Err(err),
Ok(Ok(response)) => {
good_response_metrics.inc();
let QuorumDriverResponse {
effects_cert,
events,
input_objects,
output_objects,
auxiliary_data,
} = response;
Ok(ExecuteTransactionResponseV3 {
effects: FinalizedEffects::new_from_effects_cert(effects_cert.into()),
events,
input_objects,
output_objects,
auxiliary_data,
})
Ok((transaction, response))
}
}
}
Expand Down
47 changes: 25 additions & 22 deletions crates/sui-e2e-tests/tests/full_node_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ use sui_types::messages_grpc::TransactionInfoRequest;
use sui_types::object::{Object, ObjectRead, Owner, PastObjectRead};
use sui_types::programmable_transaction_builder::ProgrammableTransactionBuilder;
use sui_types::quorum_driver_types::{
ExecuteTransactionRequest, ExecuteTransactionRequestType, ExecuteTransactionResponse,
QuorumDriverResponse,
ExecuteTransactionRequestType, ExecuteTransactionRequestV3, QuorumDriverResponse,
};
use sui_types::storage::ObjectStore;
use sui_types::transaction::{
Expand Down Expand Up @@ -754,16 +753,13 @@ async fn test_full_node_transaction_orchestrator_basic() -> Result<(), anyhow::E
let digest = *txn.digest();
let res = transaction_orchestrator
.execute_transaction_block(
ExecuteTransactionRequest {
transaction: txn,
request_type: ExecuteTransactionRequestType::WaitForLocalExecution,
},
ExecuteTransactionRequestV3::new_v2(txn),
ExecuteTransactionRequestType::WaitForLocalExecution,
None,
)
.await
.unwrap_or_else(|e| panic!("Failed to execute transaction {:?}: {:?}", digest, e));

let ExecuteTransactionResponse::EffectsCert(res) = res;
let (
tx,
QuorumDriverResponse {
Expand All @@ -772,11 +768,17 @@ async fn test_full_node_transaction_orchestrator_basic() -> Result<(), anyhow::E
..
},
) = rx.recv().await.unwrap().unwrap();
let (cte, events, is_executed_locally) = *res;
let (response, is_executed_locally) = res;
assert_eq!(*tx.digest(), digest);
assert_eq!(cte.effects.digest(), *certified_txn_effects.digest());
assert_eq!(
response.effects.effects.digest(),
*certified_txn_effects.digest()
);
assert!(is_executed_locally);
assert_eq!(events.digest(), txn_events.unwrap_or_default().digest());
assert_eq!(
response.events.unwrap_or_default().digest(),
txn_events.unwrap_or_default().digest()
);
// verify that the node has sequenced and executed the txn
fullnode.state().get_executed_transaction_and_effects(digest, kv_store.clone()).await
.unwrap_or_else(|e| panic!("Fullnode does not know about the txn {:?} that was executed with WaitForLocalExecution: {:?}", digest, e));
Expand All @@ -786,16 +788,13 @@ async fn test_full_node_transaction_orchestrator_basic() -> Result<(), anyhow::E
let digest = *txn.digest();
let res = transaction_orchestrator
.execute_transaction_block(
ExecuteTransactionRequest {
transaction: txn,
request_type: ExecuteTransactionRequestType::WaitForEffectsCert,
},
ExecuteTransactionRequestV3::new_v2(txn),
ExecuteTransactionRequestType::WaitForEffectsCert,
None,
)
.await
.unwrap_or_else(|e| panic!("Failed to execute transaction {:?}: {:?}", digest, e));

let ExecuteTransactionResponse::EffectsCert(res) = res;
let (
tx,
QuorumDriverResponse {
Expand All @@ -804,10 +803,16 @@ async fn test_full_node_transaction_orchestrator_basic() -> Result<(), anyhow::E
..
},
) = rx.recv().await.unwrap().unwrap();
let (cte, events, is_executed_locally) = *res;
let (response, is_executed_locally) = res;
assert_eq!(*tx.digest(), digest);
assert_eq!(cte.effects.digest(), *certified_txn_effects.digest());
assert_eq!(txn_events.unwrap_or_default().digest(), events.digest());
assert_eq!(
response.effects.effects.digest(),
*certified_txn_effects.digest()
);
assert_eq!(
txn_events.unwrap_or_default().digest(),
response.events.unwrap_or_default().digest()
);
assert!(!is_executed_locally);
fullnode
.state()
Expand Down Expand Up @@ -1189,10 +1194,8 @@ async fn test_pass_back_no_object() -> Result<(), anyhow::Error> {
let digest = *tx.digest();
let _res = transaction_orchestrator
.execute_transaction_block(
ExecuteTransactionRequest {
transaction: tx,
request_type: ExecuteTransactionRequestType::WaitForLocalExecution,
},
ExecuteTransactionRequestV3::new_v2(tx),
ExecuteTransactionRequestType::WaitForLocalExecution,
None,
)
.await
Expand Down
Loading

0 comments on commit c9a6d67

Please sign in to comment.