Skip to content

Commit

Permalink
[narwhal] Add CertificateV2 (MystenLabs#13777)
Browse files Browse the repository at this point in the history
Added new version for NW `Certificate` which Includes new
`AggregateSignatureState`. This holds `AggregateSignatureBytes` but with
the added layer to specify if the signature was verified via a leader,
verified directly, unverified or unsigned. This will be used to take
advantage of the certificate chain that is formed via the DAG by only
verifying the leaders of the certificate chain when they are fetched
from validators during catchup.

This is gated by a protocol feature flag so this PR should have no
effect yet. Still testing the [followup
PR](MystenLabs@ef48f56)
which will include the usage of `CertificateV2`, but sharing here to
provide more context for these changes.
  • Loading branch information
arun-koshy authored Oct 10, 2023
1 parent 8333f84 commit 6ab618a
Show file tree
Hide file tree
Showing 29 changed files with 760 additions and 197 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ narwhal-crypto.workspace = true
narwhal-executor.workspace = true
narwhal-network.workspace = true
narwhal-node.workspace = true
narwhal-test-utils.workspace = true
narwhal-types.workspace = true
narwhal-worker.workspace = true
shared-crypto.workspace = true
Expand Down Expand Up @@ -92,7 +93,6 @@ serde_yaml.workspace = true

move-symbol-pool.workspace = true

narwhal-test-utils.workspace = true
sui-test-transaction-builder.workspace = true
sui-types = { workspace = true, features = ["test-utils"] }

Expand Down
14 changes: 10 additions & 4 deletions crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use lru::LruCache;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use narwhal_config::Committee;
use narwhal_executor::{ExecutionIndices, ExecutionState};
use narwhal_types::{BatchAPI, CertificateAPI, ConsensusOutput, HeaderAPI};
use narwhal_test_utils::latest_protocol_version;
use narwhal_types::{BatchAPI, Certificate, CertificateAPI, ConsensusOutput, HeaderAPI};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeSet, HashMap, HashSet};
Expand Down Expand Up @@ -667,7 +668,7 @@ impl SequencedConsensusTransaction {
pub fn new_test(transaction: ConsensusTransaction) -> Self {
Self {
transaction: SequencedConsensusTransactionKind::External(transaction),
certificate: Default::default(),
certificate: Arc::new(Certificate::default(&latest_protocol_version())),
certificate_author: AuthorityName::ZERO,
consensus_index: Default::default(),
}
Expand Down Expand Up @@ -758,8 +759,13 @@ mod tests {
.build()
.unwrap();

let certificate =
Certificate::new_unsigned(&committee, Header::V1(header), vec![]).unwrap();
let certificate = Certificate::new_unsigned(
latest_protocol_config,
&committee,
Header::V1(header),
vec![],
)
.unwrap();

certificates.push(certificate);
}
Expand Down
1 change: 1 addition & 0 deletions crates/sui-open-rpc/spec/openrpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,7 @@
"loaded_child_object_format_type": false,
"loaded_child_objects_fixed": true,
"missing_type_is_compatibility_error": true,
"narwhal_certificate_v2": false,
"narwhal_new_leader_election_schedule": false,
"narwhal_versioned_metadata": false,
"no_extraneous_module_bytes": false,
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-protocol-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ struct FeatureFlags {

#[serde(skip_serializing_if = "is_false")]
enable_effects_v2: bool,

// If true, then use CertificateV2 in narwhal.
#[serde(skip_serializing_if = "is_false")]
narwhal_certificate_v2: bool,
}

fn is_false(b: &bool) -> bool {
Expand Down Expand Up @@ -952,6 +956,10 @@ impl ProtocolConfig {
pub fn enable_effects_v2(&self) -> bool {
self.feature_flags.enable_effects_v2
}

pub fn narwhal_certificate_v2(&self) -> bool {
self.feature_flags.narwhal_certificate_v2
}
}

#[cfg(not(msim))]
Expand Down
2 changes: 1 addition & 1 deletion narwhal/consensus/benches/process_certificates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn process_certificates(c: &mut Criterion) {
let rounds: Round = *size;

// process certificates for rounds, check we don't grow the dag too much
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down
32 changes: 16 additions & 16 deletions narwhal/consensus/src/tests/bullshark_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn order_leaders() {
let committee = fixture.committee();
// Make certificates for rounds 1 to 7.
let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -127,7 +127,7 @@ async fn commit_one_with_leader_schedule_change() {
let committee = fixture.committee();
// Make certificates for rounds 1 to 9.
let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -188,7 +188,7 @@ async fn not_enough_support_with_leader_schedule_change() {
let committee = fixture.committee();

let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -336,7 +336,7 @@ async fn test_long_period_of_asynchrony_for_leader_schedule_change() {
let committee = fixture.committee();

let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -455,7 +455,7 @@ async fn commit_one() {
let committee = fixture.committee();
// Make certificates for rounds 1 and 2.
let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -559,7 +559,7 @@ async fn dead_node() {
// remove the last authority - 4
let dead_node = ids.pop().unwrap();

let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -670,7 +670,7 @@ async fn not_enough_support() {
let mut ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
ids.sort();

let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -870,7 +870,7 @@ async fn missing_leader() {
let mut ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
ids.sort();

let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -986,7 +986,7 @@ async fn committed_round_after_restart() {
let epoch = committee.epoch();

// Make certificates for rounds 1 to 11.
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -1092,7 +1092,7 @@ async fn delayed_certificates_are_rejected() {
let gc_depth = 10;

// Make certificates for rounds 1 to 11.
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -1151,7 +1151,7 @@ async fn submitting_equivocating_certificate_should_error() {
let gc_depth = 10;

// Make certificates for rounds 1 to 11.
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -1227,7 +1227,7 @@ async fn reset_consensus_scores_on_every_schedule_change() {
let gc_depth = 10;

// Make certificates for rounds 1 to 50.
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -1341,7 +1341,7 @@ async fn restart_with_new_committee() {
tokio::spawn(async move { while rx_primary.recv().await.is_some() {} });

// Make certificates for rounds 1 and 2.
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -1426,7 +1426,7 @@ async fn garbage_collection_basic() {
.map(|authority| authority.id())
.collect();
let slow_node = ids[3];
let genesis = Certificate::genesis(&committee);
let genesis = Certificate::genesis(&latest_protocol_version(), &committee);

let slow_nodes = vec![(slow_node, 0.0_f64)];
let (certificates, _round_5_certificates) = test_utils::make_certificates_with_slow_nodes(
Expand Down Expand Up @@ -1522,7 +1522,7 @@ async fn slow_node() {
.map(|authority| authority.id())
.collect();
let slow_node = ids[3];
let genesis = Certificate::genesis(&committee);
let genesis = Certificate::genesis(&latest_protocol_version(), &committee);

let slow_nodes = vec![(slow_node, 0.0_f64)];
let (certificates, round_8_certificates) = test_utils::make_certificates_with_slow_nodes(
Expand Down Expand Up @@ -1662,7 +1662,7 @@ async fn not_enough_support_and_missing_leaders_and_gc() {
let keys_with_dead_node = ids[0..=2].to_vec();
let slow_node = ids[3];
let slow_nodes = vec![(slow_node, 0.0_f64)];
let genesis = Certificate::genesis(&committee);
let genesis = Certificate::genesis(&latest_protocol_version(), &committee);

let (mut certificates, round_2_certificates) = test_utils::make_certificates_with_slow_nodes(
&committee,
Expand Down
10 changes: 8 additions & 2 deletions narwhal/consensus/src/tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async fn test_consensus_recovery_with_bullshark_with_config(config: ProtocolConf

// AND make certificates for rounds 1 to 7 (inclusive)
let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down Expand Up @@ -507,7 +507,13 @@ async fn test_leader_schedule_from_store() {
scores.add_score(id, score as u64);
}

let sub_dag = CommittedSubDag::new(vec![], Certificate::default(), 0, scores, None);
let sub_dag = CommittedSubDag::new(
vec![],
Certificate::default(&latest_protocol_version()),
0,
scores,
None,
);

store
.write_consensus_state(&HashMap::new(), &sub_dag)
Expand Down
2 changes: 1 addition & 1 deletion narwhal/consensus/src/tests/randomized_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ fn generate_randomised_dag(
.rng(rand)
.build();
let committee: Committee = fixture.committee();
let genesis = Certificate::genesis(&committee);
let genesis = Certificate::genesis(&latest_protocol_version(), &committee);

// Create a known DAG
let (original_certificates, _last_round) = make_certificates_with_parameters(
Expand Down
2 changes: 1 addition & 1 deletion narwhal/executor/tests/consensus_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn test_recovery() {

// Make certificates for rounds 1 and 2.
let ids: Vec<_> = fixture.authorities().map(|a| a.id()).collect();
let genesis = Certificate::genesis(&committee)
let genesis = Certificate::genesis(&latest_protocol_version(), &committee)
.iter()
.map(|x| x.digest())
.collect::<BTreeSet<_>>();
Expand Down
14 changes: 11 additions & 3 deletions narwhal/node/src/generate_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use mysten_network::Multiaddr;
use rand::{prelude::StdRng, SeedableRng};
use serde_reflection::{Registry, Result, Samples, Tracer, TracerConfig};
use std::{fs::File, io::Write};
use test_utils::latest_protocol_version;
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, HeaderV1Builder,
MetadataV1, VersionedMetadata, WorkerOthersBatchMessage, WorkerOwnBatchMessage,
Expand Down Expand Up @@ -61,7 +62,8 @@ fn get_registry() -> Result<Registry> {

let committee = committee_builder.build();

let certificates: Vec<Certificate> = Certificate::genesis(&committee);
let certificates: Vec<Certificate> =
Certificate::genesis(&latest_protocol_version(), &committee);

// Find the author id inside the committee
let authority = committee.authority_by_key(kp.public()).unwrap();
Expand All @@ -83,10 +85,16 @@ fn get_registry() -> Result<Registry> {
.unwrap();

let worker_pk = network_keys[0].public().clone();
let certificate =
Certificate::new_unsigned(&committee, Header::V1(header.clone()), vec![]).unwrap();
let certificate = Certificate::new_unsigned(
&latest_protocol_version(),
&committee,
Header::V1(header.clone()),
vec![],
)
.unwrap();
let signature = keys[0].sign(certificate.digest().as_ref());
let certificate = Certificate::new_unsigned(
&latest_protocol_version(),
&committee,
Header::V1(header.clone()),
vec![(authority.id(), signature)],
Expand Down
21 changes: 16 additions & 5 deletions narwhal/primary/src/aggregators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crypto::{
use fastcrypto::hash::{Digest, Hash};
use std::collections::HashSet;
use std::sync::Arc;
use sui_protocol_config::ProtocolConfig;
use tracing::warn;
use types::{
ensure,
Expand All @@ -20,17 +21,19 @@ use types::{

/// Aggregates votes for a particular header into a certificate.
pub struct VotesAggregator {
protocol_config: ProtocolConfig,
weight: Stake,
votes: Vec<(AuthorityIdentifier, Signature)>,
used: HashSet<AuthorityIdentifier>,
metrics: Arc<PrimaryMetrics>,
}

impl VotesAggregator {
pub fn new(metrics: Arc<PrimaryMetrics>) -> Self {
pub fn new(protocol_config: &ProtocolConfig, metrics: Arc<PrimaryMetrics>) -> Self {
metrics.votes_received_last_round.set(0);

Self {
protocol_config: protocol_config.clone(),
weight: 0,
votes: Vec::new(),
used: HashSet::new(),
Expand Down Expand Up @@ -59,13 +62,21 @@ impl VotesAggregator {
.votes_received_last_round
.set(self.votes.len() as i64);
if self.weight >= committee.quorum_threshold() {
let cert = Certificate::new_unverified(committee, header.clone(), self.votes.clone())?;
let cert = Certificate::new_unverified(
&self.protocol_config,
committee,
header.clone(),
self.votes.clone(),
)?;
let (_, pks) = cert.signed_by(committee);

let certificate_digest: Digest<{ crypto::DIGEST_LENGTH }> = Digest::from(cert.digest());
match AggregateSignature::try_from(cert.aggregated_signature())
.map_err(|_| DagError::InvalidSignature)?
.verify_secure(&to_intent_message(certificate_digest), &pks[..])
match AggregateSignature::try_from(
cert.aggregated_signature()
.ok_or(DagError::InvalidSignature)?,
)
.map_err(|_| DagError::InvalidSignature)?
.verify_secure(&to_intent_message(certificate_digest), &pks[..])
{
Err(err) => {
warn!(
Expand Down
7 changes: 4 additions & 3 deletions narwhal/primary/src/certificate_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,13 +447,14 @@ async fn process_certificates_helper(
// Use threads dedicated to computation heavy work.
spawn_blocking(move || {
let now = Instant::now();
for c in &certs {
sync.sanitize_certificate(c)?;
let mut sanitized_certs = Vec::new();
for c in certs {
sanitized_certs.push(sync.sanitize_certificate(c)?);
}
metrics
.certificate_fetcher_total_verification_us
.inc_by(now.elapsed().as_micros() as u64);
Ok::<Vec<Certificate>, DagError>(certs)
Ok::<Vec<Certificate>, DagError>(sanitized_certs)
})
})
.collect_vec();
Expand Down
Loading

0 comments on commit 6ab618a

Please sign in to comment.