[Narwhal] speed up batch processing of fetched certificates (MystenLabs#14529)

## Description 

Currently, certificates fetched during catchup are processed and accepted one
by one, the same as in normal certificate handling. This turns out to be a
throughput bottleneck for catchup. This PR includes one major change and a few
minor ones to improve catchup throughput.

Main change: take the lock on `state` once per batch of certificates in
`process_certificate_with_lock()`, instead of once per certificate.
`tokio::sync::Mutex` turns out to be quite slow as the lock on `state`,
taking roughly 5 to 10 seconds for 2000 lock operations.
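
For illustration, a minimal sketch of the locking pattern (hypothetical types and names, not the actual `Synchronizer` API):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

/// Hypothetical stand-in for the primary's lock-protected state.
#[derive(Default)]
struct State {
    accepted: Vec<u64>, // certificate digests, simplified to u64
}

/// Before: one `lock().await` per certificate. Each acquisition can
/// suspend the task, so lock overhead dominates for large batches.
async fn accept_one_by_one(state: Arc<Mutex<State>>, certs: Vec<u64>) {
    for cert in certs {
        state.lock().await.accepted.push(cert);
    }
}

/// After: one `lock().await` per fetched batch, amortizing the
/// mutex cost across all certificates in the batch.
async fn accept_batch(state: Arc<Mutex<State>>, certs: Vec<u64>) {
    let mut guard = state.lock().await;
    for cert in certs {
        guard.accepted.push(cert);
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(State::default()));
    accept_one_by_one(state.clone(), (0..1_000).collect()).await;
    accept_batch(state.clone(), (1_000..2_000).collect()).await;
    assert_eq!(state.lock().await.accepted.len(), 2_000);
}
```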

Other changes:
- Increase channel sizes across Narwhal from 1k to 10k, so that channels fill up less often. I did not see a noticeable increase in memory usage.
- Stop using a blocking thread for verifying user signatures in transactions, since that verification should be relatively fast (see the sketch after this list).
- Cleanups.
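
A minimal sketch of the blocking-thread change (shown in the `crates/sui-core/src/consensus_validator.rs` diff below); `verify_signatures` here is a hypothetical stand-in for the batch verification call:

```rust
use tokio::runtime::Handle;

/// Stand-in for batch signature verification; assumed to be quick.
fn verify_signatures(batch: &[u8]) -> Result<(), String> {
    if batch.is_empty() {
        return Err("empty batch".to_string());
    }
    Ok(())
}

/// Before: hop to the blocking thread pool. Worthwhile for long
/// CPU-bound work, but for short work the spawn and wake-up round
/// trip can cost more than the verification itself.
async fn verify_on_blocking_pool(batch: Vec<u8>) -> Result<(), String> {
    Handle::current()
        .spawn_blocking(move || verify_signatures(&batch))
        .await
        .map_err(|e| e.to_string())?
}

/// After: run inline on the async worker thread, acceptable as long
/// as the work stays short enough not to stall the executor.
async fn verify_inline(batch: Vec<u8>) -> Result<(), String> {
    verify_signatures(&batch)
}

#[tokio::main]
async fn main() {
    assert!(verify_on_blocking_pool(vec![1, 2, 3]).await.is_ok());
    assert!(verify_inline(vec![1, 2, 3]).await.is_ok());
}
```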

## Test Plan 

### Private testnet

In catchup experiments with 5000 TPS and 150 validators, this seems to
improve catchup speed from about 2 to about 8 rounds per second.

Before:
Catching up after 1 hr of downtime never finished within the epoch:


![image](https://github.com/MystenLabs/sui/assets/81660174/bcdbd727-12f7-46dc-944a-b13f68cfbf10)


![image](https://github.com/MystenLabs/sui/assets/81660174/1933e6a4-e51b-4fcf-8df6-506cba67d6cd)

After:
Catching up after 1 hr of downtime took ~20 min:


![image](https://github.com/MystenLabs/sui/assets/81660174/7dd85846-5074-49a3-b85e-fa6213aebd30)


![image](https://github.com/MystenLabs/sui/assets/81660174/89725256-fb8e-4817-aed5-609d241c57d0)

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, then
add an entry to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitates a data wipe or data migration

### Release notes
mwtian authored Nov 1, 2023
1 parent a44a567 commit 2761c67
Showing 10 changed files with 544 additions and 346 deletions.
17 changes: 6 additions & 11 deletions crates/sui-core/src/consensus_validator.rs
@@ -15,7 +15,6 @@ use narwhal_types::{validate_batch_version, BatchAPI};
 use narwhal_worker::TransactionValidator;
 use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
 use tap::TapFallible;
-use tokio::runtime::Handle;
 use tracing::{info, warn};
 
 /// Allows verifying the validity of transactions
@@ -105,16 +104,12 @@ impl TransactionValidator for SuiTxValidator {
         // verify the certificate signatures as a batch
         let cert_count = cert_batch.len();
         let ckpt_count = ckpt_batch.len();
-        let epoch_store = self.epoch_store.clone();
-        Handle::current()
-            .spawn_blocking(move || {
-                epoch_store
-                    .signature_verifier
-                    .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
-                    .tap_err(|e| warn!("batch verification error: {}", e))
-                    .wrap_err("Malformed batch (failed to verify)")
-            })
-            .await??;
+
+        self.epoch_store
+            .signature_verifier
+            .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
+            .tap_err(|e| warn!("batch verification error: {}", e))
+            .wrap_err("Malformed batch (failed to verify)")?;
 
         // All checkpoint sigs have been verified, forward them to the checkpoint service
         for ckpt in ckpt_messages {
8 changes: 3 additions & 5 deletions narwhal/node/src/primary_node.rs
@@ -42,8 +42,6 @@ struct PrimaryNodeInner {
 }
 
 impl PrimaryNodeInner {
-    /// The default channel capacity.
-    pub const CHANNEL_CAPACITY: usize = 1_000;
     /// The window where the schedule change takes place in consensus. It represents number
     /// of committed sub dags.
     /// TODO: move this to node properties
@@ -212,15 +210,15 @@
         )
         .unwrap();
         let (tx_new_certificates, rx_new_certificates) =
-            metered_channel::channel(Self::CHANNEL_CAPACITY, &new_certificates_counter);
+            metered_channel::channel(primary::CHANNEL_CAPACITY, &new_certificates_counter);
 
         let committed_certificates_counter = IntGauge::new(
             PrimaryChannelMetrics::NAME_COMMITTED_CERTS,
             PrimaryChannelMetrics::DESC_COMMITTED_CERTS,
         )
         .unwrap();
         let (tx_committed_certificates, rx_committed_certificates) =
-            metered_channel::channel(Self::CHANNEL_CAPACITY, &committed_certificates_counter);
+            metered_channel::channel(primary::CHANNEL_CAPACITY, &committed_certificates_counter);
 
         // Compute the public key of this authority.
         let name = keypair.public().clone();
@@ -307,7 +305,7 @@
         let channel_metrics = ChannelMetrics::new(registry);
 
         let (tx_sequence, rx_sequence) =
-            metered_channel::channel(Self::CHANNEL_CAPACITY, &channel_metrics.tx_sequence);
+            metered_channel::channel(primary::CHANNEL_CAPACITY, &channel_metrics.tx_sequence);
 
         // Check for any sub-dags that have been sent by consensus but were not processed by the executor.
         let restored_consensus_output = get_restored_consensus_output(
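
The hunks above drop the node-local constant in favor of the shared `primary::CHANNEL_CAPACITY`, which this PR raises to 10k (see the `primary.rs` diff below). A minimal sketch of the effect, using a plain bounded `tokio` channel as a stand-in for `metered_channel`:

```rust
use tokio::sync::mpsc;

// Shared capacity, analogous to `primary::CHANNEL_CAPACITY`.
const CHANNEL_CAPACITY: usize = 10_000;

#[tokio::main]
async fn main() {
    // Senders only block (backpressure) once 10_000 messages are
    // queued, so bursts during catchup fill the channel far less
    // often than with a capacity of 1_000.
    let (tx, mut rx) = mpsc::channel::<u64>(CHANNEL_CAPACITY);
    tx.send(42).await.unwrap();
    assert_eq!(rx.recv().await, Some(42));
}
```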
166 changes: 25 additions & 141 deletions narwhal/primary/src/certificate_fetcher.rs
@@ -6,34 +6,30 @@ use crate::{metrics::PrimaryMetrics, synchronizer::Synchronizer};
 use anemo::Request;
 use config::{AuthorityIdentifier, Committee};
 use crypto::NetworkPublicKey;
-use fastcrypto::hash::Hash;
 use futures::{stream::FuturesUnordered, StreamExt};
 use itertools::Itertools;
 use mysten_metrics::metered_channel::Receiver;
 use mysten_metrics::{monitored_future, monitored_scope, spawn_logged_monitored_task};
 use network::PrimaryToPrimaryRpc;
 use rand::{rngs::ThreadRng, seq::SliceRandom};
-use std::collections::HashSet;
 use std::{
     collections::{BTreeMap, BTreeSet},
     sync::Arc,
     time::Duration,
 };
 use storage::CertificateStore;
 use sui_protocol_config::ProtocolConfig;
-use tokio::task::{spawn_blocking, JoinSet};
 use tokio::{
     sync::watch,
-    task::JoinHandle,
+    task::{spawn_blocking, JoinHandle, JoinSet},
     time::{sleep, timeout, Instant},
 };
 use tracing::{debug, error, instrument, trace, warn};
-use types::CertificateDigest;
 use types::{
     error::{DagError, DagResult},
     validate_received_certificate_version, Certificate, CertificateAPI,
     ConditionalBroadcastReceiver, FetchCertificatesRequest, FetchCertificatesResponse, HeaderAPI,
-    Round, SignatureVerificationState,
+    Round,
 };
 
 #[cfg(test)]
@@ -51,13 +47,6 @@ const PARALLEL_FETCH_REQUEST_ADDITIONAL_TIMEOUT: Duration = Duration::from_secs(
 // Batch size is chosen so that verifying a batch takes non-trival
 // time (verifying a batch of 200 certificates should take > 100ms).
 const VERIFY_CERTIFICATES_BATCH_SIZE: usize = 200;
-// Number of certificates to verify in a batch. Verifications in each batch run serially.
-// Batch size is chosen so that verifying a batch takes non-trival
-// time (verifying a batch of 50 certificates should take > 25ms).
-const VERIFY_CERTIFICATES_V2_BATCH_SIZE: usize = 50;
-// Number of rounds to force verfication of certificates by signature, to bound the maximum
-// number of certificates with bad signatures in storage.
-const CERTIFICATE_VERIFICATION_ROUND_INTERVAL: u64 = 50;
 
 #[derive(Clone, Debug)]
 pub enum CertificateFetcherCommand {
@@ -465,44 +454,46 @@ async fn process_certificates_helper(
             MAX_CERTIFICATES_TO_FETCH,
         ));
     }
+
+    // We should not be getting mixed versions of certificates from a
+    // validator, so any individual certificate with mismatched versions
+    // should cancel processing for the entire batch of fetched certificates.
+    let certificates = response
+        .certificates
+        .into_iter()
+        .map(|cert| {
+            validate_received_certificate_version(cert, protocol_config).map_err(|err| {
+                error!("fetched certficate processing error: {err}");
+                DagError::InvalidCertificateVersion
+            })
+        })
+        .collect::<DagResult<Vec<Certificate>>>()?;
 
     // In PrimaryReceiverHandler, certificates already in storage are ignored.
    // The check is unnecessary here, because there is no concurrent processing of older
     // certificates. For byzantine failures, the check will not be effective anyway.
-    let _verify_scope = monitored_scope("VerifyingFetchedCertificates");
+    let _scope = monitored_scope("ProcessingFetchedCertificates");
 
     // Verify certificates in parallel. If we are using CertificateV2 only verify
     // the tip of a certificate chain and verify the parent certificates of that tip
     // indirectly.
     if protocol_config.narwhal_certificate_v2() {
-        process_certificates_v2_helper(protocol_config, response, synchronizer, metrics).await?;
+        synchronizer
+            .try_accept_fetched_certificates(certificates)
+            .await?;
     } else {
-        process_certificates_v1_helper(protocol_config, response, synchronizer, metrics).await?;
+        process_certificates_v1_helper(certificates, synchronizer, metrics).await?;
     }
 
     trace!("Fetched certificates have been processed");
 
     Ok(())
 }
 
+// Verify certificates in parallel.
 async fn process_certificates_v1_helper(
-    protocol_config: &ProtocolConfig,
-    response: FetchCertificatesResponse,
+    certificates: Vec<Certificate>,
     synchronizer: &Synchronizer,
     metrics: Arc<PrimaryMetrics>,
 ) -> DagResult<()> {
-    let mut all_certificates = vec![];
-    for cert in response.certificates {
-        // We should not be getting mixed versions of certificates from a
-        // validator, so any individual certificate with mismatched versions
-        // should cancel processing for the entire batch of fetched certificates.
-        all_certificates.push(
-            validate_received_certificate_version(cert, protocol_config).map_err(|err| {
-                error!("fetched certficate processing error: {err}");
-                DagError::InvalidCertificateVersion
-            })?,
-        );
-    }
-    let verify_tasks = all_certificates
+    let verify_tasks = certificates
         .chunks(VERIFY_CERTIFICATES_BATCH_SIZE)
         .map(|certs| {
             let certs = certs.to_vec();
@@ -540,110 +531,3 @@ async fn process_certificates_v1_helper(
 
     Ok(())
 }
-
-async fn process_certificates_v2_helper(
-    protocol_config: &ProtocolConfig,
-    response: FetchCertificatesResponse,
-    synchronizer: &Synchronizer,
-    metrics: Arc<PrimaryMetrics>,
-) -> DagResult<()> {
-    let mut all_certificates = vec![];
-    let mut all_parents = HashSet::<CertificateDigest>::new();
-    for cert in response.certificates {
-        all_parents.extend(cert.header().parents().iter());
-        // We should not be getting mixed versions of certificates from a
-        // validator, so any individual certificate with mismatched versions
-        // should cancel processing for the entire batch of fetched certificates.
-        all_certificates.push(
-            validate_received_certificate_version(cert, protocol_config).map_err(|err| {
-                error!("fetched certficate processing error: {err}");
-                DagError::InvalidCertificateVersion
-            })?,
-        );
-    }
-
-    let all_certificates_count = all_certificates.len() as u64;
-
-    // Identify leaf certs and preemptively set the parent certificates
-    // as verified indirectly. This is safe because any leaf certs that
-    // fail verification will cancel processing for all fetched certs.
-    let mut direct_verification_certs = Vec::new();
-    for (idx, c) in all_certificates.iter_mut().enumerate() {
-        if !all_parents.contains(&c.digest()) {
-            direct_verification_certs.push((idx, c.clone()));
-            continue;
-        }
-        if c.header().round() % CERTIFICATE_VERIFICATION_ROUND_INTERVAL == 0 {
-            direct_verification_certs.push((idx, c.clone()));
-            continue;
-        }
-        // TODO: add dedicated Certificate API for VerifiedIndirectly.
-        c.set_signature_verification_state(SignatureVerificationState::VerifiedIndirectly(
-            c.aggregated_signature()
-                .ok_or(DagError::InvalidSignature)?
-                .clone(),
-        ));
-    }
-    let direct_verification_count = direct_verification_certs.len() as u64;
-
-    // Create verify tasks only for leaf certs as parent certs can skip this completely.
-    let verify_tasks = direct_verification_certs
-        .chunks(VERIFY_CERTIFICATES_V2_BATCH_SIZE)
-        .map(|chunk| {
-            let certs = chunk.to_vec();
-            let sync = synchronizer.clone();
-            let metrics = metrics.clone();
-            spawn_blocking(move || {
-                let now = Instant::now();
-                let mut sanitized_certs = Vec::new();
-                for (idx, c) in certs {
-                    sanitized_certs.push((idx, sync.sanitize_certificate(c)?));
-                }
-                metrics
-                    .certificate_fetcher_total_verification_us
-                    .inc_by(now.elapsed().as_micros() as u64);
-                Ok::<Vec<(usize, Certificate)>, DagError>(sanitized_certs)
-            })
-        })
-        .collect_vec();
-
-    // We ensure sanitization of certificates completes for all leaves
-    // fetched certificates before accepting any certficates.
-    for task in verify_tasks.into_iter() {
-        // Any certificates that fail to be verified should cancel the entire
-        // batch of fetched certficates.
-        let idx_and_certs = task.await.map_err(|err| {
-            error!("Cancelling due to {err:?}");
-            DagError::Canceled
-        })??;
-        for (idx, cert) in idx_and_certs {
-            all_certificates[idx] = cert;
-        }
-    }
-
-    metrics
-        .fetched_certificates_verified_directly
-        .inc_by(direct_verification_count);
-    metrics
-        .fetched_certificates_verified_indirectly
-        .inc_by(all_certificates_count.saturating_sub(direct_verification_count));
-
-    // Accept verified certificates in the same order as received.
-    for cert in all_certificates {
-        let cert_digest = cert.digest();
-        let now = Instant::now();
-        if let Err(e) = synchronizer.try_accept_fetched_certificate(cert).await {
-            // It is possible that subsequent certificates are useful,
-            // so not stopping early.
-            warn!(
-                "Failed to accept fetched certificate {:?}: {e}",
-                cert_digest
-            );
-        }
-        metrics
-            .certificate_fetcher_total_accept_us
-            .inc_by(now.elapsed().as_micros() as u64);
-    }
-
-    Ok(())
-}
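
The deleted `process_certificates_v2_helper` above verified only the leaves of the fetched certificate chain directly and marked their ancestors as verified indirectly; with this PR that work appears to move behind `Synchronizer::try_accept_fetched_certificates`. A minimal sketch of the direct/indirect split, with simplified types:

```rust
use std::collections::HashSet;

/// Simplified certificate: digest, round, and parent digests.
struct Cert {
    digest: u64,
    round: u64,
    parents: Vec<u64>,
}

/// Force direct verification every N rounds, bounding how many
/// certificates with bad signatures could ever reach storage.
const VERIFICATION_ROUND_INTERVAL: u64 = 50;

/// Partition fetched certificates: a certificate that no other fetched
/// certificate lists as a parent is a leaf and must be verified
/// directly; the rest are vouched for by their verified descendants,
/// except on periodic rounds where verification is forced anyway.
fn split_for_verification(certs: &[Cert]) -> (Vec<&Cert>, Vec<&Cert>) {
    let parents: HashSet<u64> = certs
        .iter()
        .flat_map(|c| c.parents.iter().copied())
        .collect();
    certs.iter().partition(|c| {
        !parents.contains(&c.digest) || c.round % VERIFICATION_ROUND_INTERVAL == 0
    })
}

fn main() {
    let certs = vec![
        Cert { digest: 1, round: 1, parents: vec![] },
        Cert { digest: 2, round: 2, parents: vec![1] },
    ];
    let (direct, indirect) = split_for_verification(&certs);
    assert_eq!(direct.len(), 1); // cert 2 is a leaf: nothing lists it as a parent
    assert_eq!(indirect.len(), 1); // cert 1 is vouched for by cert 2
}
```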
7 changes: 4 additions & 3 deletions narwhal/primary/src/primary.rs
@@ -76,7 +76,7 @@ use types::{
 pub mod primary_tests;
 
 /// The default channel capacity for each channel of the primary.
-pub const CHANNEL_CAPACITY: usize = 1_000;
+pub const CHANNEL_CAPACITY: usize = 10_000;
 
 /// The number of shutdown receivers to create on startup. We need one per component loop.
 pub const NUM_SHUTDOWN_RECEIVERS: u64 = 27;
@@ -841,8 +841,9 @@ impl PrimaryReceiverHandler {
         Ok(())
     }
 
-    /// Gets parent certificate digests not known before, in storage, among suspended certificates,
-    /// or being requested from other header proposers.
+    /// Gets parent certificate digests not known before.
+    /// Digests that are in storage, suspended, or being requested from other proposers
+    /// are considered to be known.
     async fn get_unknown_parent_digests(
         &self,
         header: &Header,
22 changes: 6 additions & 16 deletions narwhal/primary/src/proposer.rs
@@ -4,7 +4,7 @@
 
 use crate::consensus::LeaderSchedule;
 use crate::metrics::PrimaryMetrics;
-use config::{AuthorityIdentifier, Committee, Epoch, WorkerId};
+use config::{AuthorityIdentifier, Committee, WorkerId};
 use fastcrypto::hash::Hash as _;
 use mysten_metrics::metered_channel::{Receiver, Sender};
 use mysten_metrics::spawn_logged_monitored_task;
@@ -66,7 +66,7 @@ pub struct Proposer {
     /// Receiver for shutdown.
     rx_shutdown: ConditionalBroadcastReceiver,
     /// Receives the parents to include in the next header (along with their round number) from core.
-    rx_parents: Receiver<(Vec<Certificate>, Round, Epoch)>,
+    rx_parents: Receiver<(Vec<Certificate>, Round)>,
     /// Receives the batches' digests from our workers.
     rx_our_digests: Receiver<OurDigestMessage>,
     /// Receives system messages to include in the next header.
@@ -124,7 +124,7 @@ impl Proposer {
         min_header_delay: Duration,
         header_resend_timeout: Option<Duration>,
         rx_shutdown: ConditionalBroadcastReceiver,
-        rx_parents: Receiver<(Vec<Certificate>, Round, Epoch)>,
+        rx_parents: Receiver<(Vec<Certificate>, Round)>,
         rx_our_digests: Receiver<OurDigestMessage>,
         rx_system_messages: Receiver<SystemMessage>,
         tx_headers: Sender<Header>,
@@ -637,23 +637,13 @@
                 }
             },
 
-            Some((parents, round, epoch)) = self.rx_parents.recv() => {
+            Some((parents, round)) = self.rx_parents.recv() => {
                 debug!("Proposer received parents, round={} parent.round={} num_parents={}", self.round, round, parents.len());
 
-                // If the core already moved to the next epoch we should pull the next
-                // committee as well.
-
-                match epoch.cmp(&self.committee.epoch()) {
-                    Ordering::Equal => {
-                        // we can proceed.
-                    }
-                    _ => continue
-                }
-
                 // Sanity check: verify provided certs are of the correct round & epoch.
                 for parent in parents.iter() {
-                    if parent.round() != round || parent.epoch() != epoch {
-                        error!("Proposer received certificate {parent:?} that failed to match expected round {round} or epoch {epoch}. This should not be possible.");
+                    if parent.round() != round {
+                        error!("Proposer received certificate {parent:?} that failed to match expected round {round}. This should not be possible.");
                     }
                 }
 
