Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add QuorumDriver #2515

Merged
merged 9 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [
"crates/sui-node",
"crates/sui-open-rpc",
"crates/sui-open-rpc-macros",
"crates/sui-quorum-driver",
"crates/sui-storage",
"crates/sui-swarm",
"crates/sui-transactional-test-runner",
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ where
/// At that point (and after) enough authorities are up to date with all objects
/// needed to process the certificate that a submission should succeed. However,
/// in case an authority returns an error, we do try to bring it up to speed.
async fn process_certificate(
pub async fn process_certificate(
&self,
certificate: CertifiedTransaction,
) -> Result<TransactionEffects, SuiError> {
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-core/tests/staged/sui.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,10 @@ SuiError:
UnsupportedFeatureError:
STRUCT:
- error: STR
115:
QuorumDriverCommunicationError:
STRUCT:
- error: STR
TransactionDigest:
NEWTYPESTRUCT: BYTES
TransactionEffectsDigest:
Expand Down
1 change: 1 addition & 0 deletions crates/sui-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ sui-core = { path = "../sui-core" }
sui-storage = { path = "../sui-storage" }
sui-gateway = { path = "../sui-gateway" }
sui-network = { path = "../sui-network" }
sui-quorum-driver = { path = "../sui-quorum-driver" }

telemetry-subscribers = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223" }
mysten-network = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223" }
Expand Down
17 changes: 17 additions & 0 deletions crates/sui-quorum-driver/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "sui-quorum-driver"
version = "0.1.0"
authors = ["Mysten Labs <build@mystenlabs.com>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
arc-swap = "1.5.0"
tokio = { version = "1.18.2", features = ["full"] }
tracing = "0.1.34"

sui-core = { path = "../sui-core" }
sui-types = { path = "../sui-types" }

workspace-hack = { path = "../workspace-hack"}
224 changes: 224 additions & 0 deletions crates/sui-quorum-driver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use arc_swap::ArcSwap;
use std::sync::Arc;

use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinHandle;
use tracing::log::{error, warn};
use tracing::Instrument;

use sui_core::authority_aggregator::AuthorityAggregator;
use sui_core::authority_client::AuthorityAPI;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::{
CertifiedTransaction, ExecuteTransactionRequest, ExecuteTransactionRequestType,
ExecuteTransactionResponse, Transaction, TransactionEffects,
};

pub enum QuorumTask<A> {
ProcessTransaction(Transaction),
ProcessCertificate(CertifiedTransaction),
UpdateCommittee(AuthorityAggregator<A>),
}

/// A handler to wrap around QuorumDriver. This handler should be owned by the node with exclusive
/// mutability.
pub struct QuorumDriverHandler<A> {
quorum_driver: Arc<QuorumDriver<A>>,
_processor_handle: JoinHandle<()>,
// TODO: Change to CertifiedTransactionEffects eventually.
effects_subscriber: Receiver<(CertifiedTransaction, TransactionEffects)>,
}

/// The core data structure of the QuorumDriver.
/// It's expected that the QuorumDriver will be wrapped in an `Arc` and shared around.
/// One copy will be used in a json-RPC server to serve transaction execution requests;
/// Another copy will be held by a QuorumDriverHandler to either send signal to update the
/// committee, or to subscribe effects generated from the QuorumDriver.
pub struct QuorumDriver<A> {
validators: ArcSwap<AuthorityAggregator<A>>,
task_sender: Sender<QuorumTask<A>>,
effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>,
}

impl<A> QuorumDriver<A> {
pub fn new(
validators: AuthorityAggregator<A>,
task_sender: Sender<QuorumTask<A>>,
effects_subscribe_sender: Sender<(CertifiedTransaction, TransactionEffects)>,
) -> Self {
Self {
validators: ArcSwap::from(Arc::new(validators)),
task_sender,
effects_subscribe_sender,
}
}
}

impl<A> QuorumDriver<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub async fn execute_transaction(
&self,
request: ExecuteTransactionRequest,
) -> SuiResult<ExecuteTransactionResponse> {
let ExecuteTransactionRequest {
transaction,
request_type,
} = request;
match request_type {
ExecuteTransactionRequestType::ImmediateReturn => {
self.task_sender
.send(QuorumTask::ProcessTransaction(transaction))
.await
.map_err(|err| SuiError::QuorumDriverCommunicationError {
error: err.to_string(),
})?;
Ok(ExecuteTransactionResponse::ImmediateReturn)
}
ExecuteTransactionRequestType::WaitForTxCert => {
let certificate = self
.process_transaction(transaction)
.instrument(tracing::debug_span!("process_tx"))
.await?;
self.task_sender
.send(QuorumTask::ProcessCertificate(certificate.clone()))
.await
.map_err(|err| SuiError::QuorumDriverCommunicationError {
error: err.to_string(),
})?;
Ok(ExecuteTransactionResponse::TxCert(Box::new(certificate)))
}
ExecuteTransactionRequestType::WaitForEffectsCert => {
let certificate = self
.process_transaction(transaction)
.instrument(tracing::debug_span!("process_tx"))
.await?;
let response = self
.process_certificate(certificate)
.instrument(tracing::debug_span!("process_cert"))
.await?;
Ok(ExecuteTransactionResponse::EffectsCert(Box::new(response)))
}
}
}

pub async fn process_transaction(
&self,
transaction: Transaction,
) -> SuiResult<CertifiedTransaction> {
self.validators
.load()
.process_transaction(transaction)
.instrument(tracing::debug_span!("process_tx"))
.await
}

pub async fn process_certificate(
&self,
certificate: CertifiedTransaction,
) -> SuiResult<(CertifiedTransaction, TransactionEffects)> {
let effects = self
.validators
.load()
.process_certificate(certificate.clone())
.instrument(tracing::debug_span!("process_cert"))
.await?;
let response = (certificate, effects);
// An error to send the result to subscribers should not block returning the result.
if let Err(err) = self.effects_subscribe_sender.send(response.clone()).await {
// TODO: We could potentially retry sending if we want.
error!("{}", err);
}
Ok(response)
}
}

impl<A> QuorumDriverHandler<A>
where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub fn new(validators: AuthorityAggregator<A>) -> Self {
let (task_tx, task_rx) = mpsc::channel::<QuorumTask<A>>(5000);
let (subscriber_tx, subscriber_rx) = mpsc::channel::<_>(5000);
let quorum_driver = Arc::new(QuorumDriver::new(validators, task_tx, subscriber_tx));
let handle = {
let quorum_driver_copy = quorum_driver.clone();
tokio::task::spawn(async move {
Self::task_queue_processor(quorum_driver_copy, task_rx).await;
})
};
Self {
quorum_driver,
_processor_handle: handle,
effects_subscriber: subscriber_rx,
}
}

pub fn clone_quorum_driver(&self) -> Arc<QuorumDriver<A>> {
self.quorum_driver.clone()
}

pub fn subscribe(&mut self) -> &mut Receiver<(CertifiedTransaction, TransactionEffects)> {
&mut self.effects_subscriber
}

pub async fn update_validators(&self, new_validators: AuthorityAggregator<A>) -> SuiResult {
self.quorum_driver
.task_sender
.send(QuorumTask::UpdateCommittee(new_validators))
.await
.map_err(|err| SuiError::QuorumDriverCommunicationError {
error: err.to_string(),
})
}

async fn task_queue_processor(
quorum_driver: Arc<QuorumDriver<A>>,
mut task_receiver: Receiver<QuorumTask<A>>,
) {
loop {
if let Some(task) = task_receiver.recv().await {
match task {
QuorumTask::ProcessTransaction(transaction) => {
// TODO: We entered here because callers do not want to wait for a
// transaction to finish execution. When this failed, we do not have a
// way to notify the caller. In the future, we may want to maintain
// some data structure for callers to come back and query the status
// of a transaction latter.
match quorum_driver.process_transaction(transaction).await {
Ok(cert) => {
if let Err(err) = quorum_driver
.task_sender
.send(QuorumTask::ProcessCertificate(cert))
.await
{
error!(
"Sending task to quorum driver queue failed: {}",
err.to_string()
);
}
}
Err(err) => {
warn!("Transaction processing failed: {:?}", err);
}
}
}
QuorumTask::ProcessCertificate(certificate) => {
// TODO: Similar to ProcessTransaction, we may want to allow callers to
// query the status.
if let Err(err) = quorum_driver.process_certificate(certificate).await {
warn!("Certificate processing failed: {:?}", err);
}
}
QuorumTask::UpdateCommittee(new_validators) => {
quorum_driver.validators.store(Arc::new(new_validators));
}
}
}
}
}
}
3 changes: 3 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ pub enum SuiError {

#[error("Use of disabled feature: {:?}", error)]
UnsupportedFeatureError { error: String },

#[error("Unable to communicate with the Quorum Driver channel: {:?}", error)]
QuorumDriverCommunicationError { error: String },
}

pub type SuiResult<T = ()> = Result<T, SuiError>;
Expand Down
22 changes: 22 additions & 0 deletions crates/sui-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::{
collections::{BTreeSet, HashSet},
hash::{Hash, Hasher},
};

#[cfg(test)]
#[path = "unit_tests/messages_tests.rs"]
mod messages_tests;
Expand Down Expand Up @@ -1353,3 +1354,24 @@ impl ConsensusTransaction {
}
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ExecuteTransactionRequestType {
ImmediateReturn,
WaitForTxCert,
WaitForEffectsCert,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ExecuteTransactionRequest {
pub transaction: Transaction,
pub request_type: ExecuteTransactionRequestType,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum ExecuteTransactionResponse {
ImmediateReturn,
TxCert(Box<CertifiedTransaction>),
// TODO: Change to CertifiedTransactionEffects eventually.
EffectsCert(Box<(CertifiedTransaction, TransactionEffects)>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit misleadingly named, isn't it? The TransactionEffects here is unsigned, so the client can't use this as a proof of finality. An effects cert would actually be a TransactionEffectsEnvelope<AuthorityQuorumSignInfo>, right?

FWIW I may look at forming effects certificates in full node, since I'm already going to be doing the work of waiting for finality before syncing a cert. I say "may" because fullnode can get away with merely proving finality to itself by observed 2f+1 idential effects digests - forming an fx cert involves further fetching 2f+1 signed effects.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan here is to make it return TransactionEffectsEnvelope eventually.
Currently since authority_aggregator only returns effects, here I am leaving it as it is. I plan to send future PRs to actually make authority_aggregator return effects certs.

}
1 change: 1 addition & 0 deletions crates/sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ sui-open-rpc-macros = { path = "../sui-open-rpc-macros" }
sui-json = { path = "../sui-json" }
sui-gateway = { path = "../sui-gateway" }
sui-node = { path = "../sui-node" }
sui-quorum-driver = { path = "../sui-quorum-driver" }
sui-swarm = { path = "../sui-swarm" }

rustyline = "9.1.2"
Expand Down
Loading