Skip to content

Commit

Permalink
[consensus] Make CommitRange inclusive and use in commit syncer (Myst…
Browse files Browse the repository at this point in the history
…enLabs#17788)

## Test plan 

pending ptn run
  • Loading branch information
arun-koshy authored Jun 4, 2024
1 parent 828720c commit 8a51d26
Show file tree
Hide file tree
Showing 21 changed files with 215 additions and 176 deletions.
4 changes: 2 additions & 2 deletions consensus/core/src/authority_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ mod tests {
authority_node::AuthorityService,
block::{BlockAPI as _, BlockRef, Round, TestBlock, VerifiedBlock},
block_verifier::NoopBlockVerifier,
commit::CommitRange,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
error::ConsensusResult,
Expand Down Expand Up @@ -442,8 +443,7 @@ mod tests {
async fn fetch_commits(
&self,
_peer: AuthorityIndex,
_start: Round,
_end: Round,
_commit_range: CommitRange,
_timeout: Duration,
) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
unimplemented!("Unimplemented")
Expand Down
17 changes: 10 additions & 7 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tracing::{debug, info, warn};
use crate::{
block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
block_verifier::BlockVerifier,
commit::{CommitAPI as _, TrustedCommit},
commit::{CommitAPI as _, CommitRange, TrustedCommit},
commit_syncer::CommitVoteMonitor,
context::Context,
core_thread::CoreThreadDispatcher,
Expand Down Expand Up @@ -314,15 +314,18 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
async fn handle_fetch_commits(
&self,
_peer: AuthorityIndex,
start: CommitIndex,
end: CommitIndex,
commit_range: CommitRange,
) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
fail_point_async!("consensus-rpc-response");

// Compute an exclusive end index and bound the maximum number of commits scanned.
let exclusive_end =
(end + 1).min(start + self.context.parameters.commit_sync_batch_size as CommitIndex);
let mut commits = self.store.scan_commits((start..exclusive_end).into())?;
// Compute an inclusive end index and bound the maximum number of commits scanned.
let inclusive_end = commit_range.end().min(
commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
- 1,
);
let mut commits = self
.store
.scan_commits((commit_range.start()..=inclusive_end).into())?;
let mut certifier_block_refs = vec![];
'commit: while let Some(c) = commits.last() {
let index = c.index();
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ mod test {
use super::*;
use crate::{
block::{BlockRef, TestBlock},
commit::CommitRange,
core::CoreSignals,
network::BlockStream,
Round,
Expand Down Expand Up @@ -259,8 +260,7 @@ mod test {
async fn fetch_commits(
&self,
_peer: AuthorityIndex,
_start: Round,
_end: Round,
_commit_range: CommitRange,
_timeout: Duration,
) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
unimplemented!("Unimplemented")
Expand Down
56 changes: 36 additions & 20 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
cmp::Ordering,
fmt::{self, Display, Formatter},
hash::{Hash, Hasher},
ops::{Deref, Range},
ops::{Deref, RangeInclusive},
sync::Arc,
};

Expand Down Expand Up @@ -546,30 +546,33 @@ impl CommitInfo {
}

/// CommitRange stores a range of CommitIndex. The range contains the start (inclusive)
/// and end (exclusive) commit indices and can be ordered for use as the key of a table.
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct CommitRange(Range<CommitIndex>);
/// and end (inclusive) commit indices and can be ordered for use as the key of a table.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct CommitRange(RangeInclusive<CommitIndex>);

#[allow(unused)]
impl CommitRange {
pub(crate) fn new(range: Range<CommitIndex>) -> Self {
pub(crate) fn new(range: RangeInclusive<CommitIndex>) -> Self {
Self(range)
}

// Inclusive
pub(crate) fn start(&self) -> CommitIndex {
self.0.start
*self.0.start()
}

// Inclusive
pub(crate) fn end(&self) -> CommitIndex {
self.0.end
*self.0.end()
}

/// Check if the provided range is sequentially after this range with the same
/// range length.
/// Returns `true` when this range and `other` report the same size.
///
/// The comparison is made on the `size_hint()` of the two wrapped
/// inclusive ranges.
pub(crate) fn is_equal_size(&self, other: &Self) -> bool {
    let own_size = self.0.size_hint();
    let other_size = other.0.size_hint();
    own_size == other_size
}

/// Check if the provided range is sequentially after this range.
pub(crate) fn is_next_range(&self, other: &Self) -> bool {
self.0.len() == other.0.len() && self.end() == other.start()
&self.end() + 1 == other.start()
}
}

Expand All @@ -587,12 +590,18 @@ impl PartialOrd for CommitRange {
}
}

impl From<Range<CommitIndex>> for CommitRange {
fn from(range: Range<CommitIndex>) -> Self {
impl From<RangeInclusive<CommitIndex>> for CommitRange {
fn from(range: RangeInclusive<CommitIndex>) -> Self {
Self(range)
}
}

impl Default for CommitRange {
    /// Builds the default range: an inclusive range whose start and end are
    /// both `CommitIndex::default()`.
    fn default() -> Self {
        let index = CommitIndex::default();
        Self(index..=index)
    }
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -680,20 +689,27 @@ mod tests {

#[tokio::test]
async fn test_commit_range() {
let range1 = CommitRange::new(1..6);
let range2 = CommitRange::new(2..6);
let range3 = CommitRange::new(5..10);
let range4 = CommitRange::new(6..11);
let range5 = CommitRange::new(6..9);
telemetry_subscribers::init_for_testing();
let range1 = CommitRange::new(1..=5);
let range2 = CommitRange::new(2..=6);
let range3 = CommitRange::new(5..=10);
let range4 = CommitRange::new(6..=10);
let range5 = CommitRange::new(6..=9);

assert_eq!(range1.start(), 1);
assert_eq!(range1.end(), 6);
assert_eq!(range1.end(), 5);

// Test next range check
assert!(!range1.is_next_range(&range2));
assert!(!range1.is_next_range(&range3));
assert!(range1.is_next_range(&range4));
assert!(!range1.is_next_range(&range5));
assert!(range1.is_next_range(&range5));

// Test equal size range check
assert!(range1.is_equal_size(&range2));
assert!(!range1.is_equal_size(&range3));
assert!(range1.is_equal_size(&range4));
assert!(!range1.is_equal_size(&range5));

// Test range ordering
assert!(range1 < range2);
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl CommitObserver {
// We should not send the last processed commit again, so last_processed_commit_index+1
let unsent_commits = self
.store
.scan_commits(((last_processed_commit_index + 1)..CommitIndex::MAX).into())
.scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
.expect("Scanning commits should not fail");

// Resend all the committed subdags to the consensus output channel
Expand Down Expand Up @@ -298,7 +298,7 @@ mod tests {
commits.last().unwrap().commit_ref.index
);
let all_stored_commits = mem_store
.scan_commits((0..CommitIndex::MAX).into())
.scan_commits((0..=CommitIndex::MAX).into())
.unwrap();
assert_eq!(all_stored_commits.len(), leaders.len());
let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
Expand Down
63 changes: 30 additions & 33 deletions consensus/core/src/commit_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ use crate::{
block::{BlockAPI, BlockRef, SignedBlock, VerifiedBlock},
block_verifier::BlockVerifier,
commit::{
Commit, CommitAPI as _, CommitDigest, CommitRef, TrustedCommit, GENESIS_COMMIT_INDEX,
Commit, CommitAPI as _, CommitDigest, CommitRange, CommitRef, TrustedCommit,
GENESIS_COMMIT_INDEX,
},
context::Context,
core_thread::CoreThreadDispatcher,
Expand Down Expand Up @@ -109,9 +110,9 @@ impl<C: NetworkClient> CommitSyncer<C> {
// Inflight requests to fetch commits from different authorities.
let mut inflight_fetches = JoinSet::new();
// Additional ranges (inclusive start and end) of commits to fetch.
let mut pending_fetches = BTreeSet::<(CommitIndex, CommitIndex)>::new();
let mut pending_fetches = BTreeSet::<CommitRange>::new();
// Fetched commits and blocks by commit indices.
let mut fetched_blocks = BTreeMap::<(CommitIndex, CommitIndex), Vec<VerifiedBlock>>::new();
let mut fetched_blocks = BTreeMap::<CommitRange, Vec<VerifiedBlock>>::new();
// Highest end index among inflight and pending fetches.
// Used to determine if and which new ranges to fetch.
let mut highest_scheduled_index = Option::<CommitIndex>::None;
Expand Down Expand Up @@ -150,7 +151,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
if range_end > quorum_commit_index {
break 'pending;
}
pending_fetches.insert((range_start, range_end));
pending_fetches.insert((range_start..=range_end).into());
// quorum_commit_index should be non-decreasing, so highest_scheduled_index should not
// decrease either.
highest_scheduled_index = Some(range_end);
Expand All @@ -177,27 +178,27 @@ impl<C: NetworkClient> CommitSyncer<C> {
let (commit_start, commit_end) = (commits.first().unwrap().index(), commits.last().unwrap().index());
// Allow returning partial results, and try fetching the rest separately.
if commit_end < target_end {
pending_fetches.insert((commit_end + 1, target_end));
pending_fetches.insert((commit_end + 1..=target_end).into());
}
// Make sure synced_commit_index is up to date.
synced_commit_index = synced_commit_index.max(inner.dag_state.read().last_commit_index());
// Only add new blocks if at least some of them are not already synced.
if synced_commit_index < commit_end {
fetched_blocks.insert((commit_start, commit_end), blocks);
fetched_blocks.insert((commit_start..=commit_end).into(), blocks);
}
// Try to process as many fetched blocks as possible.
'fetched: while let Some(((fetched_start, _end), _blocks)) = fetched_blocks.first_key_value() {
'fetched: while let Some((fetched_commit_range, _blocks)) = fetched_blocks.first_key_value() {
// Only pop fetched_blocks if there is no gap with blocks already synced.
// Note: start, end and synced_commit_index are all inclusive.
let ((_start, fetched_end), blocks) = if *fetched_start <= synced_commit_index + 1 {
let (fetched_commit_range, blocks) = if fetched_commit_range.start() <= synced_commit_index + 1 {
fetched_blocks.pop_first().unwrap()
} else {
// Found gap between earliest fetched block and latest synced block,
// so not sending additional blocks to Core.
break 'fetched;
};
// Avoid sending to Core a whole batch of already synced blocks.
if fetched_end <= synced_commit_index {
if fetched_commit_range.end() <= synced_commit_index {
continue 'fetched;
}
debug!(
Expand All @@ -223,7 +224,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
}
};
// Once commits and blocks are sent to Core, ratchet up synced_commit_index
synced_commit_index = synced_commit_index.max(fetched_end);
synced_commit_index = synced_commit_index.max(fetched_commit_range.end());
}
}

Expand Down Expand Up @@ -256,14 +257,13 @@ impl<C: NetworkClient> CommitSyncer<C> {
if inflight_fetches.len() >= target_parallel_fetches {
break;
}
let Some((start, end)) = pending_fetches.pop_first() else {
let Some(commit_range) = pending_fetches.pop_first() else {
break;
};
inflight_fetches.spawn(Self::fetch_loop(
inner.clone(),
fetch_state.clone(),
start,
end,
commit_range,
));
}

Expand All @@ -286,24 +286,20 @@ impl<C: NetworkClient> CommitSyncer<C> {
async fn fetch_loop(
inner: Arc<Inner<C>>,
fetch_state: Arc<Mutex<FetchState>>,
start: CommitIndex,
end: CommitIndex,
commit_range: CommitRange,
) -> (CommitIndex, Vec<TrustedCommit>, Vec<VerifiedBlock>) {
let _timer = inner
.context
.metrics
.node_metrics
.commit_sync_fetch_loop_latency
.start_timer();
info!(
"Starting to fetch commits from {} to {} (inclusive) ...",
start, end
);
info!("Starting to fetch commits in {commit_range:?} ...",);
loop {
match Self::fetch_once(inner.clone(), fetch_state.clone(), start, end).await {
match Self::fetch_once(inner.clone(), fetch_state.clone(), commit_range.clone()).await {
Ok((commits, blocks)) => {
info!("Finished fetching commits from {} to {}", start, end);
return (end, commits, blocks);
info!("Finished fetching commits in {commit_range:?}",);
return (commit_range.end(), commits, blocks);
}
Err(e) => {
warn!("Failed to fetch: {}", e);
Expand All @@ -318,8 +314,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
async fn fetch_once(
inner: Arc<Inner<C>>,
fetch_state: Arc<Mutex<FetchState>>,
start: CommitIndex,
end: CommitIndex,
commit_range: CommitRange,
) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
const FETCH_COMMITS_TIMEOUT: Duration = Duration::from_secs(10);
const FETCH_BLOCKS_TIMEOUT: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -347,10 +342,14 @@ impl<C: NetworkClient> CommitSyncer<C> {
sleep(available_time - now).await;
}

// 2. Fetch commits in the range [start, end] from the selected authority.
// 2. Fetch commits in the commit range from the selected authority.
let (serialized_commits, serialized_blocks) = match inner
.network_client
.fetch_commits(target_authority, start, end, FETCH_COMMITS_TIMEOUT)
.fetch_commits(
target_authority,
commit_range.clone(),
FETCH_COMMITS_TIMEOUT,
)
.await
{
Ok(result) => {
Expand Down Expand Up @@ -378,8 +377,7 @@ impl<C: NetworkClient> CommitSyncer<C> {
// as well.
let commits = inner.verify_commits(
target_authority,
start,
end,
commit_range,
serialized_commits,
serialized_blocks,
)?;
Expand Down Expand Up @@ -549,8 +547,7 @@ impl<C: NetworkClient> Inner<C> {
fn verify_commits(
&self,
peer: AuthorityIndex,
start: CommitIndex,
end: CommitIndex,
commit_range: CommitRange,
serialized_commits: Vec<Bytes>,
serialized_blocks: Vec<Bytes>,
) -> ConsensusResult<Vec<TrustedCommit>> {
Expand All @@ -562,10 +559,10 @@ impl<C: NetworkClient> Inner<C> {
let digest = TrustedCommit::compute_digest(serialized);
if commits.is_empty() {
// start is inclusive, so first commit must be at the start index.
if commit.index() != start {
if commit.index() != commit_range.start() {
return Err(ConsensusError::UnexpectedStartCommit {
peer,
start,
start: commit_range.start(),
commit: Box::new(commit),
});
}
Expand All @@ -584,7 +581,7 @@ impl<C: NetworkClient> Inner<C> {
}
}
// Do not process more commits past the end index.
if commit.index() > end {
if commit.index() > commit_range.end() {
break;
}
commits.push((digest, commit));
Expand Down
Loading

0 comments on commit 8a51d26

Please sign in to comment.