Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1680 #1682

Merged
merged 22 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sui/src/sui_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,11 @@ pub async fn make_authority(

// Spawn a consensus listener. It listen for consensus outputs and notifies the
// authority server when a sequenced transaction is ready for execution.
ConsensusListener::spawn(rx_sui_to_consensus, rx_consensus_to_sui);
ConsensusListener::spawn(
rx_sui_to_consensus,
rx_consensus_to_sui,
/* max_pending_transactions */ 1_000_000,
);

// If we have network information make authority clients
// to all authorities in the system.
Expand Down
47 changes: 21 additions & 26 deletions sui/tests/shared_objects_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn submit_single_owner_transaction(
async fn submit_shared_object_transaction(
transaction: Transaction,
configs: &[AuthorityPrivateInfo],
) -> Vec<Option<SuiResult<TransactionInfoResponse>>> {
) -> Vec<SuiResult<TransactionInfoResponse>> {
let certificate = make_certificates(vec![transaction]).pop().unwrap();
let message = ConsensusTransaction::UserTransaction(certificate);
let serialized = Bytes::from(serialize_consensus_transaction(&message));
Expand All @@ -76,7 +76,7 @@ async fn submit_shared_object_transaction(
}
SerializedMessage::Error(error) => match *error {
SuiError::ConsensusConnectionBroken(_) => {
// This is the (confusing) error message returned by the consensus
// This is the (confusing, #1489) error message returned by the consensus
// adapter. It means it didn't hear back from consensus and timed out.
replies.push(None);
}
Expand All @@ -86,7 +86,8 @@ async fn submit_shared_object_transaction(
}
}
if replies.iter().any(|x| x.is_some()) {
break replies;
// Remove all `ConsensusConnectionBroken` replies.
break replies.into_iter().flatten().collect();
}
}
}
Expand Down Expand Up @@ -128,7 +129,7 @@ async fn shared_object_transaction() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
assert!(info.signed_effects.is_some());
}

Expand All @@ -152,9 +153,8 @@ async fn many_shared_object_transactions() {
let replies = submit_shared_object_transaction(transaction, &configs).await;
for reply in replies {
match reply {
Some(Ok(_)) => (),
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Ok(_) => (),
Err(error) => panic!("{error}"),
}
}
}
Expand Down Expand Up @@ -209,7 +209,7 @@ async fn call_shared_object_contract() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));

Expand All @@ -226,7 +226,7 @@ async fn call_shared_object_contract() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));

Expand All @@ -246,7 +246,7 @@ async fn call_shared_object_contract() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Expand Down Expand Up @@ -300,12 +300,11 @@ async fn shared_object_flood() {
let replies = submit_shared_object_transaction(transaction, &configs).await;
for reply in replies {
match reply {
Some(Ok(info)) => {
Ok(info) => {
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Err(error) => panic!("{error}"),
}
}

Expand All @@ -321,12 +320,11 @@ async fn shared_object_flood() {
let replies = submit_shared_object_transaction(transaction, &configs).await;
for reply in replies {
match reply {
Some(Ok(info)) => {
Ok(info) => {
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Err(error) => panic!("{error}"),
}
}

Expand All @@ -345,12 +343,11 @@ async fn shared_object_flood() {
let replies = submit_shared_object_transaction(transaction, &configs).await;
for reply in replies {
match reply {
Some(Ok(info)) => {
Ok(info) => {
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Err(error) => panic!("{error}"),
}
}
}
Expand Down Expand Up @@ -400,7 +397,7 @@ async fn shared_object_sync() {
.await
.pop()
.unwrap();
let info = reply.unwrap().unwrap();
let info = reply.unwrap();
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));

Expand All @@ -410,9 +407,8 @@ async fn shared_object_sync() {
.await;
for reply in replies {
match reply {
Some(Err(SuiError::SharedObjectLockingFailure(_))) => (),
Some(_) => panic!("Unexpected protocol message"),
None => (), // May happen rarely (see above comment on consensus)
Err(SuiError::SharedObjectLockingFailure(_)) => (),
_ => panic!("Unexpected protocol message"),
}
}

Expand All @@ -431,12 +427,11 @@ async fn shared_object_sync() {
submit_shared_object_transaction(increment_counter_transaction, &configs[1..]).await;
for reply in replies {
match reply {
Some(Ok(info)) => {
Ok(info) => {
let effects = info.signed_effects.unwrap().effects;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));
}
Some(Err(error)) => panic!("{error}"),
None => (), // May happen rarely (see above comment on consensus)
Err(error) => panic!("{error}"),
}
}
}
2 changes: 1 addition & 1 deletion sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ impl AuthorityState {
}

/// Make an information response for a transaction
pub async fn make_transaction_info(
pub(crate) async fn make_transaction_info(
&self,
transaction_digest: &TransactionDigest,
) -> Result<TransactionInfoResponse, SuiError> {
Expand Down
4 changes: 2 additions & 2 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::mpsc::Sender;
use std::time::Duration;
use tracing::{error, info, warn, Instrument};

use crate::consensus_adapter::{ConsensusAdapter, ConsensusInput};
use crate::consensus_adapter::{ConsensusAdapter, ConsensusListenerMessage};
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use tokio::sync::broadcast::error::RecvError;
Expand Down Expand Up @@ -51,7 +51,7 @@ impl AuthorityServer {
buffer_size: usize,
state: Arc<AuthorityState>,
consensus_address: SocketAddr,
tx_consensus_listener: Sender<ConsensusInput>,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
) -> Self {
let consensus_adapter = ConsensusAdapter::new(
consensus_address,
Expand Down
89 changes: 61 additions & 28 deletions sui_core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ type ConsensusTransactionDigest = u64;
/// Transaction info response serialized by Sui.
type SerializedTransactionInfoResponse = Vec<u8>;

/// Channel to notify the called when the Sui certificate has been sequenced.
type Replier = oneshot::Sender<SuiResult<SerializedTransactionInfoResponse>>;
/// Channel to notify the caller when the Sui certificate has been sequenced.
type TxSequencedNotifier = oneshot::Sender<SuiResult<SerializedTransactionInfoResponse>>;

/// Message to notify the consensus adapter of a new certificate sent to consensus.
/// Message to notify the consensus listener that a new transaction has been sent to consensus
/// or that the caller timed out on a specific transaction.
#[derive(Debug)]
pub struct ConsensusInput {
serialized: SerializedConsensusTransaction,
replier: Replier,
pub enum ConsensusListenerMessage {
New(SerializedConsensusTransaction, TxSequencedNotifier),
Cleanup(SerializedConsensusTransaction),
}

/// The message returned by the consensus to notify that a Sui certificate has been sequenced
Expand All @@ -56,9 +57,11 @@ pub struct ConsensusAdapter {
buffer_size: usize,
/// The Sui committee information.
committee: Committee,
/// A channel to notify the consensus listener of new transactions.
tx_consensus_listener: Sender<ConsensusInput>,
/// The maximum duration to wait from consensus before aborting the transaction.
/// A channel to notify the consensus listener to take action for a transactions.
tx_consensus_listener: Sender<ConsensusListenerMessage>,
/// The maximum duration to wait from consensus before aborting the transaction. After
/// this delay passed, the client will be notified that its transaction was probably not
/// sequence and it should try to resubmit its transaction.
max_delay: Duration,
}

Expand All @@ -68,7 +71,7 @@ impl ConsensusAdapter {
consensus_address: SocketAddr,
buffer_size: usize,
committee: Committee,
tx_consensus_listener: Sender<ConsensusInput>,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
max_delay: Duration,
) -> Self {
Self {
Expand Down Expand Up @@ -109,10 +112,7 @@ impl ConsensusAdapter {

// Notify the consensus listener that we are expecting to process this certificate.
let (sender, receiver) = oneshot::channel();
let consensus_input = ConsensusInput {
serialized,
replier: sender,
};
let consensus_input = ConsensusListenerMessage::New(serialized.clone(), sender);
self.tx_consensus_listener
.send(consensus_input)
.await
Expand All @@ -131,35 +131,54 @@ impl ConsensusAdapter {
}

// Wait for the consensus to sequence the certificate and assign locks to shared objects.
timeout(self.max_delay, receiver)
.await
.map_err(|e| SuiError::ConsensusConnectionBroken(e.to_string()))?
.expect("Channel with consensus listener dropped")
// Since the consensus protocol may drop some messages, it is not guaranteed that our
// certificate will be sequenced. So the best we can do is to set a timer and notify the
// client to retry if we timeout without hearing back from consensus (this module does not
// handle retries). The best timeout value depends on the consensus protocol.
match timeout(self.max_delay, receiver).await {
Ok(reply) => reply.expect("Failed to read back from consensus listener"),
Err(e) => {
let message = ConsensusListenerMessage::Cleanup(serialized);
self.tx_consensus_listener
.send(message)
.await
.expect("Cleanup channel with consensus listener dropped");
Err(SuiError::ConsensusConnectionBroken(e.to_string()))
}
}
}
}

/// This module interfaces the consensus with Sui. It receives certificates input to consensus and
/// notify the called when they are sequenced.
pub struct ConsensusListener {
/// Receive messages input to the consensus.
rx_consensus_input: Receiver<ConsensusInput>,
rx_consensus_input: Receiver<ConsensusListenerMessage>,
/// Receive consensus outputs.
rx_consensus_output: Receiver<ConsensusOutput>,
/// The maximum number of pending replies. This cap indicates the maximum amount of client
/// transactions submitted to consensus for which we keep track. If we submit more transactions
/// than this cap, the transactions will be handled by consensus as usual but this module won't
/// be keeping track of when they are sequenced. Its only purpose is to ensure the field called
/// `pending` has a maximum size.
max_pending_transactions: usize,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if this cap is exceeded there can be clients waiting for an ack forever? Shouldn't we be rejecting instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel drops so the client will knows automatically. Although you're right that this can be more explicit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me make a note for this

/// Keep a map of all consensus inputs that are currently being sequenced.
pending: HashMap<ConsensusTransactionDigest, Vec<Replier>>,
pending: HashMap<ConsensusTransactionDigest, Vec<TxSequencedNotifier>>,
}

impl ConsensusListener {
/// Spawn a new consensus adapter in a dedicated tokio task.
pub fn spawn(
rx_consensus_input: Receiver<ConsensusInput>,
rx_consensus_input: Receiver<ConsensusListenerMessage>,
rx_consensus_output: Receiver<ConsensusOutput>,
max_pending_transactions: usize,
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
rx_consensus_input,
rx_consensus_output,
pending: HashMap::new(),
max_pending_transactions,
pending: HashMap::with_capacity(2 * max_pending_transactions),
}
.run()
.await
Expand All @@ -171,12 +190,26 @@ impl ConsensusListener {
async fn run(&mut self) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Key question: why is all this happening through message passing and separate tasks instead of the task that interacts with consensus keeping the hashmap and updating when transactions go in, and when transactions go out? Are there more than 1 threads injecting and consuming messages to/from consensus?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear to me how to best embed this with the current Sui logic. There is one task (ConsensusListener) that receives consensus outputs. The ConsensusAdapter is not a task, it simply provides helpers to interact with consensus that the AuthorityServer uses.

We have many tasks injecting transactions to consensus (the same tasks running the AuthorityServer, one per client). We however have a single task handling the output of consensus (ConsensusListener).

loop {
tokio::select! {
// Keep track of this certificates so we can notify the user later.
Some(consensus_input) = self.rx_consensus_input.recv() => {
let serialized = consensus_input.serialized;
let replier = consensus_input.replier;
let digest = Self::hash(&serialized);
self.pending.entry(digest).or_insert_with(Vec::new).push(replier);
// A new transaction has been sent to consensus or is no longer needed.
Some(message) = self.rx_consensus_input.recv() => {
match message {
// Keep track of this certificates so we can notify the user later.
ConsensusListenerMessage::New(transaction, replier) => {
if self.pending.len() < self.max_pending_transactions {
let digest = Self::hash(&transaction);
self.pending.entry(digest).or_insert_with(Vec::new).push(replier);
}
},

// Stop waiting for a consensus transaction.
ConsensusListenerMessage::Cleanup(transaction) => {
let digest = Self::hash(&transaction);
let _ = self.pending.get_mut(&digest).and_then(|x| x.pop());
if self.pending.get(&digest).map_or_else(|| false, |x| x.is_empty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't wait for inspect to stabilize.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really necessary, or could we just periodically self.pending.retain(|x| !x.is_empty())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could but it's extra LOC

self.pending.remove(&digest);
}
}
}
},

// Notify the caller that the transaction has been sequenced (if there is a caller).
Expand Down
17 changes: 8 additions & 9 deletions sui_core/src/unit_tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,13 @@ async fn listen_to_sequenced_transaction() {
ConsensusListener::spawn(
/* rx_consensus_input */ rx_sui_to_consensus,
/* rx_consensus_output */ rx_consensus_to_sui,
/* max_pending_transactions */ 100,
);

// Submit a sample consensus transaction.
let (sender, receiver) = oneshot::channel();
let input = ConsensusInput {
serialized: serialized.clone(),
replier: sender,
};
tx_sui_to_consensus.send(input).await.unwrap();
let message = ConsensusListenerMessage::New(serialized.clone(), sender);
tx_sui_to_consensus.send(message).await.unwrap();

// Notify the consensus listener that the transaction has been sequenced.
tokio::task::yield_now().await;
Expand Down Expand Up @@ -165,10 +163,11 @@ async fn submit_transaction_to_consensus() {

// Notify the submitter when a consensus transaction has been sequenced and executed.
tokio::spawn(async move {
let ConsensusInput {
replier,
serialized,
} = rx_consensus_listener.recv().await.unwrap();
let (serialized, replier) = match rx_consensus_listener.recv().await.unwrap() {
ConsensusListenerMessage::New(serialized, replier) => (serialized, replier),
message => panic!("Unexpected message {message:?}"),
};

let message =
bincode::deserialize(&serialized).expect("Failed to deserialize consensus tx");
let ConsensusTransaction::UserTransaction(certificate) = message;
Expand Down