Skip to content

Commit

Permalink
A few improvements to StakeAggregator (#7705)
Browse files Browse the repository at this point in the history
This PR adds a specialization for AuthSignInfo. This will allow the
stake aggregator reused much better in many other places. Specifically:
1. Added epoch check to make sure AuthSignInfo is from the same epoch as
committee
2. Check that authority is valid (by checking that stake is > 0)
3. By actively aggregating the signatures into a quorum signature and
return it.
  • Loading branch information
lxfind authored Jan 27, 2023
1 parent 3e407fa commit eaee14e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 29 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ impl AuthorityPerEpochStore {
write_batch.insert_batch(&self.tables.end_of_publish, [(authority, ())])?;
self.end_of_publish.try_lock()
.expect("No contention on Authority::end_of_publish as it is only accessed from consensus handler")
.insert(authority, ()).is_quorum_reached()
.insert_generic(authority, ()).is_quorum_reached()
} else {
// If we past the stage where we are accepting consensus certificates we also don't record end of publish messages
debug!("Ignoring end of publish message from validator {:?} as we already collected enough end of publish messages", authority.concise());
Expand Down
26 changes: 10 additions & 16 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,29 +895,23 @@ impl CheckpointSignatureAggregator {
);
return Err(());
}
match self.signatures.insert(author, signature) {
match self.signatures.insert(signature) {
InsertResult::RepeatingEntry { previous, new } => {
if previous != new {
warn!("Validator {:?} submitted two different signatures for checkpoint {}: {:?}, {:?}", author.concise(), self.summary.sequence_number, previous, new);
}
Err(())
}
InsertResult::QuorumReached(values) => {
let signatures = values.values().cloned().collect();
match AuthorityWeakQuorumSignInfo::new_from_auth_sign_infos(
signatures,
self.signatures.committee(),
) {
Ok(aggregated) => Ok(aggregated),
Err(err) => {
error!(
"Unexpected error when aggregating signatures for checkpoint {}: {:?}",
self.summary.sequence_number, err
);
Err(())
}
}
InsertResult::Failed { error } => {
warn!(
"Failed to aggregate new signature from validator {:?} for checkpoint {}: {:?}",
author.concise(),
self.summary.sequence_number,
error
);
Err(())
}
InsertResult::QuorumReached(cert) => Ok(cert),
InsertResult::NotEnoughVotes => Err(()),
}
}
Expand Down
73 changes: 61 additions & 12 deletions crates/sui-core/src/stake_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use sui_types::base_types::AuthorityName;
use sui_types::committee::{Committee, StakeUnit};
use sui_types::crypto::{AuthorityQuorumSignInfo, AuthoritySignInfo};
use sui_types::error::SuiError;

pub struct StakeAggregator<V, const STRENGTH: bool> {
data: HashMap<AuthorityName, V>,
stake: StakeUnit,
total_votes: StakeUnit,
committee: Committee,
}

impl<V: Clone, const STRENGTH: bool> StakeAggregator<V, STRENGTH> {
pub fn new(committee: Committee) -> Self {
Self {
data: Default::default(),
stake: Default::default(),
total_votes: Default::default(),
committee,
}
}
Expand All @@ -27,12 +29,15 @@ impl<V: Clone, const STRENGTH: bool> StakeAggregator<V, STRENGTH> {
) -> Self {
let mut this = Self::new(committee);
for (authority, v) in data {
this.insert(authority, v);
this.insert_generic(authority, v);
}
this
}

pub fn insert(&mut self, authority: AuthorityName, v: V) -> InsertResult<V> {
/// A generic version of inserting arbitrary type of V (e.g. void type).
/// If V is AuthoritySignInfo, the `insert` function should be used instead since it does extra
/// checks and aggregations in the end.
pub fn insert_generic(&mut self, authority: AuthorityName, v: V) -> InsertResult<V, ()> {
match self.data.entry(authority) {
Entry::Occupied(oc) => {
return InsertResult::RepeatingEntry {
Expand All @@ -44,11 +49,18 @@ impl<V: Clone, const STRENGTH: bool> StakeAggregator<V, STRENGTH> {
va.insert(v);
}
}
self.stake += self.committee.weight(&authority);
if self.stake >= self.committee.threshold::<STRENGTH>() {
InsertResult::QuorumReached(&self.data)
let votes = self.committee.weight(&authority);
if votes > 0 {
self.total_votes += votes;
if self.total_votes >= self.committee.threshold::<STRENGTH>() {
InsertResult::QuorumReached(())
} else {
InsertResult::NotEnoughVotes
}
} else {
InsertResult::NotEnoughVotes
InsertResult::Failed {
error: SuiError::InvalidAuthenticator,
}
}
}

Expand All @@ -61,14 +73,51 @@ impl<V: Clone, const STRENGTH: bool> StakeAggregator<V, STRENGTH> {
}
}

pub enum InsertResult<'a, V> {
QuorumReached(&'a HashMap<AuthorityName, V>),
impl<const STRENGTH: bool> StakeAggregator<AuthoritySignInfo, STRENGTH> {
/// Insert an authority signature. This is the primary way to use the aggregator and a few
/// dedicated checks are performed to make sure things work.
/// If quorum is reached, we return AuthorityQuorumSignInfo directly.
pub fn insert(
&mut self,
sig: AuthoritySignInfo,
) -> InsertResult<AuthoritySignInfo, AuthorityQuorumSignInfo<STRENGTH>> {
if self.committee.epoch != sig.epoch {
return InsertResult::Failed {
error: SuiError::WrongEpoch {
expected_epoch: self.committee.epoch,
actual_epoch: sig.epoch,
},
};
}
match self.insert_generic(sig.authority, sig) {
InsertResult::QuorumReached(_) => {
match AuthorityQuorumSignInfo::<STRENGTH>::new_from_auth_sign_infos(
self.data.values().cloned().collect(),
self.committee(),
) {
Ok(aggregated) => InsertResult::QuorumReached(aggregated),
Err(error) => InsertResult::Failed { error },
}
}
// The following is necessary to change the template type of InsertResult.
InsertResult::RepeatingEntry { previous, new } => {
InsertResult::RepeatingEntry { previous, new }
}
InsertResult::Failed { error } => InsertResult::Failed { error },
InsertResult::NotEnoughVotes => InsertResult::NotEnoughVotes,
}
}
}

pub enum InsertResult<V, CertT> {
QuorumReached(CertT),
RepeatingEntry { previous: V, new: V },
Failed { error: SuiError },
NotEnoughVotes,
}

impl<'a, V> InsertResult<'a, V> {
impl<V, CertT> InsertResult<V, CertT> {
pub fn is_quorum_reached(&self) -> bool {
matches!(self, InsertResult::QuorumReached(_))
matches!(self, Self::QuorumReached(..))
}
}

1 comment on commit eaee14e

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Owned Transactions Benchmark Results

Benchmark Report:
+-------------+-----+--------+---------------+---------------+---------------+
| duration(s) | tps | error% | latency (min) | latency (p50) | latency (p99) |
+============================================================================+
| 60          | 100 | 0      | 12            | 58            | 102           |
Stress Performance Report:
+-----------+-----+-----+
| metric    | p50 | p99 |
+=======================+
| cpu usage | 7   | 16  |

Shared Transactions Benchmark Results

Benchmark Report:
+-------------+-----+--------+---------------+---------------+---------------+
| duration(s) | tps | error% | latency (min) | latency (p50) | latency (p99) |
+============================================================================+
| 60          | 98  | 0      | 80            | 479           | 879           |
Stress Performance Report:
+-----------+-----+-----+
| metric    | p50 | p99 |
+=======================+
| cpu usage | 8   | 15  |

Narwhal Benchmark Results

 SUMMARY:
-----------------------------------------
 + CONFIG:
 Faults: 0 node(s)
 Committee size: 4 node(s)
 Worker(s) per node: 1 worker(s)
 Collocate primary and workers: True
 Input rate: 50,000 tx/s
 Transaction size: 512 B
 Execution time: 57 s

 Header number of batches threshold: 32 digests
 Header maximum number of batches: 1,000 digests
 Max header delay: 2,000 ms
 GC depth: 50 round(s)
 Sync retry delay: 10,000 ms
 Sync retry nodes: 3 node(s)
 batch size: 500,000 B
 Max batch delay: 200 ms
 Max concurrent requests: 500,000 

 + RESULTS:
 Batch creation avg latency: 77 ms
 Header creation avg latency: 1,758 ms
 	Batch to header avg latency: 955 ms
 Header to certificate avg latency: 6 ms
 	Request vote outbound avg latency: 3 ms
 Certificate commit avg latency: 3,444 ms

 Consensus TPS: 49,438 tx/s
 Consensus BPS: 25,312,047 B/s
 Consensus latency: 3,551 ms

 End-to-end TPS: 47,743 tx/s
 End-to-end BPS: 24,444,383 B/s
 End-to-end latency: 4,547 ms
-----------------------------------------

Please sign in to comment.