Refactor ConsensusHandler in preparation for parallel signature verification #5344

Merged
merged 1 commit into from
Oct 18, 2022
246 changes: 75 additions & 171 deletions crates/sui-core/src/authority.rs
@@ -2,8 +2,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::hash::Hash;
use std::ops::Deref;
use std::path::PathBuf;
use std::{
@@ -17,7 +16,7 @@ use std::{

use anyhow::anyhow;
use arc_swap::ArcSwap;
use async_trait::async_trait;

use chrono::prelude::*;
use fastcrypto::traits::KeyPair;
use futures::stream::{self, Stream};
@@ -30,7 +29,6 @@ use prometheus::{
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge,
};
use tap::TapFallible;
use thiserror::Error;
use tokio::sync::{
broadcast::{self, error::RecvError},
mpsc,
@@ -46,7 +44,7 @@ use narwhal_config::{
Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache,
WorkerId as ConsensusWorkerId,
};
use narwhal_executor::{ExecutionIndices, ExecutionState};

use sui_adapter::adapter;
use sui_config::genesis::Genesis;
use sui_json_rpc_types::{SuiEventEnvelope, SuiTransactionEffects};
@@ -82,9 +80,11 @@ use sui_types::{
};

use crate::authority::authority_notifier::TransactionNotifierTicket;
use crate::authority::authority_store_tables::ExecutionIndicesWithHash;
use crate::checkpoints::ConsensusSender;
use crate::consensus_adapter::ConsensusListenerMessage;

use crate::consensus_handler::{
SequencedConsensusTransaction, VerifiedSequencedConsensusTransaction,
};
use crate::epoch::committee_store::CommitteeStore;
use crate::metrics::TaskUtilizationExt;
use crate::{
@@ -2145,10 +2145,6 @@ impl AuthorityState {
}

fn verify_narwhal_transaction(&self, certificate: &CertifiedTransaction) -> SuiResult {
let _timer = self
.metrics
.verify_narwhal_transaction_duration_mcs
.utilization_timer();
// Ensure the input is a shared object certificate. Remember that Byzantine authorities
// may input anything into consensus.
fp_ensure!(
@@ -2161,13 +2157,67 @@ impl AuthorityState {
certificate.verify(&self.committee.load())
}

/// Verifies transaction signatures and other data.
/// Important: this function can be called in parallel, so verification must not rely on the order of transactions.
/// If this function returns an error, the transaction is skipped and is not passed to handle_consensus_transaction.
/// This function returns a unit error and is responsible for emitting log messages for internal errors.
pub(crate) fn verify_consensus_transaction(
&self,
transaction: SequencedConsensusTransaction,
) -> Result<VerifiedSequencedConsensusTransaction, ()> {
let _timer = self
.metrics
.verify_narwhal_transaction_duration_mcs
.utilization_timer();
let committee = self.committee.load();
match &transaction.transaction.kind {
ConsensusTransactionKind::UserTransaction(certificate) => {
if self
.database
.consensus_message_processed(certificate.digest())
.expect("Storage error")
Contributor:

Should we add some comments about why we panic upon this storage error?

Contributor @asonnino, Oct 18, 2022:

Just to be sure: panicking here ensures that the consensus transaction is not skipped (i.e. the validator stops or crashes and the execution indices are not updated by future transactions), right? (Because we cannot skip that transaction.)

Contributor (PR author):

I think that, in general, there is not much we can do when we hit a storage error. Yes, this will not skip the transaction; it will essentially shut down the node.

{
debug!(
consensus_index=?transaction.consensus_index,
tracking_id=?transaction.transaction.tracking_id,
tx_digest = ?certificate.digest(),
"handle_consensus_transaction UserTransaction [skip]",
);
self.metrics.skipped_consensus_txns.inc();
return Err(());
}
self.verify_narwhal_transaction(certificate)
.map_err(|err| {
warn!(
"Ignoring malformed transaction (failed to verify) from {}: {:?}",
transaction.consensus_output.certificate.header.author, err
);
})?;
}
ConsensusTransactionKind::Checkpoint(fragment) => {
fragment.verify(&committee).map_err(|err| {
warn!(
"Ignoring malformed fragment (failed to verify) from {}: {:?}",
transaction.consensus_output.certificate.header.author, err
);
})?;
}
}
Ok(VerifiedSequencedConsensusTransaction(transaction))
}
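
The split between an order-independent verification step and an order-dependent handling step can be sketched in isolation. The following is a minimal illustration using only the standard library, not the code from this PR: `Sequenced`, `Verified`, `already_processed`, `verify`, `handle`, and `run` are hypothetical names, the "signature check" is a placeholder that rejects empty payloads, and one scoped thread per transaction is an assumption made for brevity. It mirrors three ideas from the hunk above: a newtype that only verification can produce, a unit error meaning "skip this transaction", and a panic on storage errors so that a transaction is never silently skipped.

```rust
use std::thread;

/// Hypothetical stand-in for a sequenced consensus transaction.
#[derive(Debug, Clone, PartialEq)]
struct Sequenced {
    index: u64,
    payload: Vec<u8>,
}

/// Newtype wrapper: in this sketch only `verify` constructs it, so `handle`
/// can assume its input has passed verification.
#[derive(Debug, PartialEq)]
struct Verified(Sequenced);

/// Hypothetical storage lookup. A real store can fail, hence the `Result`.
/// Here a payload starting with 0xFF is treated as "already processed".
fn already_processed(tx: &Sequenced) -> Result<bool, String> {
    Ok(tx.payload.first() == Some(&0xFF))
}

/// Order-independent check. `Err(())` means "skip this transaction"; the
/// function logs the reason itself instead of returning it.
fn verify(tx: Sequenced) -> Result<Verified, ()> {
    // A storage failure is treated as unrecoverable: panic rather than skip.
    if already_processed(&tx).expect("Storage error") {
        return Err(());
    }
    // Placeholder for signature verification: reject empty payloads.
    if tx.payload.is_empty() {
        eprintln!("ignoring malformed transaction at index {}", tx.index);
        return Err(());
    }
    Ok(Verified(tx))
}

/// Order-dependent step: must be called in consensus order.
fn handle(log: &mut Vec<u64>, tx: Verified) {
    let Verified(Sequenced { index, .. }) = tx;
    log.push(index);
}

/// Verifies a batch in parallel, then handles the survivors sequentially.
fn run(batch: Vec<Sequenced>) -> Vec<u64> {
    let results: Vec<Result<Verified, ()>> = thread::scope(|s| {
        // Spawn every verifier first so they can run concurrently.
        let handles: Vec<_> = batch
            .into_iter()
            .map(|tx| s.spawn(move || verify(tx)))
            .collect();
        // Joining in spawn order preserves the original sequence.
        handles
            .into_iter()
            .map(|h| h.join().expect("verifier panicked"))
            .collect()
    });
    let mut log = Vec::new();
    for verified in results.into_iter().filter_map(Result::ok) {
        handle(&mut log, verified);
    }
    log
}
```

Calling `run` on a batch returns the indices of the transactions that reached `handle`, in their original order. Because `handle` accepts only `Verified`, the compiler rejects any call site that tries to pass an unverified transaction.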

/// The transaction passed here has already been verified by verify_consensus_transaction.
/// This method is called in the exact order in which messages are sequenced by consensus.
/// Errors returned by this call are treated as critical errors and cause the node to panic.
pub(crate) async fn handle_consensus_transaction(
&self,
// TODO [2533]: use this once integrating Narwhal reconfiguration
_consensus_output: &narwhal_consensus::ConsensusOutput,
consensus_index: ExecutionIndicesWithHash,
transaction: ConsensusTransaction,
) -> Result<(), NarwhalHandlerError> {
transaction: VerifiedSequencedConsensusTransaction,
) -> SuiResult {
let VerifiedSequencedConsensusTransaction(SequencedConsensusTransaction {
consensus_output: _consensus_output,
consensus_index,
transaction,
}) = transaction;
self.metrics.total_consensus_txns.inc();
let _timer = self
.metrics
@@ -2181,26 +2231,9 @@ impl AuthorityState {
.lock()
.should_reject_consensus_transaction()
{
return Err(NarwhalHandlerError::ValidatorHalted);
}
if self
.database
.consensus_message_processed(certificate.digest())
.await
.map_err(NarwhalHandlerError::NodeError)?
{
debug!(
?consensus_index,
?tracking_id,
tx_digest = ?certificate.digest(),
"handle_consensus_transaction UserTransaction [skip]",
);
self.metrics.skipped_consensus_txns.inc();
debug!("Validator has stopped accepting consensus transactions, skipping {:?} from {:?}", certificate.digest(), consensus_index);
return Ok(());
}
self.verify_narwhal_transaction(&certificate)
.map_err(NarwhalHandlerError::SkipNarwhalTransaction)?;

debug!(
?consensus_index,
?tracking_id,
@@ -2212,14 +2245,11 @@ impl AuthorityState {
self.add_pending_certificates(vec![(
*certificate.digest(),
Some(*certificate.clone()),
)])
.map_err(NarwhalHandlerError::NodeError)?;
)])?;

self.database
.lock_shared_objects(*certificate, consensus_index)
// todo - potentially more errors from inside here needs to be mapped differently
.await
.map_err(NarwhalHandlerError::NodeError)?;
.await?;

Ok(())
}
@@ -2233,20 +2263,13 @@ impl AuthorityState {
fragment.other.authority(),
);

let committee = self.committee.load();
fragment
.verify(&committee)
.map_err(NarwhalHandlerError::SkipNarwhalTransaction)?;

let mut checkpoint = self.checkpoints.lock();
checkpoint
.handle_internal_fragment(
consensus_index.index,
*fragment,
self,
&self.committee.load(),
)
.map_err(NarwhalHandlerError::NodeError)?;
checkpoint.handle_internal_fragment(
consensus_index.index,
*fragment,
self,
&self.committee.load(),
)?;

// NOTE: The method `handle_internal_fragment` is idempotent, so we don't need
// to persist the consensus index. If the validator crashes, this transaction
@@ -2269,122 +2292,3 @@ impl AuthorityState {
}
}
}

pub struct ConsensusHandler {
state: Arc<AuthorityState>,
sender: mpsc::Sender<ConsensusListenerMessage>,
hash: Mutex<u64>,
}

impl ConsensusHandler {
pub fn new(state: Arc<AuthorityState>, sender: mpsc::Sender<ConsensusListenerMessage>) -> Self {
let hash = Mutex::new(0);
Self {
state,
sender,
hash,
}
}

fn update_hash(&self, index: ExecutionIndices, v: &[u8]) -> ExecutionIndicesWithHash {
let mut hash_guard = self
.hash
.try_lock()
.expect("Should not have contention on ExecutionState::update_hash");
let mut hasher = DefaultHasher::new();
(*hash_guard).hash(&mut hasher);
v.hash(&mut hasher);
let hash = hasher.finish();
*hash_guard = hash;
// Log hash for every certificate
if index.next_transaction_index == 1 && index.next_batch_index == 1 {
debug!(
"Integrity hash for consensus output at certificate {} is {:016x}",
index.next_certificate_index, hash
);
}
ExecutionIndicesWithHash { index, hash }
}
}
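
The chaining done by `update_hash` above can be isolated into a small sketch: each new hash is computed from the previous hash and the serialized transaction bytes, so the final value depends on both the content and the order of everything seen so far. The helper names `chain_hash` and `replay` are hypothetical, and the initial value of 0 follows `ConsensusHandler::new` above. Note that the Rust documentation does not guarantee that `DefaultHasher` produces the same values across Rust releases, so treat this as an illustration of the chaining only.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Folds one serialized transaction into the running hash.
fn chain_hash(previous: u64, bytes: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Feed the previous hash first, then the new bytes, as update_hash does.
    previous.hash(&mut hasher);
    bytes.hash(&mut hasher);
    hasher.finish()
}

/// Replays a whole sequence starting from the initial hash of 0.
fn replay(transactions: &[&[u8]]) -> u64 {
    transactions
        .iter()
        .fold(0, |hash, tx| chain_hash(hash, tx))
}
```

Two nodes that process the same transactions in the same order arrive at the same value, while a reordering changes it. A node that restarts can resume from its last stored hash instead of replaying from 0, which is the role `load_execution_indices` plays in the code below.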

#[async_trait]
impl ExecutionState for ConsensusHandler {
/// This function will be called by Narwhal, after Narwhal sequenced this certificate.
#[instrument(level = "trace", skip_all)]
async fn handle_consensus_transaction(
&self,
// TODO [2533]: use this once integrating Narwhal reconfiguration
consensus_output: &narwhal_consensus::ConsensusOutput,
consensus_index: ExecutionIndices,
serialized_transaction: Vec<u8>,
) {
let consensus_index = self.update_hash(consensus_index, &serialized_transaction);
let transaction =
match bincode::deserialize::<ConsensusTransaction>(&serialized_transaction) {
Ok(transaction) => transaction,
Err(err) => {
warn!(
"Ignoring malformed transaction (failed to deserialize) from {}: {}",
consensus_output.certificate.header.author, err
);
return;
}
};
match self
.state
.handle_consensus_transaction(consensus_output, consensus_index, transaction)
.await
{
Ok(()) => {
if self
.sender
.send(ConsensusListenerMessage::Processed(serialized_transaction))
.await
.is_err()
{
warn!("Consensus handler outbound channel closed");
}
}
Err(NarwhalHandlerError::SkipNarwhalTransaction(err)) => {
warn!(
"Ignoring malformed transaction (failed to verify) from {}: {:?}",
consensus_output.certificate.header.author, err
);
}
Err(NarwhalHandlerError::ValidatorHalted) => {
debug!("Validator has stopped accepting consensus transactions, skipping");
}
Err(NarwhalHandlerError::NodeError(err)) => {
Err(err).expect("Unrecoverable error in consensus handler")
}
}
}

async fn load_execution_indices(&self) -> ExecutionIndices {
let index_with_hash = self
.state
.database
.last_consensus_index()
.expect("Failed to load consensus indices");
*self
.hash
.try_lock()
.expect("Should not have contention on ExecutionState::load_execution_indices") =
index_with_hash.hash;
index_with_hash.index
}
}

// todo - consider deleting this struct
#[derive(Eq, PartialEq, Clone, Debug, Error)]
pub enum NarwhalHandlerError {
/// Local node error after which it is not safe to continue consuming narwhal stream
#[error("Local node error {}", 0)]
NodeError(SuiError),
/// Transaction from narwhal did not pass verification and should be rejected,
/// narwhal can continue streaming next transactions to SUI
#[error("Invalid transaction {}", 0)]
SkipNarwhalTransaction(SuiError),
#[error("Validator halted and no longer accepts sequenced transactions")]
ValidatorHalted,
}
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_store.rs
@@ -1114,7 +1114,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
Ok(())
}

pub async fn consensus_message_processed(
pub fn consensus_message_processed(
Contributor:

nice catch 👍

&self,
digest: &TransactionDigest,
) -> Result<bool, SuiError> {
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_server.rs
@@ -34,7 +34,7 @@ use tokio::{
use sui_types::messages_checkpoint::CheckpointRequest;
use sui_types::messages_checkpoint::CheckpointResponse;

use crate::authority::ConsensusHandler;
use crate::consensus_handler::ConsensusHandler;
use tracing::{error, info, Instrument};

#[cfg(test)]