Skip to content

Commit 68da5d0

Browse files
committed
track useful shards
1 parent 86b04d6 commit 68da5d0

File tree

3 files changed

+179
-22
lines changed

3 files changed

+179
-22
lines changed

crates/starfish/core/src/authority_service.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// SPDX-License-Identifier: Apache-2.0
44

55
use std::{
6+
cmp::max,
67
collections::{BTreeMap, BTreeSet, VecDeque},
78
pin::Pin,
89
sync::Arc,
@@ -122,6 +123,13 @@ pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
122123
/// useful to them (communicated inside their block bundles).
123124
/// Keyed by the peer’s AuthorityIndex.
124125
useful_authorities_to_peer: Arc<RwLock<BTreeMap<AuthorityIndex, BTreeSet<AuthorityIndex>>>>,
126+
/// For each peer `i`, stores a vector where each entry at index `j`
127+
/// represents the last round in which a received bundle from peer `i`
128+
/// contained a useful shard from a block created by authority `j`.
129+
last_round_with_useful_shards_from_peer: Arc<RwLock<Vec<Vec<Round>>>>,
130+
/// For each peer `i`, stores a set of authority indices representing the
131+
/// authors of blocks whose shards were reported as useful by peer `i`.
132+
peers_reporting_useful_shards: Arc<RwLock<Vec<BTreeSet<AuthorityIndex>>>>,
125133
}
126134

127135
impl<C: CoreThreadDispatcher> AuthorityService<C> {
@@ -141,6 +149,7 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
141149
context.clone(),
142150
core_dispatcher.clone(),
143151
));
152+
let committee_size = context.committee.size();
144153

145154
Self {
146155
context,
@@ -157,6 +166,17 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
157166
useful_authorities_from_peer: Arc::new(RwLock::new(BTreeMap::new())),
158167
useful_authorities_to_peer: Arc::new(RwLock::new(BTreeMap::new())),
159168
transaction_message_sender,
169+
last_round_with_useful_shards_from_peer: Arc::new(RwLock::new(vec![
170+
vec![
171+
GENESIS_ROUND;
172+
committee_size
173+
];
174+
committee_size
175+
])),
176+
peers_reporting_useful_shards: Arc::new(RwLock::new(vec![
177+
BTreeSet::new();
178+
committee_size
179+
])),
160180
}
161181
}
162182
}
@@ -181,6 +201,11 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
181201
let mut guard = self.useful_authorities_to_peer.write();
182202
guard.insert(peer, useful_authorities_to_peer);
183203
}
204+
let useful_shards_authors_to_peer = serialized_block_bundle_parts.useful_shards_authors();
205+
{
206+
let mut guard = self.peers_reporting_useful_shards.write();
207+
guard[peer] = useful_shards_authors_to_peer;
208+
}
184209

185210
// 1. Create a verified block and make some preliminary checks
186211
let SerializedHeaderAndTransactions {
@@ -556,6 +581,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
556581
.map_err(|_| ConsensusError::Shutdown)?;
557582

558583
// 11. Add the block to dag, add its missing ancestors to the set
584+
let block_round = verified_block.round();
559585
let (missing_block_ancestors, missing_block_committed_transactions) = self
560586
.core_dispatcher
561587
.add_blocks(vec![verified_block])
@@ -597,6 +623,20 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
597623
self.useful_authorities_from_peer.write();
598624
useful_authorities_from_peer_write_guard.insert(peer, useful_authorities);
599625
}
626+
{
627+
let useful_shard_authors = additional_block_headers
628+
.iter()
629+
.map(|block_header| block_header.author())
630+
.collect::<Vec<_>>();
631+
let mut last_round_with_useful_shards_from_peer_write_guard =
632+
self.last_round_with_useful_shards_from_peer.write();
633+
for author in useful_shard_authors {
634+
last_round_with_useful_shards_from_peer_write_guard[peer][author] = max(
635+
last_round_with_useful_shards_from_peer_write_guard[peer][author],
636+
block_round,
637+
);
638+
}
639+
}
600640

601641
// 14. schedule the fetching of missing ancestors (if any) from this peer
602642
if !missing_ancestors.is_empty() {
@@ -662,6 +702,9 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
662702
let dag_state = Arc::clone(&self.dag_state);
663703
let useful_authorities_from_peer = Arc::clone(&self.useful_authorities_from_peer);
664704
let useful_authorities_to_peer = Arc::clone(&self.useful_authorities_to_peer);
705+
let last_round_with_useful_shards_from_peer =
706+
Arc::clone(&self.last_round_with_useful_shards_from_peer);
707+
let peers_reporting_useful_shards = Arc::clone(&self.peers_reporting_useful_shards);
665708
let committee = self
666709
.context
667710
.committee
@@ -675,6 +718,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
675718
.get(&peer)
676719
.map(|authorities| authorities.iter().cloned().collect::<BTreeSet<_>>());
677720
drop(useful_authorities_to_peer_guard);
721+
722+
let peers_reporting_useful_shards_guard = peers_reporting_useful_shards.read();
723+
let peers_reporting_useful_shards_read =
724+
peers_reporting_useful_shards_guard[peer].clone();
725+
drop(peers_reporting_useful_shards_guard);
726+
678727
let useful_authorities_from_peer_guard = useful_authorities_from_peer.read();
679728
let useful_authorities_from_peer_read =
680729
match useful_authorities_from_peer_guard.get(&peer) {
@@ -683,6 +732,12 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
683732
};
684733
drop(useful_authorities_from_peer_guard);
685734

735+
let last_round_with_useful_shards_from_peer_guard =
736+
last_round_with_useful_shards_from_peer.read();
737+
let last_round_with_useful_shards_from_peer_read =
738+
last_round_with_useful_shards_from_peer_guard[peer].clone();
739+
drop(last_round_with_useful_shards_from_peer_guard);
740+
686741
let mut dag_state_guard = dag_state.write();
687742
let block_headers = match useful_authorities_to_peer_read {
688743
None => dag_state_guard.take_unknown_headers_for_authority(peer, block.round()),
@@ -692,15 +747,25 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
692747
authorities,
693748
),
694749
};
695-
let serialized_shards =
696-
dag_state_guard.take_unknown_shards_for_authority(peer, block.round());
750+
751+
let serialized_shards = dag_state_guard.take_useful_shards_for_authority(
752+
peer,
753+
block.round(),
754+
peers_reporting_useful_shards_read,
755+
);
697756
drop(dag_state_guard);
698757

758+
let useful_shards_authors = DagState::get_useful_shards_authors(
759+
last_round_with_useful_shards_from_peer_read,
760+
block.round(),
761+
);
762+
699763
let block_bundle = BlockBundle {
700764
verified_block: block,
701765
verified_headers: block_headers,
702766
serialized_shards,
703767
useful_authorities: useful_authorities_from_peer_read,
768+
useful_shards_authors,
704769
};
705770
async move {
706771
match SerializedBlockBundle::try_from(block_bundle) {
@@ -1672,6 +1737,7 @@ mod tests {
16721737
verified_headers: headers.clone(),
16731738
serialized_shards: vec![],
16741739
useful_authorities: (0u8..(committee_size as u8)).map(Into::into).collect(),
1740+
useful_shards_authors: (0u8..(committee_size as u8)).map(Into::into).collect(),
16751741
};
16761742
let serialized_big_block_bundle = SerializedBlockBundle::try_from(
16771743
SerializedBlockBundleParts::try_from(big_block_bundle).unwrap(),
@@ -1701,6 +1767,7 @@ mod tests {
17011767
verified_headers: headers.clone(),
17021768
serialized_shards: vec![],
17031769
useful_authorities: (0u8..(committee_size as u8)).map(Into::into).collect(),
1770+
useful_shards_authors: (0u8..(committee_size as u8)).map(Into::into).collect(),
17041771
};
17051772
let serialized_block_bundle_with_big_round = SerializedBlockBundle::try_from(
17061773
SerializedBlockBundleParts::try_from(block_bundle_with_big_rounds).unwrap(),
@@ -1738,6 +1805,7 @@ mod tests {
17381805
verified_headers: headers.clone(),
17391806
serialized_shards: vec![],
17401807
useful_authorities: (0u8..(committee_size as u8)).map(Into::into).collect(),
1808+
useful_shards_authors: (0u8..(committee_size as u8)).map(Into::into).collect(),
17411809
};
17421810
let serialized_block_bundle = SerializedBlockBundle::try_from(
17431811
SerializedBlockBundleParts::try_from(block_bundle).unwrap(),
@@ -2235,6 +2303,9 @@ mod tests {
22352303
useful_authorities: (0u8..(context.committee.size() as u8))
22362304
.map(Into::into)
22372305
.collect(),
2306+
useful_shards_authors: (0u8..(context.committee.size() as u8))
2307+
.map(Into::into)
2308+
.collect(),
22382309
};
22392310
let serialized_block_bundle = SerializedBlockBundle::try_from(
22402311
SerializedBlockBundleParts::try_from(block_bundle).unwrap(),
@@ -2378,6 +2449,9 @@ mod tests {
23782449
useful_authorities: (0u8..(context.committee.size() as u8))
23792450
.map(Into::into)
23802451
.collect(),
2452+
useful_shards_authors: (0u8..(context.committee.size() as u8))
2453+
.map(Into::into)
2454+
.collect(),
23812455
};
23822456
let serialized_block_bundle = SerializedBlockBundle::try_from(
23832457
SerializedBlockBundleParts::try_from(block_bundle).unwrap(),

crates/starfish/core/src/dag_state.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use crate::{
4141
pub(crate) const MAX_TRANSACTIONS_ACK_DEPTH: Round = 50;
4242
pub(crate) const MAX_HEADERS_PER_BUNDLE: usize = 150;
4343
pub(crate) const MAX_SHARDS_PER_BUNDLE: usize = 150;
44+
const MAX_ROUND_GAP_FOR_USEFUL_SHARDS: usize = 5;
4445

4546
/// DagState provides the API to write and read accepted blocks from the DAG.
4647
/// Only uncommitted and last committed blocks are cached in memory.
@@ -1326,7 +1327,7 @@ impl DagState {
13261327
break;
13271328
}
13281329
if useful_authorities.contains(&block_ref.author) {
1329-
to_take.push(block_ref.clone());
1330+
to_take.push(*block_ref);
13301331
}
13311332
}
13321333

@@ -1347,6 +1348,36 @@ impl DagState {
13471348
to_take
13481349
}
13491350

1351+
fn take_block_refs_of_useful_shards_for_authority(
1352+
&mut self,
1353+
authority_index: AuthorityIndex,
1354+
round_upper_bound_exclusive: Round,
1355+
useful_authorities: BTreeSet<AuthorityIndex>,
1356+
) -> Vec<BlockRef> {
1357+
let set = &mut self.shards_not_known_by_authority[authority_index.value()];
1358+
1359+
// Collect candidate block_refs we want to take out
1360+
let mut to_take = Vec::new();
1361+
1362+
for block_ref in set.iter() {
1363+
if to_take.len() >= MAX_SHARDS_PER_BUNDLE
1364+
|| block_ref.round >= round_upper_bound_exclusive
1365+
{
1366+
break;
1367+
}
1368+
if useful_authorities.contains(&block_ref.author) {
1369+
to_take.push(*block_ref);
1370+
}
1371+
}
1372+
1373+
// Remove the references from the set and update cordial knowledge
1374+
for block_ref in &to_take {
1375+
set.remove(block_ref);
1376+
}
1377+
1378+
to_take
1379+
}
1380+
13501381
/// Retrieves up to `MAX_HEADERS_PER_BUNDLE` previously unknown block
13511382
/// headers for the given authority, restricted to rounds strictly below
13521383
/// `round_upper_bound_exclusive`.
@@ -1395,6 +1426,28 @@ impl DagState {
13951426
.collect()
13961427
}
13971428

1429+
pub(crate) fn take_useful_shards_for_authority(
1430+
&mut self,
1431+
authority_index: AuthorityIndex,
1432+
round_upper_bound_exclusive: Round,
1433+
useful_authorities: BTreeSet<AuthorityIndex>,
1434+
) -> Vec<Bytes> {
1435+
let block_refs = self.take_block_refs_of_useful_shards_for_authority(
1436+
authority_index,
1437+
round_upper_bound_exclusive,
1438+
useful_authorities,
1439+
);
1440+
block_refs
1441+
.iter()
1442+
.map(|block_ref| {
1443+
self.recent_shards
1444+
.get(block_ref)
1445+
.expect("Shard should be in DagState")
1446+
.clone()
1447+
})
1448+
.collect::<Vec<_>>()
1449+
}
1450+
13981451
pub(crate) fn take_unknown_shards_for_authority(
13991452
&mut self,
14001453
authority_index: AuthorityIndex,
@@ -1424,6 +1477,19 @@ impl DagState {
14241477
}
14251478
shards
14261479
}
1480+
1481+
pub(crate) fn get_useful_shards_authors(
1482+
last_useful_round: Vec<Round>,
1483+
block_round: Round,
1484+
) -> BTreeSet<AuthorityIndex> {
1485+
let mut useful_authorities = BTreeSet::new();
1486+
for (i, round) in last_useful_round.iter().enumerate() {
1487+
if block_round - round < MAX_ROUND_GAP_FOR_USEFUL_SHARDS as u32 {
1488+
useful_authorities.insert(AuthorityIndex::from(i as u8));
1489+
}
1490+
}
1491+
useful_authorities
1492+
}
14271493
pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
14281494
let mut votes = Vec::new();
14291495
while !self.pending_commit_votes.is_empty() && votes.len() < limit {

crates/starfish/core/src/network/mod.rs

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ pub(crate) struct BlockBundle {
247247
pub(crate) verified_headers: Vec<VerifiedBlockHeader>,
248248
pub(crate) serialized_shards: Vec<Bytes>,
249249
pub(crate) useful_authorities: BTreeSet<AuthorityIndex>,
250+
pub(crate) useful_shards_authors: BTreeSet<AuthorityIndex>,
250251
}
251252

252253
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -255,21 +256,39 @@ pub(crate) struct SerializedBlockBundleParts {
255256
pub(crate) serialized_headers: Vec<Bytes>,
256257
pub(crate) serialized_shards: Vec<Bytes>,
257258
pub(crate) useful_authorities_bitmask: [u64; 4],
259+
pub(crate) useful_shards_authors_bitmask: [u64; 4],
258260
}
259261

262+
fn authority_set_to_bitmask(authorities: &BTreeSet<AuthorityIndex>) -> [u64; 4] {
263+
let mut bitmask = [0u64; 4];
264+
for authority_index in authorities {
265+
let index = authority_index.value();
266+
let array_index = index / 64;
267+
let bit_pos = index % 64;
268+
bitmask[array_index] |= 1u64 << bit_pos;
269+
}
270+
bitmask
271+
}
272+
273+
fn bitmask_to_authority_set(bitmask: [u64; 4]) -> BTreeSet<AuthorityIndex> {
274+
let mut set = BTreeSet::new();
275+
for array_index in 0..4 {
276+
let mut bits = bitmask[array_index];
277+
let base = array_index * 64;
278+
while bits != 0 {
279+
let bit = bits.trailing_zeros() as usize;
280+
set.insert(AuthorityIndex::from((base + bit) as u8));
281+
bits &= bits - 1;
282+
}
283+
}
284+
set
285+
}
260286
impl SerializedBlockBundleParts {
261287
pub(crate) fn useful_authorities(&self) -> BTreeSet<AuthorityIndex> {
262-
let mut useful_authorities = BTreeSet::new();
263-
for array_index in 0..4 {
264-
let mut bits = self.useful_authorities_bitmask[array_index];
265-
let base = array_index * 64;
266-
while bits != 0 {
267-
let bit = bits.trailing_zeros() as usize;
268-
useful_authorities.insert(AuthorityIndex::from((base + bit) as u8));
269-
bits &= bits - 1;
270-
}
271-
}
272-
useful_authorities
288+
bitmask_to_authority_set(self.useful_authorities_bitmask)
289+
}
290+
pub(crate) fn useful_shards_authors(&self) -> BTreeSet<AuthorityIndex> {
291+
bitmask_to_authority_set(self.useful_shards_authors_bitmask)
273292
}
274293
}
275294

@@ -293,6 +312,7 @@ impl TryFrom<VerifiedBlock> for SerializedBlockBundleParts {
293312
serialized_headers: vec![],
294313
serialized_shards: vec![],
295314
useful_authorities_bitmask: [0u64; 4],
315+
useful_shards_authors_bitmask: [0u64; 4],
296316
})
297317
}
298318
}
@@ -312,18 +332,14 @@ impl TryFrom<BlockBundle> for SerializedBlockBundleParts {
312332
for block_header in block_bundle.verified_headers.iter() {
313333
serialized_block_headers.push(block_header.serialized().clone());
314334
}
315-
let mut useful_authorities_bitmask = [0u64; 4];
316-
for authority_index in block_bundle.useful_authorities {
317-
let index = authority_index.value();
318-
let array_index = index / 64;
319-
let bit_pos = index % 64;
320-
useful_authorities_bitmask[array_index] |= 1u64 << bit_pos;
321-
}
322335
Ok(Self {
323336
serialized_block: Bytes::from(bytes),
324337
serialized_headers: serialized_block_headers,
325338
serialized_shards: block_bundle.serialized_shards,
326-
useful_authorities_bitmask,
339+
useful_authorities_bitmask: authority_set_to_bitmask(&block_bundle.useful_authorities),
340+
useful_shards_authors_bitmask: authority_set_to_bitmask(
341+
&block_bundle.useful_shards_authors,
342+
),
327343
})
328344
}
329345
}
@@ -389,6 +405,7 @@ mod tests {
389405
verified_headers: vec![],
390406
serialized_shards: vec![],
391407
useful_authorities: useful_authorities.clone(),
408+
useful_shards_authors: useful_authorities.clone(),
392409
};
393410
let serialized_bundle = SerializedBlockBundle::try_from(block_bundle).unwrap();
394411
let serialized_bundle_parts =

0 commit comments

Comments
 (0)