From f9e4b5e4e2bc13710900286ff2556727b17a9650 Mon Sep 17 00:00:00 2001 From: Alberto Sonnino Date: Mon, 2 May 2022 11:51:13 -0400 Subject: [PATCH] Fix #1680 (#1682) Fix 1680 --- sui/src/sui_commands.rs | 6 +- sui/tests/shared_objects_tests.rs | 47 +++++------- sui_core/src/authority.rs | 2 +- sui_core/src/authority_server.rs | 4 +- sui_core/src/consensus_adapter.rs | 89 +++++++++++++++------- sui_core/src/unit_tests/consensus_tests.rs | 17 ++--- 6 files changed, 98 insertions(+), 67 deletions(-) diff --git a/sui/src/sui_commands.rs b/sui/src/sui_commands.rs index 0739585025a22..6148a3c3cf641 100644 --- a/sui/src/sui_commands.rs +++ b/sui/src/sui_commands.rs @@ -553,7 +553,11 @@ pub async fn make_authority( // Spawn a consensus listener. It listen for consensus outputs and notifies the // authority server when a sequenced transaction is ready for execution. - ConsensusListener::spawn(rx_sui_to_consensus, rx_consensus_to_sui); + ConsensusListener::spawn( + rx_sui_to_consensus, + rx_consensus_to_sui, + /* max_pending_transactions */ 1_000_000, + ); // If we have network information make authority clients // to all authorities in the system. diff --git a/sui/tests/shared_objects_tests.rs b/sui/tests/shared_objects_tests.rs index bb1caffde6da8..150d58339e5f3 100644 --- a/sui/tests/shared_objects_tests.rs +++ b/sui/tests/shared_objects_tests.rs @@ -56,7 +56,7 @@ async fn submit_single_owner_transaction( async fn submit_shared_object_transaction( transaction: Transaction, configs: &[AuthorityPrivateInfo], -) -> Vec>> { +) -> Vec> { let certificate = make_certificates(vec![transaction]).pop().unwrap(); let message = ConsensusTransaction::UserTransaction(certificate); let serialized = Bytes::from(serialize_consensus_transaction(&message)); @@ -76,7 +76,7 @@ async fn submit_shared_object_transaction( } SerializedMessage::Error(error) => match *error { SuiError::ConsensusConnectionBroken(_) => { - // This is the (confusing) error message returned by the consensus + // This is the (confusing, #1489) error message returned by the consensus // adapter. It means it didn't hear back from consensus and timed out. replies.push(None); } @@ -86,7 +86,8 @@ async fn submit_shared_object_transaction( } } if replies.iter().any(|x| x.is_some()) { - break replies; + // Remove all `ConsensusConnectionBroken` replies. + break replies.into_iter().flatten().collect(); } } } @@ -128,7 +129,7 @@ async fn shared_object_transaction() { .await .pop() .unwrap(); - let info = reply.unwrap().unwrap(); + let info = reply.unwrap(); assert!(info.signed_effects.is_some()); } @@ -152,9 +153,8 @@ async fn many_shared_object_transactions() { let replies = submit_shared_object_transaction(transaction, &configs).await; for reply in replies { match reply { - Some(Ok(_)) => (), - Some(Err(error)) => panic!("{error}"), - None => (), // May happen rarely (see above comment on consensus) + Ok(_) => (), + Err(error) => panic!("{error}"), } } } @@ -209,7 +209,7 @@ async fn call_shared_object_contract() { .await .pop() .unwrap(); - let info = reply.unwrap().unwrap(); + let info = reply.unwrap(); let effects = info.signed_effects.unwrap().effects; assert!(matches!(effects.status, ExecutionStatus::Success { .. })); @@ -226,7 +226,7 @@ async fn call_shared_object_contract() { .await .pop() .unwrap(); - let info = reply.unwrap().unwrap(); + let info = reply.unwrap(); let effects = info.signed_effects.unwrap().effects; assert!(matches!(effects.status, ExecutionStatus::Success { .. })); @@ -246,7 +246,7 @@ async fn call_shared_object_contract() { .await .pop() .unwrap(); - let info = reply.unwrap().unwrap(); + let info = reply.unwrap(); let effects = info.signed_effects.unwrap().effects; assert!(matches!(effects.status, ExecutionStatus::Success { .. })); } @@ -300,12 +300,11 @@ async fn shared_object_flood() { let replies = submit_shared_object_transaction(transaction, &configs).await; for reply in replies { match reply { - Some(Ok(info)) => { + Ok(info) => { let effects = info.signed_effects.unwrap().effects; assert!(matches!(effects.status, ExecutionStatus::Success { .. })); } - Some(Err(error)) => panic!("{error}"), - None => (), // May happen rarely (see above comment on consensus) + Err(error) => panic!("{error}"), } } @@ -321,12 +320,11 @@ async fn shared_object_flood() { let replies = submit_shared_object_transaction(transaction, &configs).await; for reply in replies { match reply { - Some(Ok(info)) => { + Ok(info) => { let effects = info.signed_effects.unwrap().effects; assert!(matches!(effects.status, ExecutionStatus::Success { .. })); } - Some(Err(error)) => panic!("{error}"), - None => (), // May happen rarely (see above comment on consensus) + Err(error) => panic!("{error}"), } } @@ -345,12 +343,11 @@ async fn shared_object_flood() { let replies = submit_shared_object_transaction(transaction, &configs).await; for reply in replies { match reply { - Some(Ok(info)) => { + Ok(info) => { let effects = info.signed_effects.unwrap().effects; assert!(matches!(effects.status, ExecutionStatus::Success { .. })); } - Some(Err(error)) => panic!("{error}"), - None => (), // May happen rarely (see above comment on consensus) + Err(error) => panic!("{error}"), } } } @@ -400,7 +397,7 @@ async fn shared_object_sync() { .await .pop() .unwrap(); - let info = reply.unwrap().unwrap(); + let info = reply.unwrap(); let effects = info.signed_effects.unwrap().effects; assert!(matches!(effects.status, ExecutionStatus::Success { .. })); @@ -410,9 +407,8 @@ async fn shared_object_sync() { .await; for reply in replies { match reply { - Some(Err(SuiError::SharedObjectLockingFailure(_))) => (), - Some(_) => panic!("Unexpected protocol message"), - None => (), // May happen rarely (see above comment on consensus) + Err(SuiError::SharedObjectLockingFailure(_)) => (), + _ => panic!("Unexpected protocol message"), } } @@ -431,12 +427,11 @@ async fn shared_object_sync() { submit_shared_object_transaction(increment_counter_transaction, &configs[1..]).await; for reply in replies { match reply { - Some(Ok(info)) => { + Ok(info) => { let effects = info.signed_effects.unwrap().effects; assert!(matches!(effects.status, ExecutionStatus::Success { .. })); } - Some(Err(error)) => panic!("{error}"), - None => (), // May happen rarely (see above comment on consensus) + Err(error) => panic!("{error}"), } } } diff --git a/sui_core/src/authority.rs b/sui_core/src/authority.rs index 8e940f47ac7eb..19e6d0024da64 100644 --- a/sui_core/src/authority.rs +++ b/sui_core/src/authority.rs @@ -722,7 +722,7 @@ impl AuthorityState { } /// Make an information response for a transaction - pub async fn make_transaction_info( + pub(crate) async fn make_transaction_info( &self, transaction_digest: &TransactionDigest, ) -> Result { diff --git a/sui_core/src/authority_server.rs b/sui_core/src/authority_server.rs index 08dcfca7f8172..59df1cda4e436 100644 --- a/sui_core/src/authority_server.rs +++ b/sui_core/src/authority_server.rs @@ -19,7 +19,7 @@ use tokio::sync::mpsc::Sender; use std::time::Duration; use tracing::{error, info, warn, Instrument}; -use crate::consensus_adapter::{ConsensusAdapter, ConsensusInput}; +use crate::consensus_adapter::{ConsensusAdapter, ConsensusListenerMessage}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use tokio::sync::broadcast::error::RecvError; @@ -51,7 +51,7 @@ impl AuthorityServer { buffer_size: usize, state: Arc, consensus_address: SocketAddr, - tx_consensus_listener: Sender, + tx_consensus_listener: Sender, ) -> Self { let consensus_adapter = ConsensusAdapter::new( consensus_address, diff --git a/sui_core/src/consensus_adapter.rs b/sui_core/src/consensus_adapter.rs index ebe8f16368662..2267aa2eff42d 100644 --- a/sui_core/src/consensus_adapter.rs +++ b/sui_core/src/consensus_adapter.rs @@ -31,14 +31,15 @@ type ConsensusTransactionDigest = u64; /// Transaction info response serialized by Sui. type SerializedTransactionInfoResponse = Vec; -/// Channel to notify the called when the Sui certificate has been sequenced. -type Replier = oneshot::Sender>; +/// Channel to notify the caller when the Sui certificate has been sequenced. +type TxSequencedNotifier = oneshot::Sender>; -/// Message to notify the consensus adapter of a new certificate sent to consensus. +/// Message to notify the consensus listener that a new transaction has been sent to consensus +/// or that the caller timed out on a specific transaction. #[derive(Debug)] -pub struct ConsensusInput { - serialized: SerializedConsensusTransaction, - replier: Replier, +pub enum ConsensusListenerMessage { + New(SerializedConsensusTransaction, TxSequencedNotifier), + Cleanup(SerializedConsensusTransaction), } /// The message returned by the consensus to notify that a Sui certificate has been sequenced @@ -56,9 +57,11 @@ pub struct ConsensusAdapter { buffer_size: usize, /// The Sui committee information. committee: Committee, - /// A channel to notify the consensus listener of new transactions. - tx_consensus_listener: Sender, - /// The maximum duration to wait from consensus before aborting the transaction. + /// A channel to notify the consensus listener to take action for a transactions. + tx_consensus_listener: Sender, + /// The maximum duration to wait from consensus before aborting the transaction. After + /// this delay passed, the client will be notified that its transaction was probably not + /// sequence and it should try to resubmit its transaction. max_delay: Duration, } @@ -68,7 +71,7 @@ impl ConsensusAdapter { consensus_address: SocketAddr, buffer_size: usize, committee: Committee, - tx_consensus_listener: Sender, + tx_consensus_listener: Sender, max_delay: Duration, ) -> Self { Self { @@ -109,10 +112,7 @@ impl ConsensusAdapter { // Notify the consensus listener that we are expecting to process this certificate. let (sender, receiver) = oneshot::channel(); - let consensus_input = ConsensusInput { - serialized, - replier: sender, - }; + let consensus_input = ConsensusListenerMessage::New(serialized.clone(), sender); self.tx_consensus_listener .send(consensus_input) .await @@ -131,10 +131,21 @@ impl ConsensusAdapter { } // Wait for the consensus to sequence the certificate and assign locks to shared objects. - timeout(self.max_delay, receiver) - .await - .map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string()))? - .expect("Channel with consensus listener dropped") + // Since the consensus protocol may drop some messages, it is not guaranteed that our + // certificate will be sequenced. So the best we can do is to set a timer and notify the + // client to retry if we timeout without hearing back from consensus (this module does not + // handle retries). The best timeout value depends on the consensus protocol. + match timeout(self.max_delay, receiver).await { + Ok(reply) => reply.expect("Failed to read back from consensus listener"), + Err(e) => { + let message = ConsensusListenerMessage::Cleanup(serialized); + self.tx_consensus_listener + .send(message) + .await + .expect("Cleanup channel with consensus listener dropped"); + Err(SuiError::ConsensusConnectionBroken(e.to_string())) + } + } } } @@ -142,24 +153,32 @@ impl ConsensusAdapter { /// notify the called when they are sequenced. pub struct ConsensusListener { /// Receive messages input to the consensus. - rx_consensus_input: Receiver, + rx_consensus_input: Receiver, /// Receive consensus outputs. rx_consensus_output: Receiver, + /// The maximum number of pending replies. This cap indicates the maximum amount of client + /// transactions submitted to consensus for which we keep track. If we submit more transactions + /// than this cap, the transactions will be handled by consensus as usual but this module won't + /// be keeping track of when they are sequenced. Its only purpose is to ensure the field called + /// `pending` has a maximum size. + max_pending_transactions: usize, /// Keep a map of all consensus inputs that are currently being sequenced. - pending: HashMap>, + pending: HashMap>, } impl ConsensusListener { /// Spawn a new consensus adapter in a dedicated tokio task. pub fn spawn( - rx_consensus_input: Receiver, + rx_consensus_input: Receiver, rx_consensus_output: Receiver, + max_pending_transactions: usize, ) -> JoinHandle<()> { tokio::spawn(async move { Self { rx_consensus_input, rx_consensus_output, - pending: HashMap::new(), + max_pending_transactions, + pending: HashMap::with_capacity(2 * max_pending_transactions), } .run() .await @@ -171,12 +190,26 @@ impl ConsensusListener { async fn run(&mut self) { loop { tokio::select! { - // Keep track of this certificates so we can notify the user later. - Some(consensus_input) = self.rx_consensus_input.recv() => { - let serialized = consensus_input.serialized; - let replier = consensus_input.replier; - let digest = Self::hash(&serialized); - self.pending.entry(digest).or_insert_with(Vec::new).push(replier); + // A new transaction has been sent to consensus or is no longer needed. + Some(message) = self.rx_consensus_input.recv() => { + match message { + // Keep track of this certificates so we can notify the user later. + ConsensusListenerMessage::New(transaction, replier) => { + if self.pending.len() < self.max_pending_transactions { + let digest = Self::hash(&transaction); + self.pending.entry(digest).or_insert_with(Vec::new).push(replier); + } + }, + + // Stop waiting for a consensus transaction. + ConsensusListenerMessage::Cleanup(transaction) => { + let digest = Self::hash(&transaction); + let _ = self.pending.get_mut(&digest).and_then(|x| x.pop()); + if self.pending.get(&digest).map_or_else(|| false, |x| x.is_empty()) { + self.pending.remove(&digest); + } + } + } }, // Notify the caller that the transaction has been sequenced (if there is a caller). diff --git a/sui_core/src/unit_tests/consensus_tests.rs b/sui_core/src/unit_tests/consensus_tests.rs index f5c6aca13cb70..6b024e26fe112 100644 --- a/sui_core/src/unit_tests/consensus_tests.rs +++ b/sui_core/src/unit_tests/consensus_tests.rs @@ -118,15 +118,13 @@ async fn listen_to_sequenced_transaction() { ConsensusListener::spawn( /* rx_consensus_input */ rx_sui_to_consensus, /* rx_consensus_output */ rx_consensus_to_sui, + /* max_pending_transactions */ 100, ); // Submit a sample consensus transaction. let (sender, receiver) = oneshot::channel(); - let input = ConsensusInput { - serialized: serialized.clone(), - replier: sender, - }; - tx_sui_to_consensus.send(input).await.unwrap(); + let message = ConsensusListenerMessage::New(serialized.clone(), sender); + tx_sui_to_consensus.send(message).await.unwrap(); // Notify the consensus listener that the transaction has been sequenced. tokio::task::yield_now().await; @@ -165,10 +163,11 @@ async fn submit_transaction_to_consensus() { // Notify the submitter when a consensus transaction has been sequenced and executed. tokio::spawn(async move { - let ConsensusInput { - replier, - serialized, - } = rx_consensus_listener.recv().await.unwrap(); + let (serialized, replier) = match rx_consensus_listener.recv().await.unwrap() { + ConsensusListenerMessage::New(serialized, replier) => (serialized, replier), + message => panic!("Unexpected message {message:?}"), + }; + let message = bincode::deserialize(&serialized).expect("Failed to deserialize consensus tx"); let ConsensusTransaction::UserTransaction(certificate) = message;