Skip to content

Commit

Permalink
Refactor ConsensusHandler in preparation for parallel signature verif…
Browse files Browse the repository at this point in the history
…ication (MystenLabs#5344)

This PR refactors ConsensusHandler in order to prepare for parallel verification of the transaction signatures.

On Sui side:
(1) Break down consensus handling into two independent functions on the `AuthorityState` - one for (potentially parallel) verification and one for sequential handling of the transaction.

(2) Move `ConsensusHandler` into separate file - `authority.rs` is growing big and parallel signature verification will add even more code to ConsensusHandler.

(3) Abolish `NarwhalHandlerError` - after separating `handle_consensus_transaction` and `verify_consensus_transaction`, `ConsensusHandler` can now decide what to do based on what function has failed. Note that in practice nothing has changed in how errors are processed by this PR - previously we were mapping errors from `verify_narwhal_transaction` into `NarwhalHandlerError::Skip`, and now we just call it directly from `ConsensusHandler`.

(4) Two utilization metrics on the `ConsensusHandler` side, namely `verify_narwhal_transaction_duration_mcs` and `handle_consensus_duration_mcs` slightly changed what they measure - previously verification metric was fraction of handle_consensus timer, now they are two independent steps. The total consensus utilization can now be measured as sum of those two metrics (until parallel verification is introduced).

On Narwhal side:
(1) Minor interface change - pass `Arc<ConsensusOutput>` instead of `&ConsensusOutput` from nw executor - this will allow to caller to cheaply clone ConsensusOutput when it needs to pass it to verification thread
  • Loading branch information
andll authored Oct 18, 2022
1 parent e7dd476 commit 0db0f1f
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 206 deletions.
246 changes: 75 additions & 171 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::hash::Hash;
use std::ops::Deref;
use std::path::PathBuf;
use std::{
Expand All @@ -17,7 +16,7 @@ use std::{

use anyhow::anyhow;
use arc_swap::ArcSwap;
use async_trait::async_trait;

use chrono::prelude::*;
use fastcrypto::traits::KeyPair;
use futures::stream::{self, Stream};
Expand All @@ -30,7 +29,6 @@ use prometheus::{
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge,
};
use tap::TapFallible;
use thiserror::Error;
use tokio::sync::{
broadcast::{self, error::RecvError},
mpsc,
Expand All @@ -46,7 +44,7 @@ use narwhal_config::{
Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache,
WorkerId as ConsensusWorkerId,
};
use narwhal_executor::{ExecutionIndices, ExecutionState};

use sui_adapter::adapter;
use sui_config::genesis::Genesis;
use sui_json_rpc_types::{SuiEventEnvelope, SuiTransactionEffects};
Expand Down Expand Up @@ -82,9 +80,11 @@ use sui_types::{
};

use crate::authority::authority_notifier::TransactionNotifierTicket;
use crate::authority::authority_store_tables::ExecutionIndicesWithHash;
use crate::checkpoints::ConsensusSender;
use crate::consensus_adapter::ConsensusListenerMessage;

use crate::consensus_handler::{
SequencedConsensusTransaction, VerifiedSequencedConsensusTransaction,
};
use crate::epoch::committee_store::CommitteeStore;
use crate::metrics::TaskUtilizationExt;
use crate::{
Expand Down Expand Up @@ -2145,10 +2145,6 @@ impl AuthorityState {
}

fn verify_narwhal_transaction(&self, certificate: &CertifiedTransaction) -> SuiResult {
let _timer = self
.metrics
.verify_narwhal_transaction_duration_mcs
.utilization_timer();
// Ensure the input is a shared object certificate. Remember that Byzantine authorities
// may input anything into consensus.
fp_ensure!(
Expand All @@ -2161,13 +2157,67 @@ impl AuthorityState {
certificate.verify(&self.committee.load())
}

/// Verifies transaction signatures and other data
/// Important: This function can potentially be called in parallel and you can not rely on order of transactions to perform verification
/// If this function return an error, transaction is skipped and is not passed to handle_consensus_transaction
/// This function returns unit error and is responsible for emitting log messages for internal errors
pub(crate) fn verify_consensus_transaction(
&self,
transaction: SequencedConsensusTransaction,
) -> Result<VerifiedSequencedConsensusTransaction, ()> {
let _timer = self
.metrics
.verify_narwhal_transaction_duration_mcs
.utilization_timer();
let committee = self.committee.load();
match &transaction.transaction.kind {
ConsensusTransactionKind::UserTransaction(certificate) => {
if self
.database
.consensus_message_processed(certificate.digest())
.expect("Storage error")
{
debug!(
consensus_index=?transaction.consensus_index,
tracking_id=?transaction.transaction.tracking_id,
tx_digest = ?certificate.digest(),
"handle_consensus_transaction UserTransaction [skip]",
);
self.metrics.skipped_consensus_txns.inc();
return Err(());
}
self.verify_narwhal_transaction(certificate)
.map_err(|err| {
warn!(
"Ignoring malformed transaction (failed to verify) from {}: {:?}",
transaction.consensus_output.certificate.header.author, err
);
})?;
}
ConsensusTransactionKind::Checkpoint(fragment) => {
fragment.verify(&committee).map_err(|err| {
warn!(
"Ignoring malformed fragment (failed to verify) from {}: {:?}",
transaction.consensus_output.certificate.header.author, err
);
})?;
}
}
Ok(VerifiedSequencedConsensusTransaction(transaction))
}

/// The transaction passed here went through verification in verify_consensus_transaction.
/// This method is called in the exact sequence message are ordered in consensus.
/// Errors returned by this call are treated as critical errors and cause node to panic.
pub(crate) async fn handle_consensus_transaction(
&self,
// TODO [2533]: use this once integrating Narwhal reconfiguration
_consensus_output: &narwhal_consensus::ConsensusOutput,
consensus_index: ExecutionIndicesWithHash,
transaction: ConsensusTransaction,
) -> Result<(), NarwhalHandlerError> {
transaction: VerifiedSequencedConsensusTransaction,
) -> SuiResult {
let VerifiedSequencedConsensusTransaction(SequencedConsensusTransaction {
consensus_output: _consensus_output,
consensus_index,
transaction,
}) = transaction;
self.metrics.total_consensus_txns.inc();
let _timer = self
.metrics
Expand All @@ -2181,26 +2231,9 @@ impl AuthorityState {
.lock()
.should_reject_consensus_transaction()
{
return Err(NarwhalHandlerError::ValidatorHalted);
}
if self
.database
.consensus_message_processed(certificate.digest())
.await
.map_err(NarwhalHandlerError::NodeError)?
{
debug!(
?consensus_index,
?tracking_id,
tx_digest = ?certificate.digest(),
"handle_consensus_transaction UserTransaction [skip]",
);
self.metrics.skipped_consensus_txns.inc();
debug!("Validator has stopped accepting consensus transactions, skipping {:?} from {:?}", certificate.digest(), consensus_index);
return Ok(());
}
self.verify_narwhal_transaction(&certificate)
.map_err(NarwhalHandlerError::SkipNarwhalTransaction)?;

debug!(
?consensus_index,
?tracking_id,
Expand All @@ -2212,14 +2245,11 @@ impl AuthorityState {
self.add_pending_certificates(vec![(
*certificate.digest(),
Some(*certificate.clone()),
)])
.map_err(NarwhalHandlerError::NodeError)?;
)])?;

self.database
.lock_shared_objects(*certificate, consensus_index)
// todo - potentially more errors from inside here needs to be mapped differently
.await
.map_err(NarwhalHandlerError::NodeError)?;
.await?;

Ok(())
}
Expand All @@ -2233,20 +2263,13 @@ impl AuthorityState {
fragment.other.authority(),
);

let committee = self.committee.load();
fragment
.verify(&committee)
.map_err(NarwhalHandlerError::SkipNarwhalTransaction)?;

let mut checkpoint = self.checkpoints.lock();
checkpoint
.handle_internal_fragment(
consensus_index.index,
*fragment,
self,
&self.committee.load(),
)
.map_err(NarwhalHandlerError::NodeError)?;
checkpoint.handle_internal_fragment(
consensus_index.index,
*fragment,
self,
&self.committee.load(),
)?;

// NOTE: The method `handle_internal_fragment` is idempotent, so we don't need
// to persist the consensus index. If the validator crashes, this transaction
Expand All @@ -2269,122 +2292,3 @@ impl AuthorityState {
}
}
}

pub struct ConsensusHandler {
state: Arc<AuthorityState>,
sender: mpsc::Sender<ConsensusListenerMessage>,
hash: Mutex<u64>,
}

impl ConsensusHandler {
pub fn new(state: Arc<AuthorityState>, sender: mpsc::Sender<ConsensusListenerMessage>) -> Self {
let hash = Mutex::new(0);
Self {
state,
sender,
hash,
}
}

fn update_hash(&self, index: ExecutionIndices, v: &[u8]) -> ExecutionIndicesWithHash {
let mut hash_guard = self
.hash
.try_lock()
.expect("Should not have contention on ExecutionState::update_hash");
let mut hasher = DefaultHasher::new();
(*hash_guard).hash(&mut hasher);
v.hash(&mut hasher);
let hash = hasher.finish();
*hash_guard = hash;
// Log hash for every certificate
if index.next_transaction_index == 1 && index.next_batch_index == 1 {
debug!(
"Integrity hash for consensus output at certificate {} is {:016x}",
index.next_certificate_index, hash
);
}
ExecutionIndicesWithHash { index, hash }
}
}

#[async_trait]
impl ExecutionState for ConsensusHandler {
/// This function will be called by Narwhal, after Narwhal sequenced this certificate.
#[instrument(level = "trace", skip_all)]
async fn handle_consensus_transaction(
&self,
// TODO [2533]: use this once integrating Narwhal reconfiguration
consensus_output: &narwhal_consensus::ConsensusOutput,
consensus_index: ExecutionIndices,
serialized_transaction: Vec<u8>,
) {
let consensus_index = self.update_hash(consensus_index, &serialized_transaction);
let transaction =
match bincode::deserialize::<ConsensusTransaction>(&serialized_transaction) {
Ok(transaction) => transaction,
Err(err) => {
warn!(
"Ignoring malformed transaction (failed to deserialize) from {}: {}",
consensus_output.certificate.header.author, err
);
return;
}
};
match self
.state
.handle_consensus_transaction(consensus_output, consensus_index, transaction)
.await
{
Ok(()) => {
if self
.sender
.send(ConsensusListenerMessage::Processed(serialized_transaction))
.await
.is_err()
{
warn!("Consensus handler outbound channel closed");
}
}
Err(NarwhalHandlerError::SkipNarwhalTransaction(err)) => {
warn!(
"Ignoring malformed transaction (failed to verify) from {}: {:?}",
consensus_output.certificate.header.author, err
);
}
Err(NarwhalHandlerError::ValidatorHalted) => {
debug!("Validator has stopped accepting consensus transactions, skipping");
}
Err(NarwhalHandlerError::NodeError(err)) => {
Err(err).expect("Unrecoverable error in consensus handler")
}
}
}

async fn load_execution_indices(&self) -> ExecutionIndices {
let index_with_hash = self
.state
.database
.last_consensus_index()
.expect("Failed to load consensus indices");
*self
.hash
.try_lock()
.expect("Should not have contention on ExecutionState::load_execution_indices") =
index_with_hash.hash;
index_with_hash.index
}
}

// todo - consider deleting this struct
#[derive(Eq, PartialEq, Clone, Debug, Error)]
pub enum NarwhalHandlerError {
/// Local node error after which it is not safe to continue consuming narwhal stream
#[error("Local node error {}", 0)]
NodeError(SuiError),
/// Transaction from narwhal did not pass verification and should be rejected,
/// narwhal can continue streaming next transactions to SUI
#[error("Invalid transaction {}", 0)]
SkipNarwhalTransaction(SuiError),
#[error("Validator halted and no longer accepts sequenced transactions")]
ValidatorHalted,
}
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
Ok(())
}

pub async fn consensus_message_processed(
pub fn consensus_message_processed(
&self,
digest: &TransactionDigest,
) -> Result<bool, SuiError> {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tokio::{
use sui_types::messages_checkpoint::CheckpointRequest;
use sui_types::messages_checkpoint::CheckpointResponse;

use crate::authority::ConsensusHandler;
use crate::consensus_handler::ConsensusHandler;
use tracing::{error, info, Instrument};

#[cfg(test)]
Expand Down
Loading

0 comments on commit 0db0f1f

Please sign in to comment.