Skip to content

Commit

Permalink
Offload BatchCertificateVerifier::process_queue to blocking pool (#9660)
Browse files Browse the repository at this point in the history
  • Loading branch information
andll authored Mar 22, 2023
1 parent 623ff84 commit 684a689
Showing 1 changed file with 48 additions and 20 deletions.
68 changes: 48 additions & 20 deletions crates/sui-core/src/batch_bls_verifier.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use either::Either;
use futures::pin_mut;
use itertools::izip;
use lru::LruCache;
Expand All @@ -20,6 +21,7 @@ use sui_types::{

use mysten_metrics::monitored_scope;
use tap::TapFallible;
use tokio::runtime::Handle;
use tokio::{
sync::oneshot,
time::{timeout, Duration},
Expand Down Expand Up @@ -52,10 +54,12 @@ impl CertBuffer {
}
}

fn take_and_replace(&mut self) -> Self {
let mut new = CertBuffer::new(self.capacity());
new.id = self.id + 1;
std::mem::swap(&mut new, self);
/// Swaps the locked buffer for a fresh one and returns the previous contents.
///
/// The replacement is built by `CertBuffer::new` with the same capacity as the current
/// buffer and an `id` one greater than the current one.
///
/// The guard is taken by value, so it is dropped when this function returns, releasing the
/// lock right after the swap instead of keeping it held while the caller processes the
/// returned buffer.
fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
let this = &mut *guard;
let mut new = CertBuffer::new(this.capacity());
new.id = this.id + 1;
// After the swap, `new` holds the old contents and the guarded slot holds the fresh buffer.
std::mem::swap(&mut new, this);
new
}

Expand Down Expand Up @@ -161,16 +165,23 @@ impl BatchCertificateVerifier {
let (tx, rx) = oneshot::channel();
pin_mut!(rx);

let prev_id = {
let prev_id_or_buffer = {
let mut queue = self.queue.lock();
queue.push(tx, cert);
if queue.len() == queue.capacity() {
Either::Right(CertBuffer::take_and_replace(queue))
} else {
Either::Left(queue.id)
}
};
let prev_id = match prev_id_or_buffer {
Either::Left(prev_id) => prev_id,
Either::Right(buffer) => {
self.metrics.full_batches.inc();
self.process_queue(queue);
self.process_queue(buffer).await;
// unwrap ok - process_queue will have sent the result already
return rx.try_recv().unwrap();
}
queue.id
};

if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
Expand All @@ -179,42 +190,59 @@ impl BatchCertificateVerifier {
}
self.metrics.timeouts.inc();

{
let buffer = {
let queue = self.queue.lock();
// check if another thread took the queue while we were re-acquiring lock.
if prev_id == queue.id {
debug_assert_ne!(queue.len(), queue.capacity());
self.metrics.partial_batches.inc();
self.process_queue(queue);
// unwrap ok - process_queue will have sent the result already
return rx.try_recv().unwrap();
Some(CertBuffer::take_and_replace(queue))
} else {
None
}
};

if let Some(buffer) = buffer {
self.metrics.partial_batches.inc();
self.process_queue(buffer).await;
// unwrap ok - process_queue will have sent the result already
return rx.try_recv().unwrap();
}

// unwrap ok - another thread took the queue while we were re-acquiring the lock and is
// guaranteed to process the queue immediately.
rx.await.unwrap()
}

fn process_queue(&self, mut queue: MutexGuard<'_, CertBuffer>) {
let taken = queue.take_and_replace();
drop(queue);
/// Verifies the certificates in `buffer` via `spawn_blocking` and waits for completion.
///
/// The committee and metrics handles are cloned so the `move` closure owns everything it
/// uses; the actual work is done by `process_queue_sync`.
///
/// # Panics
///
/// Panics if awaiting the spawned task returns an error (for example, if the task itself
/// panicked).
async fn process_queue(&self, buffer: CertBuffer) {
let committee = self.committee.clone();
let metrics = self.metrics.clone();
// Runs on the runtime this future is currently executing in.
Handle::current()
.spawn_blocking(move || Self::process_queue_sync(committee, metrics, buffer))
.await
.expect("Spawn blocking should not fail");
}

fn process_queue_sync(
committee: Arc<Committee>,
metrics: Arc<BatchCertificateVerifierMetrics>,
buffer: CertBuffer,
) {
let _scope = monitored_scope("BatchCertificateVerifier::process_queue");

let results = batch_verify_certificates(&self.committee, &taken.certs);
let results = batch_verify_certificates(&committee, &buffer.certs);
izip!(
results.into_iter(),
taken.certs.into_iter(),
taken.senders.into_iter(),
buffer.certs.into_iter(),
buffer.senders.into_iter(),
)
.for_each(|(result, cert, tx)| {
tx.send(match result {
Ok(()) => {
self.metrics.total_verified_certs.inc();
metrics.total_verified_certs.inc();
Ok(VerifiedCertificate::new_unchecked(cert))
}
Err(e) => {
self.metrics.total_failed_certs.inc();
metrics.total_failed_certs.inc();
Err(e)
}
})
Expand Down

0 comments on commit 684a689

Please sign in to comment.