Skip to content

Commit

Permalink
feat: remove the id from serialized forms of the Header
Browse files Browse the repository at this point in the history
  • Loading branch information
huitseeker committed Nov 1, 2022
1 parent 9ad4d36 commit 9466260
Show file tree
Hide file tree
Showing 17 changed files with 125 additions and 104 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions narwhal/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ eyre = "0.6.8"

anemo.workspace = true
reqwest = { version = "0.11.12", features = ["json"] }
once_cell = "1.16.0"

[dev-dependencies]
hex = "0.4.3"
Expand Down
26 changes: 11 additions & 15 deletions narwhal/node/src/generate_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use config::{Authority, Committee, Epoch, WorkerIndex, WorkerInfo};
use crypto::{KeyPair, NetworkKeyPair};
use fastcrypto::{
hash::{Digest, Hash},
hash::Hash,
traits::{KeyPair as _, Signer},
};
use multiaddr::Multiaddr;
Expand All @@ -12,7 +12,7 @@ use serde_reflection::{Registry, Result, Samples, Tracer, TracerConfig};
use std::{fs::File, io::Write};
use structopt::{clap::arg_enum, StructOpt};
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, Metadata,
Batch, BatchDigest, Certificate, CertificateDigest, HeaderBuilder, HeaderDigest, Metadata,
ReconfigureNotification, WorkerOthersBatchMessage, WorkerOurBatchMessage,
WorkerReconfigureMessage, WorkerSynchronizeMessage,
};
Expand Down Expand Up @@ -71,20 +71,16 @@ fn get_registry() -> Result<Registry> {
let certificates: Vec<Certificate> = Certificate::genesis(&committee);

// The values have to be "complete" in a data-centric sense, but not "correct" cryptographically.
let mut header = Header {
author: kp.public().clone(),
round: 1,
payload: (0..4u32).map(|wid| (BatchDigest([0u8; 32]), wid)).collect(),
parents: certificates.iter().map(|x| x.digest()).collect(),
..Header::default()
};
let header_builder = HeaderBuilder::default();
let header = header_builder
.author(kp.public().clone())
.epoch(0)
.round(1)
.payload((0..4u32).map(|wid| (BatchDigest([0u8; 32]), wid)).collect())
.parents(certificates.iter().map(|x| x.digest()).collect())
.build(&kp)
.unwrap();

let header_digest = header.digest();
header = Header {
id: header_digest,
signature: kp.sign(Digest::from(header_digest).as_ref()),
..header
};
let worker_pk = network_keys[0].public().clone();
let certificate = Certificate::new_unsigned(&committee, header.clone(), vec![]).unwrap();
let signature = keys[0].sign(certificate.digest().as_ref());
Expand Down
3 changes: 1 addition & 2 deletions narwhal/node/tests/staged/narwhal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ Header:
- parents:
SEQ:
TYPENAME: CertificateDigest
- id:
TYPENAME: HeaderDigest
- signature:
TYPENAME: BLS12381Signature
- metadata:
Expand Down Expand Up @@ -117,3 +115,4 @@ WorkerSynchronizeMessage:
SEQ:
TYPENAME: BatchDigest
- target: STR

2 changes: 1 addition & 1 deletion narwhal/primary/src/block_remover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl BlockRemover {
certificates: Vec<Certificate>,
batches_by_worker: HashMap<WorkerId, Vec<BatchDigest>>,
) -> Result<(), Either<TypedStoreError, ValidatorDagError>> {
let header_ids: Vec<HeaderDigest> = certificates.iter().map(|c| c.header.id).collect();
let header_ids: Vec<HeaderDigest> = certificates.iter().map(|c| c.header.id()).collect();

self.header_store
.remove_all(header_ids)
Expand Down
2 changes: 1 addition & 1 deletion narwhal/primary/src/block_synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ impl BlockSynchronizer {
}
}

#[instrument(level = "trace", skip_all, fields(request_id, certificate=?certificate.header.id))]
#[instrument(level = "trace", skip_all, fields(request_id, certificate=?certificate.header.id()))]
async fn wait_for_block_payload<'a>(
payload_synchronize_timeout: Duration,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
Expand Down
20 changes: 10 additions & 10 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl Core {
.processing
.entry(header.round)
.or_insert_with(HashSet::new)
.insert(header.id);
.insert(header.id());

if inserted {
// Only increase the metric when the header has been seen for the first
Expand Down Expand Up @@ -270,7 +270,7 @@ impl Core {
.headers_suspended
.with_label_values(&[&header.epoch.to_string(), "missing_parents"])
.inc();
debug!("Processing of {} suspended: missing parent(s)", header.id);
debug!("Processing of {} suspended: missing parent(s)", header.id());
return Ok(());
}

Expand All @@ -279,13 +279,13 @@ impl Core {
for x in parents {
ensure!(
x.round() + 1 == header.round,
DagError::MalformedHeader(header.id)
DagError::MalformedHeader(header.id())
);
stake += self.committee.stake(&x.origin());
}
ensure!(
stake >= self.committee.quorum_threshold(),
DagError::HeaderRequiresQuorum(header.id)
DagError::HeaderRequiresQuorum(header.id())
);

// Ensure we have the payload. If we don't, the synchronizer will ask our workers to get it, and then
Expand All @@ -301,7 +301,7 @@ impl Core {

// Store the header.
self.header_store
.async_write(header.id, header.clone())
.async_write(header.id(), header.clone())
.await;

self.metrics
Expand Down Expand Up @@ -500,7 +500,7 @@ impl Core {
if !self
.processing
.get(&certificate.header.round)
.map_or_else(|| false, |x| x.contains(&certificate.header.id))
.map_or_else(|| false, |x| x.contains(&certificate.header.id()))
{
// This function may still throw an error if the storage fails.
self.process_header_internal(&certificate.header, /* signed */ true)
Expand Down Expand Up @@ -540,7 +540,7 @@ impl Core {
.await?;

// Send it to the consensus layer.
let id = certificate.header.id;
let id = certificate.header.id();
if let Err(e) = self.tx_new_certificates.send(certificate).await {
warn!(
"Failed to deliver certificate {} to the consensus: {}",
Expand Down Expand Up @@ -596,14 +596,14 @@ impl Core {
// in this node's DAG.
ensure!(
self.gc_round < header.round,
DagError::TooOld(header.id.into(), header.round, self.gc_round)
DagError::TooOld(header.id().into(), header.round, self.gc_round)
);
// TODO: enable below.
// The header round is too high for this node, which is unlikely to acquire all
// parent certificates in time.
// ensure!(
// self.highest_processed_round + MAX_HEADER_ROUND_CATCHUP_THRESHOLD > header.round,
// DagError::TooNew(header.id.into(), header.round, self.gc_round)
// DagError::TooNew(header.id().into(), header.round, self.gc_round)
// );

// Verify the header's signature.
Expand Down Expand Up @@ -632,7 +632,7 @@ impl Core {

// Ensure we receive a vote on the expected header.
ensure!(
vote.id == self.current_header.id
vote.id == self.current_header.id()
&& vote.origin == self.current_header.author
&& vote.round == self.current_header.round,
DagError::UnexpectedVote(vote.id)
Expand Down
6 changes: 3 additions & 3 deletions narwhal/primary/src/header_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl HeaderWaiter {
match message {
WaiterMessage::SyncBatches(missing, header) => {
debug!("Synching the payload of {header}");
let header_id = header.id;
let header_id = header.id();
let round = header.round;
let author = header.author.clone();

Expand Down Expand Up @@ -246,7 +246,7 @@ impl HeaderWaiter {

WaiterMessage::SyncParents(missing, header) => {
debug!("Synching the parents of {header}");
let header_id = header.id;
let header_id = header.id();
let round = header.round;
let author = header.author.clone();

Expand Down Expand Up @@ -307,7 +307,7 @@ impl HeaderWaiter {
Ok(header) => header,
};
if let Some(header) = header {
if let Some((_, tx_cancel)) = self.pending.remove(&header.id) {
if let Some((_, tx_cancel)) = self.pending.remove(&header.id()) {
let _ = tx_cancel.send(());
}
for x in header.payload.keys() {
Expand Down
8 changes: 4 additions & 4 deletions narwhal/primary/src/tests/block_remover_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ async fn test_successful_blocks_delete() {

// write the header
header_store
.async_write(header.clone().id, header.clone())
.async_write(header.clone().id(), header.clone())
.await;

header_ids.push(header.clone().id);
header_ids.push(header.clone().id());

// write the batches to payload store
payload_store
Expand Down Expand Up @@ -239,10 +239,10 @@ async fn test_failed_blocks_delete() {

// write the header
header_store
.async_write(header.clone().id, header.clone())
.async_write(header.clone().id(), header.clone())
.await;

header_ids.push(header.clone().id);
header_ids.push(header.clone().id());

// write the batches to payload store
payload_store
Expand Down
37 changes: 30 additions & 7 deletions narwhal/primary/src/tests/certificate_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use crate::{
};
use anemo::async_trait;
use anyhow::Result;
use config::Committee;
use crypto::PublicKey;
use config::{Committee, Epoch, WorkerId};
use crypto::{PublicKey, Signature};
use fastcrypto::{hash::Hash, traits::KeyPair, SignatureService};
use indexmap::IndexMap;
use itertools::Itertools;
use network::P2pNetwork;
use node::NodeStorage;
use once_cell::sync::OnceCell;
use prometheus::Registry;
use std::{
collections::{BTreeSet, HashMap},
Expand All @@ -28,10 +30,10 @@ use tokio::{
time::sleep,
};
use types::{
Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse,
PayloadAvailabilityRequest, PayloadAvailabilityResponse, PrimaryMessage, PrimaryToPrimary,
PrimaryToPrimaryServer, ReconfigureNotification, Round,
BatchDigest, Certificate, CertificateDigest, ConsensusStore, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, Metadata, PayloadAvailabilityRequest, PayloadAvailabilityResponse,
PrimaryMessage, PrimaryToPrimary, PrimaryToPrimaryServer, ReconfigureNotification, Round,
};

struct FetchCertificateProxy {
Expand Down Expand Up @@ -141,6 +143,20 @@ fn verify_certificates_not_in_store(
.is_none());
}

// Unsed below to construct malformed Headers
// Note: this should always mimic the Header struct, only changing the visibility of the id field to public
#[allow(dead_code)]
struct BadHeader {
pub author: PublicKey,
pub round: Round,
pub epoch: Epoch,
pub payload: IndexMap<BatchDigest, WorkerId>,
pub parents: BTreeSet<CertificateDigest>,
pub id: OnceCell<HeaderDigest>,
pub signature: Signature,
pub metadata: Metadata,
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn fetch_certificates_basic() {
let fixture = CommitteeFixture::builder().randomize_ports(true).build();
Expand Down Expand Up @@ -427,7 +443,14 @@ async fn fetch_certificates_basic() {
certs.push(cert);
// Add cert with incorrect digest.
let mut cert = certificates[num_written].clone();
cert.header.id = Default::default();
// This is a bit tedious to craft
let cert_header = unsafe { std::mem::transmute::<Header, BadHeader>(cert.header) };
let wrong_header = BadHeader {
id: OnceCell::with_value(HeaderDigest::default()),
..cert_header
};
let wolf_header = unsafe { std::mem::transmute::<BadHeader, Header>(wrong_header) };
cert.header = wolf_header;
certs.push(cert);
// Add cert without all parents in storage.
certs.push(certificates[num_written + 1].clone());
Expand Down
8 changes: 4 additions & 4 deletions narwhal/primary/src/tests/core_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async fn process_header() {
}

// Ensure the header is correctly stored.
let stored = header_store.read(header.id).await.unwrap();
let stored = header_store.read(header.id()).await.unwrap();
assert_eq!(stored, Some(header.clone()));

let mut m = HashMap::new();
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn process_header_missing_parent() {
.build(primary.keypair())
.unwrap();

let id = header.id;
let id = header.id();
tx_primary_messages
.send(PrimaryMessage::Header(header))
.await
Expand Down Expand Up @@ -311,7 +311,7 @@ async fn process_header_missing_payload() {
.build(author.keypair())
.unwrap();

let id = header.id;
let id = header.id();
tx_primary_messages
.send(PrimaryMessage::Header(header))
.await
Expand Down Expand Up @@ -1204,6 +1204,6 @@ async fn reconfigure_core() {
}

// Ensure the header is correctly stored.
let stored = header_store.read(header.id).await.unwrap();
let stored = header_store.read(header.id()).await.unwrap();
assert_eq!(stored, Some(header));
}
10 changes: 5 additions & 5 deletions narwhal/primary/tests/integration_tests_validator_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ async fn test_get_collections() {
// Write the header
store
.header_store
.async_write(header.clone().id, header.clone())
.async_write(header.clone().id(), header.clone())
.await;

header_ids.push(header.clone().id);
header_ids.push(header.clone().id());

// Write the batches to payload store
store
Expand Down Expand Up @@ -277,10 +277,10 @@ async fn test_remove_collections() {
// Write the header
store
.header_store
.async_write(header.clone().id, header.clone())
.async_write(header.clone().id(), header.clone())
.await;

header_ids.push(header.clone().id);
header_ids.push(header.clone().id());

// Write the batches to payload store
store
Expand Down Expand Up @@ -1111,7 +1111,7 @@ async fn fixture_certificate(

// Write the header
header_store
.async_write(header.clone().id, header.clone())
.async_write(header.clone().id(), header.clone())
.await;

// Write the batches to payload store
Expand Down
1 change: 1 addition & 0 deletions narwhal/test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ telemetry-subscribers.workspace = true

anemo.workspace = true
tower = { version = "0.4.13", features = ["full"] }
once_cell = "1.16.0"
Loading

0 comments on commit 9466260

Please sign in to comment.