Skip to content

Commit

Permalink
Offload batch verification to blocking pool (MystenLabs#9653)
Browse files Browse the repository at this point in the history
  • Loading branch information
andll authored Mar 21, 2023
1 parent bfd256e commit 295b4f7
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 14 deletions.
2 changes: 2 additions & 0 deletions crates/sui-core/src/batch_bls_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use sui_types::{
messages_checkpoint::SignedCheckpointSummary,
};

use mysten_metrics::monitored_scope;
use tap::TapFallible;
use tokio::{
sync::oneshot,
Expand Down Expand Up @@ -198,6 +199,7 @@ impl BatchCertificateVerifier {
fn process_queue(&self, mut queue: MutexGuard<'_, CertBuffer>) {
let taken = queue.take_and_replace();
drop(queue);
let _scope = monitored_scope("BatchCertificateVerifier::process_queue");

let results = batch_verify_certificates(&self.committee, &taken.certs);
izip!(
Expand Down
25 changes: 16 additions & 9 deletions crates/sui-core/src/consensus_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use std::sync::Arc;

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::transaction_manager::TransactionManager;
use async_trait::async_trait;
use narwhal_worker::TransactionValidator;
use sui_types::messages::{ConsensusTransaction, ConsensusTransactionKind};
use tap::TapFallible;

use tokio::runtime::Handle;
use tracing::{info, warn};

/// Allows verifying the validity of transactions
Expand Down Expand Up @@ -45,6 +46,7 @@ fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
.wrap_err("Malformed transaction (failed to deserialize)")
}

#[async_trait]
impl TransactionValidator for SuiTxValidator {
type Error = eyre::Report;

Expand All @@ -53,7 +55,7 @@ impl TransactionValidator for SuiTxValidator {
Ok(())
}

fn validate_batch(&self, b: &narwhal_types::Batch) -> Result<(), Self::Error> {
async fn validate_batch(&self, b: &narwhal_types::Batch) -> Result<(), Self::Error> {
let _scope = monitored_scope("ValidateBatch");
let txs = b
.transactions
Expand Down Expand Up @@ -85,11 +87,16 @@ impl TransactionValidator for SuiTxValidator {
// verify the certificate signatures as a batch
let cert_count = cert_batch.len();
let ckpt_count = ckpt_batch.len();
self.epoch_store
.batch_verifier
.verify_certs_and_checkpoints(cert_batch, ckpt_batch)
.tap_err(|e| warn!("batch verification error: {}", e))
.wrap_err("Malformed batch (failed to verify)")?;
let epoch_store = self.epoch_store.clone();
Handle::current()
.spawn_blocking(move || {
epoch_store
.batch_verifier
.verify_certs_and_checkpoints(cert_batch, ckpt_batch)
.tap_err(|e| warn!("batch verification error: {}", e))
.wrap_err("Malformed batch (failed to verify)")
})
.await??;
self.metrics
.certificate_signatures_verified
.inc_by(cert_count as u64);
Expand Down Expand Up @@ -195,7 +202,7 @@ mod tests {
.collect();

let batch = Batch::new(transaction_bytes);
let res_batch = validator.validate_batch(&batch);
let res_batch = validator.validate_batch(&batch).await;
assert!(res_batch.is_ok(), "{res_batch:?}");

let bogus_transaction_bytes: Vec<_> = certificates
Expand All @@ -211,7 +218,7 @@ mod tests {
.collect();

let batch = Batch::new(bogus_transaction_bytes);
let res_batch = validator.validate_batch(&batch);
let res_batch = validator.validate_batch(&batch).await;
assert!(res_batch.is_err());
}
}
4 changes: 2 additions & 2 deletions narwhal/worker/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<V: TransactionValidator> WorkerToWorker for WorkerReceiverHandler<V> {
request: anemo::Request<WorkerBatchMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();
if let Err(err) = self.validator.validate_batch(&message.batch) {
if let Err(err) = self.validator.validate_batch(&message.batch).await {
// The batch is invalid, we don't want to process it.
return Err(anemo::rpc::Status::new_with_message(
StatusCode::BadRequest,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<V: TransactionValidator> PrimaryToWorker for PrimaryReceiverHandler<V> {
if let Some(batch) = response.into_body().batch {
if !message.is_certified {
// This batch is not part of a certificate, so we need to validate it.
if let Err(err) = self.validator.validate_batch(&batch) {
if let Err(err) = self.validator.validate_batch(&batch).await {
// The batch is invalid, we don't want to process it.
return Err(anemo::rpc::Status::new_with_message(
StatusCode::BadRequest,
Expand Down
4 changes: 3 additions & 1 deletion narwhal/worker/src/tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
use super::*;
use crate::{metrics::initialise_metrics, TrivialTransactionValidator};
use async_trait::async_trait;
use bytes::Bytes;
use consensus::consensus::ConsensusRound;
use consensus::{dag::Dag, metrics::ConsensusMetrics};
Expand All @@ -29,13 +30,14 @@ use types::{
// A test validator that rejects every transaction / batch
#[derive(Clone)]
struct NilTxValidator;
#[async_trait]
impl TransactionValidator for NilTxValidator {
type Error = eyre::Report;

fn validate(&self, _tx: &[u8]) -> Result<(), Self::Error> {
eyre::bail!("Invalid transaction");
}
fn validate_batch(&self, _txs: &Batch) -> Result<(), Self::Error> {
async fn validate_batch(&self, _txs: &Batch) -> Result<(), Self::Error> {
eyre::bail!("Invalid batch");
}
}
Expand Down
7 changes: 5 additions & 2 deletions narwhal/worker/src/tx_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,33 @@ use std::fmt::{Debug, Display};

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use types::Batch;

/// Defines the validation procedure for receiving either a new single transaction (from a client)
/// of a batch of transactions (from another validator). Invalid transactions will not receive
/// further processing.
#[async_trait]
pub trait TransactionValidator: Clone + Send + Sync + 'static {
type Error: Display + Debug + Send + Sync + 'static;
/// Determines if a transaction valid for the worker to consider putting in a batch
fn validate(&self, t: &[u8]) -> Result<(), Self::Error>;
/// Determines if this batch can be voted on
fn validate_batch(&self, b: &Batch) -> Result<(), Self::Error>;
async fn validate_batch(&self, b: &Batch) -> Result<(), Self::Error>;
}

/// Simple validator that accepts all transactions and batches.
#[derive(Debug, Clone, Default)]
pub struct TrivialTransactionValidator;
#[async_trait]
impl TransactionValidator for TrivialTransactionValidator {
type Error = eyre::Report;

fn validate(&self, _t: &[u8]) -> Result<(), Self::Error> {
Ok(())
}

fn validate_batch(&self, _b: &Batch) -> Result<(), Self::Error> {
async fn validate_batch(&self, _b: &Batch) -> Result<(), Self::Error> {
Ok(())
}
}

0 comments on commit 295b4f7

Please sign in to comment.