Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 111 additions & 33 deletions crates/starfish/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
cmp::max,
collections::{BTreeMap, BTreeSet, VecDeque},
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -117,11 +118,19 @@ pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
/// For each peer, the set of authorities whose block headers the local node
/// considered useful when receiving a block bundle from that peer.
/// Keyed by the peer’s AuthorityIndex.
useful_authorities_from_peer: Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>,
useful_headers_authors_from_peer:
Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>,
/// For each peer, the set of local authorities that the peer reported as
/// useful to them (communicated inside their block bundles).
/// Keyed by the peer’s AuthorityIndex.
useful_authorities_to_peer: Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>,
useful_headers_authors_to_peer: Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>,
/// For each peer, stores the latest round in which a received block bundle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here also rename the header-related fields to be in line with the shard ones

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did renaming

/// from any peer included a useful shard originating from a block
/// created by authority `i`.
last_round_with_useful_shards_by_block_author: Arc<RwLock<Vec<Round>>>,
/// For each peer `i`, stores a set of authority indices representing the
/// authors of blocks whose shards were reported as useful by peer `i`.
useful_shards_authors_to_peer: Arc<RwLock<Vec<BTreeSet<AuthorityIndex>>>>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
Expand All @@ -141,6 +150,7 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
context.clone(),
core_dispatcher.clone(),
));
let committee_size = context.committee.size();

Self {
context,
Expand All @@ -154,9 +164,17 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
dag_state,
store,
received_block_headers: FilterForHeaders::new(),
useful_authorities_from_peer: Arc::new(RwLock::new(BTreeMap::new())),
useful_authorities_to_peer: Arc::new(RwLock::new(BTreeMap::new())),
useful_headers_authors_from_peer: Arc::new(RwLock::new(BTreeMap::new())),
useful_headers_authors_to_peer: Arc::new(RwLock::new(BTreeMap::new())),
transaction_message_sender,
last_round_with_useful_shards_by_block_author: Arc::new(RwLock::new(vec![
GENESIS_ROUND;
committee_size
])),
useful_shards_authors_to_peer: Arc::new(RwLock::new(vec![
BTreeSet::new();
committee_size
])),
}
}
}
Expand All @@ -176,11 +194,16 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
SerializedBlockBundleParts::try_from(serialized_block_bundle)?;

// Cache authorities this peer finds useful for cordial
let useful_authorities_to_peer = serialized_block_bundle_parts.useful_authorities();
let useful_authorities_to_peer = serialized_block_bundle_parts.useful_headers_authors();
{
let mut guard = self.useful_authorities_to_peer.write();
let mut guard = self.useful_headers_authors_to_peer.write();
guard.insert(peer, useful_authorities_to_peer);
}
let useful_shards_authors_to_peer = serialized_block_bundle_parts.useful_shards_authors();
{
let mut guard = self.useful_shards_authors_to_peer.write();
guard[peer] = useful_shards_authors_to_peer;
}

// 1. Create a verified block and make some preliminary checks
let SerializedHeaderAndTransactions {
Expand Down Expand Up @@ -569,6 +592,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.map_err(|_| ConsensusError::Shutdown)?;

// 11. Add the block to dag, add its missing ancestors to the set
let block_round = verified_block.round();
let (missing_block_ancestors, missing_block_committed_transactions) = self
.core_dispatcher
.add_blocks(vec![verified_block])
Expand Down Expand Up @@ -597,21 +621,40 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.await
.map_err(|_| ConsensusError::Shutdown)?;

// 13. update `useful_authorities_from_peer`
// 13. update `useful_headers_authors_from_peer`
// create a set of authority indexes from the `additional_block_headers`
// and `missing_ancestors`
let useful_authorities = additional_block_headers
.iter()
.map(|block_header| block_header.author())
.chain(missing_ancestors.iter().map(|block_ref| block_ref.author))
.collect::<BTreeSet<_>>();
{
let useful_headers_authors = additional_block_headers
.iter()
.map(|block_header| block_header.author())
.chain(missing_ancestors.iter().map(|block_ref| block_ref.author))
.collect::<BTreeSet<_>>();

let mut useful_authorities_from_peer_write_guard =
self.useful_authorities_from_peer.write();
useful_authorities_from_peer_write_guard.insert(peer, useful_authorities);
self.useful_headers_authors_from_peer.write();
useful_authorities_from_peer_write_guard.insert(peer, useful_headers_authors);
}
// 14. Update `last_round_with_useful_shards_by_block_author`:
// For each validator whose block is in `additional_block_headers`,
// set their entry to the maximum of its current value and the round of the
// received block.
{
let useful_shard_authors = additional_block_headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variable is called useful_shard_authors but we iterate over additional_block_headers? is this correct?

Also, add a comment describing what this block of code does, or update comment for step 13. above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, these headers went through filters and will now be added to dag_state without transaction data, so we request their shards.
Added comment

.iter()
.map(|block_header| block_header.author())
.collect::<Vec<_>>();
let mut last_round_with_useful_shards_from_peer_write_guard =
self.last_round_with_useful_shards_by_block_author.write();
for author in useful_shard_authors {
last_round_with_useful_shards_from_peer_write_guard[author] = max(
last_round_with_useful_shards_from_peer_write_guard[author],
block_round,
);
}
}

// 14. schedule the fetching of missing ancestors (if any) from this peer
// 15. schedule the fetching of missing ancestors (if any) from this peer
if !missing_ancestors.is_empty() {
if let Err(err) = self
.synchronizer
Expand All @@ -622,7 +665,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
}
}

// 15. schedule the fetching of missing committed transactions (if any)
// 16. schedule the fetching of missing committed transactions (if any)
if !missing_committed_txns.is_empty() {
if let Err(err) = self
.transactions_synchronizer
Expand Down Expand Up @@ -673,8 +716,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
// new blocks.
Ok(Box::pin(missed_blocks.chain({
let dag_state = Arc::clone(&self.dag_state);
let useful_authorities_from_peer = Arc::clone(&self.useful_authorities_from_peer);
let useful_authorities_to_peer = Arc::clone(&self.useful_authorities_to_peer);
let useful_headers_authors_from_peer =
Arc::clone(&self.useful_headers_authors_from_peer);
let useful_headers_authors_to_peer = Arc::clone(&self.useful_headers_authors_to_peer);
let last_round_with_useful_shards_by_block_author =
Arc::clone(&self.last_round_with_useful_shards_by_block_author);
let useful_shards_authors_to_peer = Arc::clone(&self.useful_shards_authors_to_peer);
let committee = self
.context
.committee
Expand All @@ -683,37 +730,60 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.collect::<BTreeSet<_>>();

broadcasted_blocks.filter_map(move |block| {
let useful_authorities_to_peer_guard = useful_authorities_to_peer.read();
let useful_authorities_to_peer_read = useful_authorities_to_peer_guard
let useful_headers_authors_to_peer_guard = useful_headers_authors_to_peer.read();
let useful_headers_authors_to_peer_read = useful_headers_authors_to_peer_guard
.get(&peer)
.map(|authorities| authorities.iter().cloned().collect::<BTreeSet<_>>());
drop(useful_authorities_to_peer_guard);
let useful_authorities_from_peer_guard = useful_authorities_from_peer.read();
let useful_authorities_from_peer_read =
match useful_authorities_from_peer_guard.get(&peer) {
drop(useful_headers_authors_to_peer_guard);

let useful_shards_authors_to_peer_guard = useful_shards_authors_to_peer.read();
let useful_shards_authors_to_peer_read =
useful_shards_authors_to_peer_guard[peer].clone();
drop(useful_shards_authors_to_peer_guard);

let useful_headers_authors_from_peer_guard =
useful_headers_authors_from_peer.read();
let useful_headers_authors_from_peer_read =
match useful_headers_authors_from_peer_guard.get(&peer) {
None => committee.clone(),
Some(useful_authorities) => useful_authorities.clone(),
};
drop(useful_authorities_from_peer_guard);
drop(useful_headers_authors_from_peer_guard);

let last_round_with_useful_shards_by_block_author_guard =
last_round_with_useful_shards_by_block_author.read();
let last_round_with_useful_shards_by_block_author_read =
last_round_with_useful_shards_by_block_author_guard.clone();
drop(last_round_with_useful_shards_by_block_author_guard);

let mut dag_state_guard = dag_state.write();
let block_headers = match useful_authorities_to_peer_read {
let block_headers = match useful_headers_authors_to_peer_read {
None => dag_state_guard.take_unknown_headers_for_authority(peer, block.round()),
Some(authorities) => dag_state_guard.take_useful_headers_for_authority(
peer,
block.round(),
authorities,
),
};
let serialized_shards =
dag_state_guard.take_unknown_shards_for_authority(peer, block.round());

let serialized_shards = dag_state_guard.take_useful_shards_for_authority(
peer,
block.round(),
useful_shards_authors_to_peer_read,
);
drop(dag_state_guard);

let useful_shards_authors = DagState::get_useful_shards_authors(
last_round_with_useful_shards_by_block_author_read,
block.round(),
);

let block_bundle = BlockBundle {
verified_block: block,
verified_headers: block_headers,
serialized_shards,
useful_authorities: useful_authorities_from_peer_read,
useful_headers_authors: useful_headers_authors_from_peer_read,
useful_shards_authors,
};
async move {
match SerializedBlockBundle::try_from(block_bundle) {
Expand Down Expand Up @@ -1686,7 +1756,8 @@ mod tests {
verified_block: input_block.clone(),
verified_headers: headers.clone(),
serialized_shards: vec![],
useful_authorities: (0u8..(committee_size as u8)).map(Into::into).collect(),
useful_headers_authors: (0u8..(committee_size as u8)).map(Into::into).collect(),
useful_shards_authors: (0u8..(committee_size as u8)).map(Into::into).collect(),
};
let serialized_block_bundle_with_big_round = SerializedBlockBundle::try_from(
SerializedBlockBundleParts::try_from(block_bundle_with_big_rounds).unwrap(),
Expand Down Expand Up @@ -1723,7 +1794,8 @@ mod tests {
verified_block: input_block.clone(),
verified_headers: headers.clone(),
serialized_shards: vec![],
useful_authorities: (0u8..(committee_size as u8)).map(Into::into).collect(),
useful_headers_authors: (0u8..(committee_size as u8)).map(Into::into).collect(),
useful_shards_authors: (0u8..(committee_size as u8)).map(Into::into).collect(),
};
let serialized_block_bundle = SerializedBlockBundle::try_from(
SerializedBlockBundleParts::try_from(block_bundle).unwrap(),
Expand Down Expand Up @@ -2218,7 +2290,10 @@ mod tests {
verified_block: block,
verified_headers: headers,
serialized_shards: vec![],
useful_authorities: (0u8..(context.committee.size() as u8))
useful_headers_authors: (0u8..(context.committee.size() as u8))
.map(Into::into)
.collect(),
useful_shards_authors: (0u8..(context.committee.size() as u8))
.map(Into::into)
.collect(),
};
Expand Down Expand Up @@ -2361,7 +2436,10 @@ mod tests {
verified_block: block,
verified_headers: vec![],
serialized_shards: vec![],
useful_authorities: (0u8..(context.committee.size() as u8))
useful_headers_authors: (0u8..(context.committee.size() as u8))
.map(Into::into)
.collect(),
useful_shards_authors: (0u8..(context.committee.size() as u8))
.map(Into::into)
.collect(),
};
Expand Down
2 changes: 1 addition & 1 deletion crates/starfish/core/src/block_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl BlockManager {
// for this accepted header we already have a block, so we add it to dag_state
self.dag_state
.write()
.add_transactions(block.verified_transactions, "block streaming");
.add_transactions(block.verified_transactions, "Block streaming");
}
}

Expand Down
82 changes: 61 additions & 21 deletions crates/starfish/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ use crate::{
threshold_clock::ThresholdClock,
};

/// If a shard from a block created by authority v1 is useful to authority v2 at
/// round r, then shards from v1 will be sent to v2 up to round r +
/// MAX_ROUND_GAP_FOR_USEFUL_SHARDS.
const MAX_ROUND_GAP_FOR_USEFUL_SHARDS: usize = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please describe what this constant controls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/// DagState provides the API to write and read accepted blocks from the DAG.
/// Only uncommitted and last committed blocks are cached in memory.
/// The rest of blocks are stored on disk.
Expand Down Expand Up @@ -1341,6 +1346,36 @@ impl DagState {
to_take
}

fn take_block_refs_of_useful_shards_for_authority(
&mut self,
authority_index: AuthorityIndex,
round_upper_bound_exclusive: Round,
useful_authorities: BTreeSet<AuthorityIndex>,
) -> Vec<BlockRef> {
let set = &mut self.shards_not_known_by_authority[authority_index.value()];

// Collect candidate block_refs we want to take out
let mut to_take = Vec::new();

for block_ref in set.iter() {
if to_take.len() >= self.context.parameters.max_shards_per_bundle
|| block_ref.round >= round_upper_bound_exclusive
{
break;
}
if useful_authorities.contains(&block_ref.author) {
to_take.push(*block_ref);
}
}

// Remove the references from the set and update cordial knowledge
for block_ref in &to_take {
set.remove(block_ref);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to make sure: if the blocks are not removed from the set here then they will be evicted at some later point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, they are removed in evict_cordial_knowledge function

}

to_take
}

/// Retrieves up to `MAX_HEADERS_PER_BUNDLE` previously unknown block
/// headers for the given authority, restricted to rounds strictly below
/// `round_upper_bound_exclusive`.
Expand Down Expand Up @@ -1389,34 +1424,39 @@ impl DagState {
.collect()
}

pub(crate) fn take_unknown_shards_for_authority(
pub(crate) fn take_useful_shards_for_authority(
&mut self,
authority_index: AuthorityIndex,
round_upper_bound_exclusive: Round,
useful_authorities: BTreeSet<AuthorityIndex>,
) -> Vec<Bytes> {
let mut set = mem::take(&mut self.shards_not_known_by_authority[authority_index.value()]);

let split_point = {
let round_bound = BlockRef::new(
round_upper_bound_exclusive,
AuthorityIndex::MIN,
BlockHeaderDigest::MIN,
);
let nth_element = set
.iter()
.nth(self.context.parameters.max_shards_per_bundle)
.map_or(round_bound, |x| *x);
min(nth_element, round_bound)
};
let block_refs = self.take_block_refs_of_useful_shards_for_authority(
authority_index,
round_upper_bound_exclusive,
useful_authorities,
);
block_refs
.iter()
.map(|block_ref| {
self.recent_shards
.get(block_ref)
.expect("Shard should be in DagState")
.clone()
})
.collect::<Vec<_>>()
}

self.shards_not_known_by_authority[authority_index.value()] = set.split_off(&split_point);
let mut shards: Vec<Bytes> = vec![];
for block_ref in set.into_iter() {
if let Some(shard) = self.recent_shards.get(&block_ref) {
shards.push(shard.clone());
pub(crate) fn get_useful_shards_authors(
last_useful_round: Vec<Round>,
block_round: Round,
) -> BTreeSet<AuthorityIndex> {
let mut useful_shard_authors = BTreeSet::new();
for (i, round) in last_useful_round.iter().enumerate() {
if block_round - round <= MAX_ROUND_GAP_FOR_USEFUL_SHARDS as u32 {
useful_shard_authors.insert(AuthorityIndex::from(i as u8));
}
}
shards
useful_shard_authors
}
pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
let mut votes = Vec::new();
Expand Down
Loading
Loading