Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix 1680
  • Loading branch information
asonnino authored May 2, 2022
1 parent 4a2e116 commit f9e4b5e
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 67 deletions.
6 changes: 5 additions & 1 deletion sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,11 @@ pub async fn make_authority(

// Spawn a consensus listener. It listen for consensus outputs and notifies the
// authority server when a sequenced transaction is ready for execution.
ConsensusListener::spawn(rx_sui_to_consensus, rx_consensus_to_sui);
ConsensusListener::spawn(
rx_sui_to_consensus,
rx_consensus_to_sui,
/* max_pending_transactions */ 1_000_000,
);

// If we have network information make authority clients
// to all authorities in the system.
Expand Down
47 changes: 21 additions & 26 deletions sui/tests/shared_objects_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn submit_single_owner_transaction(
async fn submit_shared_object_transaction(
transaction: Transaction,
configs: &[AuthorityPrivateInfo],
) -> Vec<Option<SuiResult<TransactionInfoResponse>>> {
) -> Vec<SuiResult<TransactionInfoResponse>> {
let certificate = make_certificates(vec![transaction]).pop().unwrap();
let message = ConsensusTransaction::UserTransaction(certificate);
let serialized = Bytes::from(serialize_consensus_transaction(&message));
Expand All @@ -76,7 +76,7 @@ async fn submit_shared_object_transaction(
}
SerializedMessage::Error(error) => match *error {
SuiError::ConsensusConnectionBroken(_) => {
// This is the (confusing) error message returned by the consensus
// This is the (confusing, #1489) error message returned by the consensus
// adapter. It means it didn't hear back from consensus and timed out.
replies.push(None);
}
Expand All @@ -86,7 +86,8 @@ async fn submit_shared_object_transaction(
}
}
if replies.iter().any(|x| x.is_some()) {
break replies;
// Remove all `ConsensusConnectionBroken` replies.
break replies.into_iter().flatten().collect();
}
}
}
Expand Down Expand Up @@ -128,7 +129,7 @@ async fn shared_object_transaction() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
assert!(info.signed_effects.is_some());
}

Expand All @@ -152,9 +153,8 @@ async fn many_shared_object_transactions() {
let replies = submit_shared_object_transaction(transaction, &configs).await;
for reply in replies {
match reply {
Some(Ok(_)) => (),
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Ok(_) => (),
Err(error) => panic!("{error}"),
}
}
}
Expand Down Expand Up @@ -209,7 +209,7 @@ async fn call_shared_object_contract() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));

Expand All @@ -226,7 +226,7 @@ async fn call_shared_object_contract() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));

Expand All @@ -246,7 +246,7 @@ async fn call_shared_object_contract() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Expand Down Expand Up @@ -300,12 +300,11 @@ async fn shared_object_flood() {
let replies = submit_shared_object_transaction(transaction, &configs).await;
for reply in replies {
match reply {
Some(Ok(info)) => {
Ok(info) => {
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Err(error) => panic!("{error}"),
}
}

Expand All @@ -321,12 +320,11 @@ async fn shared_object_flood() {
let replies = submit_shared_object_transaction(transaction, &configs).await;
for reply in replies {
match reply {
Some(Ok(info)) => {
Ok(info) => {
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Err(error) => panic!("{error}"),
}
}

Expand All @@ -345,12 +343,11 @@ async fn shared_object_flood() {
let replies = submit_shared_object_transaction(transaction, &configs).await;
for reply in replies {
match reply {
Some(Ok(info)) => {
Ok(info) => {
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Err(error) => panic!("{error}"),
}
}
}
Expand Down Expand Up @@ -400,7 +397,7 @@ async fn shared_object_sync() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));

Expand All @@ -410,9 +407,8 @@ async fn shared_object_sync() {
.await;
for reply in replies {
match reply {
Some(Err(SuiError::SharedObjectLockingFailure(_))) => (),
Some(_) => panic!("Unexpected protocol message"),
None => (), // May happen rarely (see above comment on consensus)
Err(SuiError::SharedObjectLockingFailure(_)) => (),
_ => panic!("Unexpected protocol message"),
}
}

Expand All @@ -431,12 +427,11 @@ async fn shared_object_sync() {
submit_shared_object_transaction(increment_counter_transaction, &configs[1..]).await;
for reply in replies {
match reply {
Some(Ok(info)) => {
Ok(info) => {
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Err(error) => panic!("{error}"),
}
}
}
2 changes: 1 addition & 1 deletion sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ impl AuthorityState {
}

/// Make an information response for a transaction
pub async fn make_transaction_info(
pub(crate) async fn make_transaction_info(
&self,
transaction_digest: &TransactionDigest,
) -> Result<TransactionInfoResponse, SuiError> {
Expand Down
4 changes: 2 additions & 2 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::mpsc::Sender;
use std::time::Duration;
use tracing::{error, info, warn, Instrument};

use crate::consensus_adapter::{ConsensusAdapter, ConsensusInput};
use crate::consensus_adapter::{ConsensusAdapter, ConsensusListenerMessage};
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use tokio::sync::broadcast::error::RecvError;
Expand Down Expand Up @@ -51,7 +51,7 @@ impl AuthorityServer {
buffer_size: usize,
state: Arc<AuthorityState>,
consensus_address: SocketAddr,
tx_consensus_listener: Sender<ConsensusInput>,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
) -> Self {
let consensus_adapter = ConsensusAdapter::new(
consensus_address,
Expand Down
89 changes: 61 additions & 28 deletions sui_core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ type ConsensusTransactionDigest = u64;
/// Transaction info response serialized by Sui.
type SerializedTransactionInfoResponse = Vec<u8>;

/// Channel to notify the called when the Sui certificate has been sequenced.
type Replier = oneshot::Sender<SuiResult<SerializedTransactionInfoResponse>>;
/// Channel to notify the caller when the Sui certificate has been sequenced.
type TxSequencedNotifier = oneshot::Sender<SuiResult<SerializedTransactionInfoResponse>>;

/// Message to notify the consensus adapter of a new certificate sent to consensus.
/// Message to notify the consensus listener that a new transaction has been sent to consensus
/// or that the caller timed out on a specific transaction.
#[derive(Debug)]
pub struct ConsensusInput {
serialized: SerializedConsensusTransaction,
replier: Replier,
pub enum ConsensusListenerMessage {
New(SerializedConsensusTransaction, TxSequencedNotifier),
Cleanup(SerializedConsensusTransaction),
}

/// The message returned by the consensus to notify that a Sui certificate has been sequenced
Expand All @@ -56,9 +57,11 @@ pub struct ConsensusAdapter {
buffer_size: usize,
/// The Sui committee information.
committee: Committee,
/// A channel to notify the consensus listener of new transactions.
tx_consensus_listener: Sender<ConsensusInput>,
/// The maximum duration to wait from consensus before aborting the transaction.
/// A channel to notify the consensus listener to take action for a transactions.
tx_consensus_listener: Sender<ConsensusListenerMessage>,
/// The maximum duration to wait from consensus before aborting the transaction. After
/// this delay passed, the client will be notified that its transaction was probably not
/// sequence and it should try to resubmit its transaction.
max_delay: Duration,
}

Expand All @@ -68,7 +71,7 @@ impl ConsensusAdapter {
consensus_address: SocketAddr,
buffer_size: usize,
committee: Committee,
tx_consensus_listener: Sender<ConsensusInput>,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
max_delay: Duration,
) -> Self {
Self {
Expand Down Expand Up @@ -109,10 +112,7 @@ impl ConsensusAdapter {

// Notify the consensus listener that we are expecting to process this certificate.
let (sender, receiver) = oneshot::channel();
let consensus_input = ConsensusInput {
serialized,
replier: sender,
};
let consensus_input = ConsensusListenerMessage::New(serialized.clone(), sender);
self.tx_consensus_listener
.send(consensus_input)
.await
Expand All @@ -131,35 +131,54 @@ impl ConsensusAdapter {
}

// Wait for the consensus to sequence the certificate and assign locks to shared objects.
timeout(self.max_delay, receiver)
.await
.map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string()))?
.expect("Channel with consensus listener dropped")
// Since the consensus protocol may drop some messages, it is not guaranteed that our
// certificate will be sequenced. So the best we can do is to set a timer and notify the
// client to retry if we timeout without hearing back from consensus (this module does not
// handle retries). The best timeout value depends on the consensus protocol.
match timeout(self.max_delay, receiver).await {
Ok(reply) => reply.expect("Failed to read back from consensus listener"),
Err(e) => {
let message = ConsensusListenerMessage::Cleanup(serialized);
self.tx_consensus_listener
.send(message)
.await
.expect("Cleanup channel with consensus listener dropped");
Err(SuiError::ConsensusConnectionBroken(e.to_string()))
}
}
}
}

/// This module interfaces the consensus with Sui. It receives certificates input to consensus and
/// notify the called when they are sequenced.
pub struct ConsensusListener {
/// Receive messages input to the consensus.
rx_consensus_input: Receiver<ConsensusInput>,
rx_consensus_input: Receiver<ConsensusListenerMessage>,
/// Receive consensus outputs.
rx_consensus_output: Receiver<ConsensusOutput>,
/// The maximum number of pending replies. This cap indicates the maximum amount of client
/// transactions submitted to consensus for which we keep track. If we submit more transactions
/// than this cap, the transactions will be handled by consensus as usual but this module won't
/// be keeping track of when they are sequenced. Its only purpose is to ensure the field called
/// `pending` has a maximum size.
max_pending_transactions: usize,
/// Keep a map of all consensus inputs that are currently being sequenced.
pending: HashMap<ConsensusTransactionDigest, Vec<Replier>>,
pending: HashMap<ConsensusTransactionDigest, Vec<TxSequencedNotifier>>,
}

impl ConsensusListener {
/// Spawn a new consensus adapter in a dedicated tokio task.
pub fn spawn(
rx_consensus_input: Receiver<ConsensusInput>,
rx_consensus_input: Receiver<ConsensusListenerMessage>,
rx_consensus_output: Receiver<ConsensusOutput>,
max_pending_transactions: usize,
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
rx_consensus_input,
rx_consensus_output,
pending: HashMap::new(),
max_pending_transactions,
pending: HashMap::with_capacity(2 * max_pending_transactions),
}
.run()
.await
Expand All @@ -171,12 +190,26 @@ impl ConsensusListener {
async fn run(&mut self) {
loop {
tokio::select! {
// Keep track of this certificates so we can notify the user later.
Some(consensus_input) = self.rx_consensus_input.recv() => {
let serialized = consensus_input.serialized;
let replier = consensus_input.replier;
let digest = Self::hash(&serialized);
self.pending.entry(digest).or_insert_with(Vec::new).push(replier);
// A new transaction has been sent to consensus or is no longer needed.
Some(message) = self.rx_consensus_input.recv() => {
match message {
// Keep track of this certificates so we can notify the user later.
ConsensusListenerMessage::New(transaction, replier) => {
if self.pending.len() < self.max_pending_transactions {
let digest = Self::hash(&transaction);
self.pending.entry(digest).or_insert_with(Vec::new).push(replier);
}
},

// Stop waiting for a consensus transaction.
ConsensusListenerMessage::Cleanup(transaction) => {
let digest = Self::hash(&transaction);
let _ = self.pending.get_mut(&digest).and_then(|x| x.pop());
if self.pending.get(&digest).map_or_else(|| false, |x| x.is_empty()) {
self.pending.remove(&digest);
}
}
}
},

// Notify the caller that the transaction has been sequenced (if there is a caller).
Expand Down
17 changes: 8 additions & 9 deletions sui_core/src/unit_tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,13 @@ async fn listen_to_sequenced_transaction() {
ConsensusListener::spawn(
/* rx_consensus_input */ rx_sui_to_consensus,
/* rx_consensus_output */ rx_consensus_to_sui,
/* max_pending_transactions */ 100,
);

// Submit a sample consensus transaction.
let (sender, receiver) = oneshot::channel();
let input = ConsensusInput {
serialized: serialized.clone(),
replier: sender,
};
tx_sui_to_consensus.send(input).await.unwrap();
let message = ConsensusListenerMessage::New(serialized.clone(), sender);
tx_sui_to_consensus.send(message).await.unwrap();

// Notify the consensus listener that the transaction has been sequenced.
tokio::task::yield_now().await;
Expand Down Expand Up @@ -165,10 +163,11 @@ async fn submit_transaction_to_consensus() {

// Notify the submitter when a consensus transaction has been sequenced and executed.
tokio::spawn(async move {
let ConsensusInput {
replier,
serialized,
} = rx_consensus_listener.recv().await.unwrap();
let (serialized, replier) = match rx_consensus_listener.recv().await.unwrap() {
ConsensusListenerMessage::New(serialized, replier) => (serialized, replier),
message => panic!("Unexpected message {message:?}"),
};

let message =
bincode::deserialize(&serialized).expect("Failed to deserialize consensus tx");
let ConsensusTransaction::UserTransaction(certificate) = message;
Expand Down

0 comments on commit f9e4b5e

Please sign in to comment.