feat(starfish): send only useful shards #8801
base: develop
@@ -3,6 +3,7 @@ | |
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use std::{ | ||
cmp::max, | ||
collections::{BTreeMap, BTreeSet, VecDeque}, | ||
pin::Pin, | ||
sync::Arc, | ||
|
@@ -117,11 +118,19 @@ pub(crate) struct AuthorityService<C: CoreThreadDispatcher> { | |
/// For each peer, the set of authorities whose block headers the local node | ||
/// considered useful when receiving a block bundle from that peer. | ||
/// Keyed by the peer’s AuthorityIndex. | ||
useful_authorities_from_peer: Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>, | ||
useful_headers_authors_from_peer: | ||
Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>, | ||
/// For each peer, the set of local authorities that the peer reported as | ||
/// useful to them (communicated inside their block bundles). | ||
/// Keyed by the peer’s AuthorityIndex. | ||
useful_authorities_to_peer: Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>, | ||
useful_headers_authors_to_peer: Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>, | ||
/// For each authority `i`, stores the latest round in which a block bundle | ||
/// received from any peer included a useful shard originating from a block | ||
/// created by authority `i`. | ||
last_round_with_useful_shards_by_block_author: Arc<RwLock<Vec<Round>>>, | ||
/// For each peer `i`, stores a set of authority indices representing the | ||
/// authors of blocks whose shards were reported as useful by peer `i`. | ||
useful_shards_authors_to_peer: Arc<RwLock<Vec<BTreeSet<AuthorityIndex>>>>, | ||
} | ||
|
||
impl<C: CoreThreadDispatcher> AuthorityService<C> { | ||
|
@@ -141,6 +150,7 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> { | |
context.clone(), | ||
core_dispatcher.clone(), | ||
)); | ||
let committee_size = context.committee.size(); | ||
|
||
Self { | ||
context, | ||
|
@@ -154,9 +164,17 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> { | |
dag_state, | ||
store, | ||
received_block_headers: FilterForHeaders::new(), | ||
useful_authorities_from_peer: Arc::new(RwLock::new(BTreeMap::new())), | ||
useful_authorities_to_peer: Arc::new(RwLock::new(BTreeMap::new())), | ||
useful_headers_authors_from_peer: Arc::new(RwLock::new(BTreeMap::new())), | ||
useful_headers_authors_to_peer: Arc::new(RwLock::new(BTreeMap::new())), | ||
transaction_message_sender, | ||
last_round_with_useful_shards_by_block_author: Arc::new(RwLock::new(vec![ | ||
GENESIS_ROUND; | ||
committee_size | ||
])), | ||
useful_shards_authors_to_peer: Arc::new(RwLock::new(vec![ | ||
BTreeSet::new(); | ||
committee_size | ||
])), | ||
} | ||
} | ||
} | ||
|
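For orientation, here is a minimal standalone sketch of the bookkeeping these new fields and their initialization provide. The types are simplified stand-ins (plain `usize`/`u32` instead of the crate's `AuthorityIndex`/`Round`, and no `Arc<RwLock<…>>` wrappers), so this is an illustration of the shape of the state, not the crate's code:

```rust
use std::collections::{BTreeMap, BTreeSet};

type AuthorityIndex = usize; // stand-in for the crate's AuthorityIndex
type Round = u32;            // stand-in for the crate's Round
const GENESIS_ROUND: Round = 0;

/// Simplified view of the per-peer usefulness bookkeeping.
struct UsefulnessTracker {
    /// Authors of headers we found useful in bundles received from each peer.
    useful_headers_authors_from_peer: BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>,
    /// Authors the peer reported as useful to it (inside its bundles).
    useful_headers_authors_to_peer: BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>,
    /// For block author `i`, the latest round at which one of its shards was useful to us.
    last_round_with_useful_shards_by_block_author: Vec<Round>,
    /// For peer `i`, authors whose shards peer `i` reported as useful.
    useful_shards_authors_to_peer: Vec<BTreeSet<AuthorityIndex>>,
}

impl UsefulnessTracker {
    /// The two vectors are sized by committee size, mirroring the constructor above.
    fn new(committee_size: usize) -> Self {
        Self {
            useful_headers_authors_from_peer: BTreeMap::new(),
            useful_headers_authors_to_peer: BTreeMap::new(),
            last_round_with_useful_shards_by_block_author: vec![GENESIS_ROUND; committee_size],
            useful_shards_authors_to_peer: vec![BTreeSet::new(); committee_size],
        }
    }
}

fn main() {
    let tracker = UsefulnessTracker::new(4);
    assert_eq!(tracker.last_round_with_useful_shards_by_block_author.len(), 4);
    assert_eq!(tracker.useful_shards_authors_to_peer.len(), 4);
    assert!(tracker.useful_headers_authors_from_peer.is_empty());
    assert!(tracker.useful_headers_authors_to_peer.is_empty());
}
```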
@@ -176,11 +194,16 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> { | |
SerializedBlockBundleParts::try_from(serialized_block_bundle)?; | ||
|
||
// Cache authorities this peer finds useful for cordial | ||
let useful_authorities_to_peer = serialized_block_bundle_parts.useful_authorities(); | ||
let useful_authorities_to_peer = serialized_block_bundle_parts.useful_headers_authors(); | ||
{ | ||
let mut guard = self.useful_authorities_to_peer.write(); | ||
let mut guard = self.useful_headers_authors_to_peer.write(); | ||
guard.insert(peer, useful_authorities_to_peer); | ||
} | ||
let useful_shards_authors_to_peer = serialized_block_bundle_parts.useful_shards_authors(); | ||
{ | ||
let mut guard = self.useful_shards_authors_to_peer.write(); | ||
guard[peer] = useful_shards_authors_to_peer; | ||
} | ||
|
||
// 1. Create a verified block and make some preliminary checks | ||
let SerializedHeaderAndTransactions { | ||
|
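The caching at the top of this hunk just snapshots, per peer, what the peer's latest bundle reported. A hedged standalone sketch of that step, using std's `RwLock` and plain indices (the real service wraps the caches in `Arc` and its own lock types, so names and signatures here are illustrative):

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::sync::RwLock;

type AuthorityIndex = usize;

// Illustrative per-peer caches; not the crate's actual struct.
struct PeerCaches {
    useful_headers_authors_to_peer: RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>,
    useful_shards_authors_to_peer: RwLock<Vec<BTreeSet<AuthorityIndex>>>,
}

impl PeerCaches {
    /// Record what `peer` reported as useful inside its latest bundle.
    /// Each write is scoped so one lock is released before the next is taken.
    fn record_peer_report(
        &self,
        peer: AuthorityIndex,
        headers_authors: BTreeSet<AuthorityIndex>,
        shards_authors: BTreeSet<AuthorityIndex>,
    ) {
        {
            let mut guard = self.useful_headers_authors_to_peer.write().unwrap();
            guard.insert(peer, headers_authors);
        }
        {
            let mut guard = self.useful_shards_authors_to_peer.write().unwrap();
            guard[peer] = shards_authors;
        }
    }
}

fn main() {
    let caches = PeerCaches {
        useful_headers_authors_to_peer: RwLock::new(BTreeMap::new()),
        useful_shards_authors_to_peer: RwLock::new(vec![BTreeSet::new(); 4]),
    };
    caches.record_peer_report(2, BTreeSet::from([0, 1]), BTreeSet::from([3]));
    assert_eq!(
        caches.useful_shards_authors_to_peer.read().unwrap()[2],
        BTreeSet::from([3])
    );
}
```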
@@ -569,6 +592,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> { | |
.map_err(|_| ConsensusError::Shutdown)?; | ||
|
||
// 11. Add the block to dag, add its missing ancestors to the set | ||
let block_round = verified_block.round(); | ||
let (missing_block_ancestors, missing_block_committed_transactions) = self | ||
.core_dispatcher | ||
.add_blocks(vec![verified_block]) | ||
|
@@ -597,21 +621,40 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> { | |
.await | ||
.map_err(|_| ConsensusError::Shutdown)?; | ||
|
||
// 13. update `useful_authorities_from_peer` | ||
// 13. update `useful_headers_authors_from_peer` | ||
// create a set of authority indexes from the `additional_block_headers` | ||
// and `missing_ancestors` | ||
let useful_authorities = additional_block_headers | ||
.iter() | ||
.map(|block_header| block_header.author()) | ||
.chain(missing_ancestors.iter().map(|block_ref| block_ref.author)) | ||
.collect::<BTreeSet<_>>(); | ||
{ | ||
let useful_headers_authors = additional_block_headers | ||
.iter() | ||
.map(|block_header| block_header.author()) | ||
.chain(missing_ancestors.iter().map(|block_ref| block_ref.author)) | ||
.collect::<BTreeSet<_>>(); | ||
|
||
let mut useful_authorities_from_peer_write_guard = | ||
self.useful_authorities_from_peer.write(); | ||
useful_authorities_from_peer_write_guard.insert(peer, useful_authorities); | ||
self.useful_headers_authors_from_peer.write(); | ||
useful_authorities_from_peer_write_guard.insert(peer, useful_headers_authors); | ||
} | ||
// 14. Update `last_round_with_useful_shards_by_block_author`: | ||
// For each validator whose block is in `additional_block_headers`, | ||
// set their entry to the maximum of its current value and the round of the | ||
// received block. | ||
{ | ||
let useful_shard_authors = additional_block_headers | ||
Review comment: the variable is called …; also, add a comment describing what this block of code does, or update the comment for step 13 above.
Reply: Yes, these headers went through the filters and will now be added to dag_state without transaction data, so we request their shards.
||
.iter() | ||
.map(|block_header| block_header.author()) | ||
.collect::<Vec<_>>(); | ||
let mut last_round_with_useful_shards_from_peer_write_guard = | ||
self.last_round_with_useful_shards_by_block_author.write(); | ||
for author in useful_shard_authors { | ||
last_round_with_useful_shards_from_peer_write_guard[author] = max( | ||
last_round_with_useful_shards_from_peer_write_guard[author], | ||
block_round, | ||
); | ||
} | ||
} | ||
|
||
// 14. schedule the fetching of missing ancestors (if any) from this peer | ||
// 15. schedule the fetching of missing ancestors (if any) from this peer | ||
if !missing_ancestors.is_empty() { | ||
if let Err(err) = self | ||
.synchronizer | ||
|
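Step 14 in this hunk boils down to a per-author max update on the round vector; a tiny self-contained illustration of just that rule, with indices and rounds simplified to `usize`/`u32`:

```rust
type Round = u32;

/// For every author whose header arrived in this bundle, remember that its
/// shards were still useful to us at `block_round`.
fn bump_last_useful_round(last_round: &mut [Round], authors: &[usize], block_round: Round) {
    for &author in authors {
        last_round[author] = last_round[author].max(block_round);
    }
}

fn main() {
    let mut last_round = vec![0, 7, 3, 0];
    bump_last_useful_round(&mut last_round, &[1, 2], 5);
    // Author 1 keeps its later round 7; author 2 moves up to 5.
    assert_eq!(last_round, vec![0, 7, 5, 0]);
}
```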
@@ -622,7 +665,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> { | |
} | ||
} | ||
|
||
// 15. schedule the fetching of missing committed transactions (if any) | ||
// 16. schedule the fetching of missing committed transactions (if any) | ||
if !missing_committed_txns.is_empty() { | ||
if let Err(err) = self | ||
.transactions_synchronizer | ||
|
@@ -673,8 +716,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> { | |
// new blocks. | ||
Ok(Box::pin(missed_blocks.chain({ | ||
let dag_state = Arc::clone(&self.dag_state); | ||
let useful_authorities_from_peer = Arc::clone(&self.useful_authorities_from_peer); | ||
let useful_authorities_to_peer = Arc::clone(&self.useful_authorities_to_peer); | ||
let useful_headers_authors_from_peer = | ||
Arc::clone(&self.useful_headers_authors_from_peer); | ||
let useful_headers_authors_to_peer = Arc::clone(&self.useful_headers_authors_to_peer); | ||
let last_round_with_useful_shards_by_block_author = | ||
Arc::clone(&self.last_round_with_useful_shards_by_block_author); | ||
let useful_shards_authors_to_peer = Arc::clone(&self.useful_shards_authors_to_peer); | ||
let committee = self | ||
.context | ||
.committee | ||
|
@@ -683,37 +730,60 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> { | |
.collect::<BTreeSet<_>>(); | ||
|
||
broadcasted_blocks.filter_map(move |block| { | ||
let useful_authorities_to_peer_guard = useful_authorities_to_peer.read(); | ||
let useful_authorities_to_peer_read = useful_authorities_to_peer_guard | ||
let useful_headers_authors_to_peer_guard = useful_headers_authors_to_peer.read(); | ||
let useful_headers_authors_to_peer_read = useful_headers_authors_to_peer_guard | ||
.get(&peer) | ||
.map(|authorities| authorities.iter().cloned().collect::<BTreeSet<_>>()); | ||
drop(useful_authorities_to_peer_guard); | ||
let useful_authorities_from_peer_guard = useful_authorities_from_peer.read(); | ||
let useful_authorities_from_peer_read = | ||
match useful_authorities_from_peer_guard.get(&peer) { | ||
drop(useful_headers_authors_to_peer_guard); | ||
|
||
let useful_shards_authors_to_peer_guard = useful_shards_authors_to_peer.read(); | ||
let useful_shards_authors_to_peer_read = | ||
useful_shards_authors_to_peer_guard[peer].clone(); | ||
drop(useful_shards_authors_to_peer_guard); | ||
|
||
let useful_headers_authors_from_peer_guard = | ||
useful_headers_authors_from_peer.read(); | ||
let useful_headers_authors_from_peer_read = | ||
match useful_headers_authors_from_peer_guard.get(&peer) { | ||
None => committee.clone(), | ||
Some(useful_authorities) => useful_authorities.clone(), | ||
}; | ||
drop(useful_authorities_from_peer_guard); | ||
drop(useful_headers_authors_from_peer_guard); | ||
|
||
let last_round_with_useful_shards_by_block_author_guard = | ||
last_round_with_useful_shards_by_block_author.read(); | ||
let last_round_with_useful_shards_by_block_author_read = | ||
last_round_with_useful_shards_by_block_author_guard.clone(); | ||
drop(last_round_with_useful_shards_by_block_author_guard); | ||
|
||
let mut dag_state_guard = dag_state.write(); | ||
let block_headers = match useful_authorities_to_peer_read { | ||
let block_headers = match useful_headers_authors_to_peer_read { | ||
None => dag_state_guard.take_unknown_headers_for_authority(peer, block.round()), | ||
Some(authorities) => dag_state_guard.take_useful_headers_for_authority( | ||
peer, | ||
block.round(), | ||
authorities, | ||
), | ||
}; | ||
let serialized_shards = | ||
dag_state_guard.take_unknown_shards_for_authority(peer, block.round()); | ||
|
||
let serialized_shards = dag_state_guard.take_useful_shards_for_authority( | ||
peer, | ||
block.round(), | ||
useful_shards_authors_to_peer_read, | ||
); | ||
drop(dag_state_guard); | ||
|
||
let useful_shards_authors = DagState::get_useful_shards_authors( | ||
last_round_with_useful_shards_by_block_author_read, | ||
block.round(), | ||
); | ||
|
||
let block_bundle = BlockBundle { | ||
verified_block: block, | ||
verified_headers: block_headers, | ||
serialized_shards, | ||
useful_authorities: useful_authorities_from_peer_read, | ||
useful_headers_authors: useful_headers_authors_from_peer_read, | ||
useful_shards_authors, | ||
}; | ||
async move { | ||
match SerializedBlockBundle::try_from(block_bundle) { | ||
|
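One detail of the closure above: until a peer has reported which header authors it finds useful, the sender falls back to the full committee. A minimal sketch of that default, with illustrative names and types:

```rust
use std::collections::{BTreeMap, BTreeSet};

type AuthorityIndex = usize;

/// Until a peer reports which authors it finds useful, assume everything is useful.
fn headers_authors_for_peer(
    from_peer: &BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>,
    peer: AuthorityIndex,
    committee: &BTreeSet<AuthorityIndex>,
) -> BTreeSet<AuthorityIndex> {
    from_peer.get(&peer).cloned().unwrap_or_else(|| committee.clone())
}

fn main() {
    let committee: BTreeSet<AuthorityIndex> = (0..4).collect();
    let mut from_peer = BTreeMap::new();
    from_peer.insert(2, BTreeSet::from([0, 3]));

    // Peer 1 has not reported yet: default to the full committee.
    assert_eq!(headers_authors_for_peer(&from_peer, 1, &committee), committee);
    // Peer 2 only finds authors 0 and 3 useful.
    assert_eq!(
        headers_authors_for_peer(&from_peer, 2, &committee),
        BTreeSet::from([0, 3])
    );
}
```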
@@ -1686,7 +1756,8 @@ mod tests { | |
verified_block: input_block.clone(), | ||
verified_headers: headers.clone(), | ||
serialized_shards: vec![], | ||
useful_authorities: (0u8..(committee_size as u8)).map(Into::into).collect(), | ||
useful_headers_authors: (0u8..(committee_size as u8)).map(Into::into).collect(), | ||
useful_shards_authors: (0u8..(committee_size as u8)).map(Into::into).collect(), | ||
}; | ||
let serialized_block_bundle_with_big_round = SerializedBlockBundle::try_from( | ||
SerializedBlockBundleParts::try_from(block_bundle_with_big_rounds).unwrap(), | ||
|
@@ -1723,7 +1794,8 @@ mod tests { | |
verified_block: input_block.clone(), | ||
verified_headers: headers.clone(), | ||
serialized_shards: vec![], | ||
useful_authorities: (0u8..(committee_size as u8)).map(Into::into).collect(), | ||
useful_headers_authors: (0u8..(committee_size as u8)).map(Into::into).collect(), | ||
useful_shards_authors: (0u8..(committee_size as u8)).map(Into::into).collect(), | ||
}; | ||
let serialized_block_bundle = SerializedBlockBundle::try_from( | ||
SerializedBlockBundleParts::try_from(block_bundle).unwrap(), | ||
|
@@ -2218,7 +2290,10 @@ mod tests { | |
verified_block: block, | ||
verified_headers: headers, | ||
serialized_shards: vec![], | ||
useful_authorities: (0u8..(context.committee.size() as u8)) | ||
useful_headers_authors: (0u8..(context.committee.size() as u8)) | ||
.map(Into::into) | ||
.collect(), | ||
useful_shards_authors: (0u8..(context.committee.size() as u8)) | ||
.map(Into::into) | ||
.collect(), | ||
}; | ||
|
@@ -2361,7 +2436,10 @@ mod tests { | |
verified_block: block, | ||
verified_headers: vec![], | ||
serialized_shards: vec![], | ||
useful_authorities: (0u8..(context.committee.size() as u8)) | ||
useful_headers_authors: (0u8..(context.committee.size() as u8)) | ||
.map(Into::into) | ||
.collect(), | ||
useful_shards_authors: (0u8..(context.committee.size() as u8)) | ||
.map(Into::into) | ||
.collect(), | ||
}; | ||
|
@@ -33,6 +33,11 @@ use crate::{ | |
threshold_clock::ThresholdClock, | ||
}; | ||
|
||
/// If a shard from a block created by authority v1 is useful to authority v2 at | ||
/// round r, then shards from v1 will be sent to v2 up to round r + | ||
/// MAX_ROUND_GAP_FOR_USEFUL_SHARDS. | ||
const MAX_ROUND_GAP_FOR_USEFUL_SHARDS: usize = 5; | ||
Review comment: Please describe what this constant controls.
Reply: done
||
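In other words, the constant defines a sliding window: an author whose shard was last useful at round r stays in the "send its shards" set for blocks up to round r + 5. A tiny illustrative check, not the crate's code (it uses `saturating_sub` to stay total on unsigned rounds, which is an assumption on my part):

```rust
const MAX_ROUND_GAP_FOR_USEFUL_SHARDS: u32 = 5;

/// Is an author's shard still worth sending alongside a block at `block_round`,
/// given the last round at which one of its shards was useful?
fn still_useful(last_useful_round: u32, block_round: u32) -> bool {
    block_round.saturating_sub(last_useful_round) <= MAX_ROUND_GAP_FOR_USEFUL_SHARDS
}

fn main() {
    assert!(still_useful(10, 15));  // gap of 5 rounds: shards still sent
    assert!(!still_useful(10, 16)); // gap of 6 rounds: the author drops out of the set
}
```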
|
||
/// DagState provides the API to write and read accepted blocks from the DAG. | ||
/// Only uncommitted and last committed blocks are cached in memory. | ||
/// The rest of blocks are stored on disk. | ||
|
@@ -1341,6 +1346,36 @@ impl DagState { | |
to_take | ||
} | ||
|
||
fn take_block_refs_of_useful_shards_for_authority( | ||
&mut self, | ||
authority_index: AuthorityIndex, | ||
round_upper_bound_exclusive: Round, | ||
useful_authorities: BTreeSet<AuthorityIndex>, | ||
) -> Vec<BlockRef> { | ||
let set = &mut self.shards_not_known_by_authority[authority_index.value()]; | ||
|
||
// Collect candidate block_refs we want to take out | ||
let mut to_take = Vec::new(); | ||
|
||
for block_ref in set.iter() { | ||
if to_take.len() >= self.context.parameters.max_shards_per_bundle | ||
|| block_ref.round >= round_upper_bound_exclusive | ||
{ | ||
break; | ||
} | ||
if useful_authorities.contains(&block_ref.author) { | ||
to_take.push(*block_ref); | ||
} | ||
} | ||
|
||
// Remove the references from the set and update cordial knowledge | ||
for block_ref in &to_take { | ||
set.remove(block_ref); | ||
Review comment: just to make sure: if the blocks are not removed from the …
Reply: yes, they are removed in the evict_cordial_knowledge function.
||
} | ||
|
||
to_take | ||
} | ||
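A simplified standalone version of the selection loop above, with `BlockRef` reduced to a (round, author) pair. It relies on the same property as the real code: the pending set is ordered by round first, so the loop can stop at the first reference at or beyond the bound. Field and parameter names here are illustrative:

```rust
use std::collections::BTreeSet;

type Round = u32;
type AuthorityIndex = usize;

/// Minimal stand-in for a block reference, ordered by round first.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct BlockRef {
    round: Round,
    author: AuthorityIndex,
}

/// Take up to `max_shards` refs below `round_bound` whose author is in `useful_authors`,
/// removing them from `pending` so they are not offered to this peer again.
fn take_useful_refs(
    pending: &mut BTreeSet<BlockRef>,
    round_bound: Round,
    useful_authors: &BTreeSet<AuthorityIndex>,
    max_shards: usize,
) -> Vec<BlockRef> {
    let mut taken = Vec::new();
    for r in pending.iter() {
        if taken.len() >= max_shards || r.round >= round_bound {
            break;
        }
        if useful_authors.contains(&r.author) {
            taken.push(*r);
        }
    }
    for r in &taken {
        pending.remove(r);
    }
    taken
}

fn main() {
    let mut pending = BTreeSet::from([
        BlockRef { round: 1, author: 0 },
        BlockRef { round: 2, author: 1 },
        BlockRef { round: 3, author: 0 },
    ]);
    let useful = BTreeSet::from([0]);
    let taken = take_useful_refs(&mut pending, 3, &useful, 10);
    // Only author 0's ref below round 3 is taken; the round-3 ref and author 1's ref stay.
    assert_eq!(taken, vec![BlockRef { round: 1, author: 0 }]);
    assert_eq!(pending.len(), 2);
}
```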
|
||
/// Retrieves up to `MAX_HEADERS_PER_BUNDLE` previously unknown block | ||
/// headers for the given authority, restricted to rounds strictly below | ||
/// `round_upper_bound_exclusive`. | ||
|
@@ -1389,34 +1424,39 @@ impl DagState { | |
.collect() | ||
} | ||
|
||
pub(crate) fn take_unknown_shards_for_authority( | ||
pub(crate) fn take_useful_shards_for_authority( | ||
&mut self, | ||
authority_index: AuthorityIndex, | ||
round_upper_bound_exclusive: Round, | ||
useful_authorities: BTreeSet<AuthorityIndex>, | ||
) -> Vec<Bytes> { | ||
let mut set = mem::take(&mut self.shards_not_known_by_authority[authority_index.value()]); | ||
|
||
let split_point = { | ||
let round_bound = BlockRef::new( | ||
round_upper_bound_exclusive, | ||
AuthorityIndex::MIN, | ||
BlockHeaderDigest::MIN, | ||
); | ||
let nth_element = set | ||
.iter() | ||
.nth(self.context.parameters.max_shards_per_bundle) | ||
.map_or(round_bound, |x| *x); | ||
min(nth_element, round_bound) | ||
}; | ||
let block_refs = self.take_block_refs_of_useful_shards_for_authority( | ||
authority_index, | ||
round_upper_bound_exclusive, | ||
useful_authorities, | ||
); | ||
block_refs | ||
.iter() | ||
.map(|block_ref| { | ||
self.recent_shards | ||
.get(block_ref) | ||
.expect("Shard should be in DagState") | ||
.clone() | ||
}) | ||
.collect::<Vec<_>>() | ||
} | ||
|
||
self.shards_not_known_by_authority[authority_index.value()] = set.split_off(&split_point); | ||
let mut shards: Vec<Bytes> = vec![]; | ||
for block_ref in set.into_iter() { | ||
if let Some(shard) = self.recent_shards.get(&block_ref) { | ||
shards.push(shard.clone()); | ||
pub(crate) fn get_useful_shards_authors( | ||
last_useful_round: Vec<Round>, | ||
block_round: Round, | ||
) -> BTreeSet<AuthorityIndex> { | ||
let mut useful_shard_authors = BTreeSet::new(); | ||
for (i, round) in last_useful_round.iter().enumerate() { | ||
if block_round - round <= MAX_ROUND_GAP_FOR_USEFUL_SHARDS as u32 { | ||
useful_shard_authors.insert(AuthorityIndex::from(i as u8)); | ||
} | ||
} | ||
shards | ||
useful_shard_authors | ||
} | ||
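`get_useful_shards_authors` then simply turns the per-author last-useful rounds into the set of authors still inside that window. An illustrative standalone version with a worked case (again using `saturating_sub` as an assumed guard against underflow; the crate's own types and signature differ):

```rust
use std::collections::BTreeSet;

const MAX_ROUND_GAP_FOR_USEFUL_SHARDS: u32 = 5;

/// Authors whose last useful round is within the gap of `block_round`.
fn useful_shards_authors(last_useful_round: &[u32], block_round: u32) -> BTreeSet<usize> {
    last_useful_round
        .iter()
        .enumerate()
        .filter(|(_, &last)| block_round.saturating_sub(last) <= MAX_ROUND_GAP_FOR_USEFUL_SHARDS)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    // Four authors were last useful at rounds 10, 2, 8 and 0 respectively.
    let last = [10u32, 2, 8, 0];
    // For a block at round 12, only authors within 5 rounds (authors 0 and 2) qualify.
    assert_eq!(useful_shards_authors(&last, 12), BTreeSet::from([0, 2]));
}
```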
pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> { | ||
let mut votes = Vec::new(); | ||
|
Review comment: here also rename the header-related fields to be in line with the shard ones.
Reply: did renaming.