Skip to content

Commit

Permalink
[core] Fix consensus adaptor mem leak + edge-case correctness (Mysten…
Browse files Browse the repository at this point in the history
…Labs#3172)

* Fix the consensus adapter cleanup mechanism
* Make many quorum drivers
* Increase the consensus timeout
* Fix interval
* Remove dead code following review

Co-authored-by: George Danezis <george@danez.is>
  • Loading branch information
gdanezis and George Danezis authored Jul 13, 2022
1 parent 1f5ab78 commit aef7652
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 44 deletions.
23 changes: 16 additions & 7 deletions crates/sui-benchmark/src/bin/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ async fn main() {
let configs = test_and_configure_authority_configs(opts.committee_size);
let _ = spawn_test_authorities(gas.clone(), &configs).await;
let clients = test_authority_aggregator(&configs);
let quorum_driver_handler = QuorumDriverHandler::new(clients);
let quorum_driver = quorum_driver_handler.clone_quorum_driver();
let quorum_driver_handler = QuorumDriverHandler::new(clients.clone());

// publish package
write("Publishing basics package".to_string());
Expand Down Expand Up @@ -140,13 +139,21 @@ async fn main() {
.collect();
let mut tasks = Vec::new();
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
let request_delay_micros = 1_000_000 / (opts.num_workers * opts.target_qps);
let stat_delay_micros = 1_000_000 * opts.stat_collection_interval;

(0..opts.num_workers).for_each(|i| {
let mut free_pool = gas[i as usize].clone();
let qd = quorum_driver.clone();

// Make a per worker quorum driver, otherwise they all share the same task.
let quorum_driver_handler = QuorumDriverHandler::new(clients.clone());
let qd = quorum_driver_handler.clone_quorum_driver();

let tx_cloned = tx.clone();
let request_delay_micros = 1_000_000 / (opts.num_workers * opts.target_qps);
let stat_delay_micros = 1_000_000 * opts.stat_collection_interval;

let mut request_interval = time::interval(Duration::from_micros(request_delay_micros));
request_interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

let mut stat_interval = time::interval(Duration::from_micros(stat_delay_micros));
let runner = tokio::spawn(async move {
let mut num_success = 0;
Expand Down Expand Up @@ -183,6 +190,7 @@ async fn main() {
num_submitted = 0;
min_latency = Duration::MAX;
max_latency = Duration::ZERO;
println!("Queue size: {}", futures.len());
}
_ = request_interval.tick() => {
if free_pool.is_empty() {
Expand Down Expand Up @@ -214,7 +222,7 @@ async fn main() {
NextOp::Retry(Box::new((tx, counter_id, owner)))
}
Err(sui_err) => {
error!("{}", sui_err);
error!("Retry due to error: {}", sui_err);
NextOp::Retry(Box::new((tx, counter_id, owner)))
}
}
Expand Down Expand Up @@ -266,7 +274,8 @@ async fn main() {
}
}
NextOp::Response(None) => {
num_in_flight -= 1;
// num_in_flight -= 1;
unreachable!();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl AuthorityServer {
consensus_address,
state.clone_committee(),
tx_consensus_listener,
/* max_delay */ Duration::from_millis(5_000),
/* max_delay */ Duration::from_millis(20_000),
);

Self {
Expand Down
124 changes: 93 additions & 31 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,53 @@ type SerializedTransactionInfoResponse = Vec<u8>;

/// Channel to notify the caller when the Sui certificate has been sequenced.
type TxSequencedNotifier = oneshot::Sender<SuiResult<SerializedTransactionInfoResponse>>;
type TxSequencedNotifierClose = oneshot::Sender<()>;

/// Message to notify the consensus listener that a new transaction has been sent to consensus
/// or that the caller timed out on a specific transaction.
#[derive(Debug)]
pub enum ConsensusListenerMessage {
New(SerializedConsensusTransaction, TxSequencedNotifier),
Cleanup(SerializedConsensusTransaction),
New(
SerializedConsensusTransaction,
(TxSequencedNotifier, TxSequencedNotifierClose),
),
}

pub struct ConsensusWaiter {
// This channel is used to signal the result if the transaction gets
// sequenced and observed at the output of consensus.
signal_back: oneshot::Receiver<SuiResult<SerializedTransactionInfoResponse>>,
// We use this channel as a signalling mechanism, to detect if the ConsensusWaiter
// struct is dropped, and to clean up the ConsensusListener structures to prevent
// memory leaks.
signal_close: oneshot::Receiver<()>,
}

impl ConsensusWaiter {
pub fn new() -> (
ConsensusWaiter,
(TxSequencedNotifier, TxSequencedNotifierClose),
) {
let (notif, signal_back) = oneshot::channel();
let (close, signal_close) = oneshot::channel();
(
ConsensusWaiter {
signal_back,
signal_close,
},
(notif, close),
)
}

pub fn close(&mut self) {
self.signal_close.close();
}

pub async fn wait_for_result(self) -> SuiResult<SerializedTransactionInfoResponse> {
self.signal_back
.await
.map_err(|e| SuiError::FailedToHearBackFromConsensus(e.to_string()))?
}
}

/// The message returned by the consensus to notify that a Sui certificate has been sequenced
Expand Down Expand Up @@ -116,8 +156,9 @@ impl ConsensusAdapter {
let bytes = Bytes::from(serialized.clone());

// Notify the consensus listener that we are expecting to process this certificate.
let (sender, receiver) = oneshot::channel();
let consensus_input = ConsensusListenerMessage::New(serialized.clone(), sender);
let (waiter, signals) = ConsensusWaiter::new();

let consensus_input = ConsensusListenerMessage::New(serialized.clone(), signals);
self.tx_consensus_listener
.send(consensus_input)
.await
Expand All @@ -137,14 +178,11 @@ impl ConsensusAdapter {
// certificate will be sequenced. So the best we can do is to set a timer and notify the
// client to retry if we timeout without hearing back from consensus (this module does not
// handle retries). The best timeout value depends on the consensus protocol.
match timeout(self.max_delay, receiver).await {
match timeout(self.max_delay, waiter.wait_for_result()).await {
Ok(_) => Ok(()),
Err(e) => {
let message = ConsensusListenerMessage::Cleanup(serialized);
self.tx_consensus_listener
.send(message)
.await
.expect("Cleanup channel with consensus listener dropped");
// We drop the waiter which will signal to the conensus listener task to clean up
// the channels.
Err(SuiError::FailedToHearBackFromConsensus(e.to_string()))
}
}
Expand All @@ -165,7 +203,7 @@ pub struct ConsensusListener {
/// `pending` has a maximum size.
max_pending_transactions: usize,
/// Keep a map of all consensus inputs that are currently being sequenced.
pending: HashMap<ConsensusTransactionDigest, Vec<TxSequencedNotifier>>,
pending: HashMap<ConsensusTransactionDigest, Vec<(u64, TxSequencedNotifier)>>,
}

impl ConsensusListener {
Expand Down Expand Up @@ -198,31 +236,38 @@ impl ConsensusListener {
}

/// Main loop receiving messages input to consensus and notifying the caller once the inputs
/// are sequenced (of if an error happened).
/// are sequenced (or if an error happened).
async fn run(&mut self) {
let mut closed_notifications = FuturesUnordered::new();
let mut id_counter: u64 = 0;

loop {
tokio::select! {
// A new transaction has been sent to consensus or is no longer needed.
Some(message) = self.rx_consensus_input.recv() => {
match message {
// Keep track of this certificates so we can notify the user later.
ConsensusListenerMessage::New(transaction, replier) => {
ConsensusListenerMessage::New(transaction, (replier, mut _closer)) => {
let digest = Self::hash_serialized_transaction(&transaction);
if self.pending.len() < self.max_pending_transactions {
self.pending.entry(digest).or_insert_with(Vec::new).push(replier);
let id = id_counter;
id_counter += 1;

let list = self.pending.entry(digest).or_insert_with(Vec::new);
list.push((id, replier));

// Register with the close notification.
closed_notifications.push(async move {
// Wait for the channel to close
_closer.closed().await;
// Return he digest concerned
(digest, id)
});

} else if replier.send(Err(SuiError::ListenerCapacityExceeded)).is_err() {
debug!("No replier to listen to consensus output {digest}");
}
},

// Stop waiting for a consensus transaction.
ConsensusListenerMessage::Cleanup(transaction) => {
let digest = Self::hash_serialized_transaction(&transaction);
let _ = self.pending.get_mut(&digest).and_then(|x| x.pop());
if self.pending.get(&digest).map_or_else(|| false, |x| x.is_empty()) {
self.pending.remove(&digest);
}
}
}
},

Expand All @@ -231,13 +276,29 @@ impl ConsensusListener {
let outcome = result.map_err(SuiError::from);
let digest = Self::hash_serialized_transaction(&serialized);
if let Some(repliers) = self.pending.remove(&digest) {
for replier in repliers {
for (_, replier) in repliers {
if replier.send(outcome.clone()).is_err() {
debug!("No replier to listen to consensus output {digest}");
}
}
}
}

Some((digest, id)) = closed_notifications.next() => {
let should_delete = if let Some(list) = self.pending.get_mut(&digest) {
// First clean up the list
list.retain(|(item_id, _)| *item_id != id);
// if the resuting list is empty we should delete the entry.
list.is_empty()
} else { false };

// Secondly we determine if we need to delete the entry
if should_delete {
self.pending.remove(&digest);
}

}

}
}
}
Expand Down Expand Up @@ -328,12 +389,12 @@ impl CheckpointConsensusAdapter {

/// Wait for a transaction to be sequenced by consensus (or to timeout).
async fn waiter<T>(
receiver: oneshot::Receiver<SuiResult<SerializedTransactionInfoResponse>>,
receiver: ConsensusWaiter,
retry_delay: Duration,
deliver: T,
) -> (SuiResult<SerializedTransactionInfoResponse>, T) {
let outcome = match timeout(retry_delay, receiver).await {
Ok(reply) => reply.expect("Failed to read back from consensus listener"),
let outcome = match timeout(retry_delay, receiver.wait_for_result()).await {
Ok(reply) => reply,
Err(e) => Err(SuiError::FailedToHearBackFromConsensus(e.to_string())),
};
(outcome, deliver)
Expand All @@ -351,9 +412,10 @@ impl CheckpointConsensusAdapter {
Ok(_) => {
// Notify the consensus listener that we wish to be notified once our
// consensus transaction is sequenced.
let (sender, receiver) = oneshot::channel();
let (waiter, signals) = ConsensusWaiter::new();

let consensus_input =
ConsensusListenerMessage::New(serialized.clone(), sender);
ConsensusListenerMessage::New(serialized.clone(), signals);
self.tx_consensus_listener
.send(consensus_input)
.await
Expand All @@ -362,7 +424,7 @@ impl CheckpointConsensusAdapter {
// Add the receiver to the waiter. So we can retransmit if the
// connection fails.
let deliver = (serialized, sequence_number);
let future = Self::waiter(receiver, self.retry_delay, deliver);
let future = Self::waiter(waiter, self.retry_delay, deliver);
waiting.push(future);
}
Err(_) => {
Expand Down Expand Up @@ -394,7 +456,7 @@ impl CheckpointConsensusAdapter {
self.buffer.push_front((serialized, sequence_number));
},

// Listen to checkpoint fragments who failed to be sequenced and need reties.
// Listen to checkpoint fragments who failed to be sequenced and need retries.
Some((outcome, identifier)) = waiting.next() => {
if let Err(error) = outcome {
tracing::debug!("Failed to sequence transaction: {error}");
Expand Down
10 changes: 5 additions & 5 deletions crates/sui-core/src/unit_tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ async fn listen_to_sequenced_transaction() {
);

// Submit a sample consensus transaction.
let (sender, receiver) = oneshot::channel();
let message = ConsensusListenerMessage::New(serialized.clone(), sender);
let (waiter, signals) = ConsensusWaiter::new();

let message = ConsensusListenerMessage::New(serialized.clone(), signals);
tx_sui_to_consensus.send(message).await.unwrap();

// Notify the consensus listener that the transaction has been sequenced.
Expand All @@ -129,7 +130,7 @@ async fn listen_to_sequenced_transaction() {
tx_consensus_to_sui.send(output).await.unwrap();

// Ensure the caller get notified from the consensus listener.
assert!(receiver.await.unwrap().is_ok());
assert!(waiter.wait_for_result().await.is_ok());
}

#[tokio::test]
Expand Down Expand Up @@ -165,7 +166,6 @@ async fn submit_transaction_to_consensus() {
while let Some(message) = rx_consensus_listener.recv().await {
let (serialized, replier) = match message {
ConsensusListenerMessage::New(serialized, replier) => (serialized, replier),
message => panic!("Unexpected message {message:?}"),
};

let message =
Expand All @@ -186,7 +186,7 @@ async fn submit_transaction_to_consensus() {

// Reply to the submitter.
let result = Ok(Vec::default());
replier.send(result).unwrap();
replier.0.send(result).unwrap();
}
});

Expand Down

0 comments on commit aef7652

Please sign in to comment.