Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fix some problems with prove_warp_sync (#8037)
Browse files Browse the repository at this point in the history
* Fix some problems with prove_warp_sync

* Update client/finality-grandpa/src/finality_proof.rs

Co-authored-by: cheme <emericchevalier.pro@gmail.com>

Co-authored-by: cheme <emericchevalier.pro@gmail.com>
  • Loading branch information
expenses and cheme authored Feb 5, 2021
1 parent 3c9b031 commit cc71cca
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 26 deletions.
6 changes: 3 additions & 3 deletions client/finality-grandpa-warp-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ struct Request<B: BlockT> {
const WARP_SYNC_FRAGMENTS_LIMIT: usize = 100;

/// Number of item with justification in warp sync cache.
/// This should be customizable, setting a low number
/// until then.
const WARP_SYNC_CACHE_SIZE: usize = 20;
/// This should be customizable, but setting it to the max number of fragments
/// we return seems like a good idea until then.
const WARP_SYNC_CACHE_SIZE: usize = WARP_SYNC_FRAGMENTS_LIMIT;

/// Handler for incoming grandpa warp sync requests from a remote peer.
pub struct GrandpaWarpSyncRequestHandler<TBackend, TBlock: BlockT> {
Expand Down
50 changes: 27 additions & 23 deletions client/finality-grandpa/src/finality_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
// This operation is a costy and only for the delay corner case.
while index > Zero::zero() {
index = index - One::one();
if let Some((fragement, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if let Some((fragment, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if last_apply.map(|next| &next > header.number()).unwrap_or(false) {
result.push(fragement);
result.push(fragment);
last_apply = Some(apply_block);
} else {
break;
Expand All @@ -289,7 +289,7 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(

let mut index = *header.number();
while index <= end_number {
if max_fragment_limit.map(|limit| result.len() <= limit).unwrap_or(false) {
if max_fragment_limit.map(|limit| result.len() >= limit).unwrap_or(false) {
break;
}

Expand All @@ -305,7 +305,10 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
index = index + One::one();
}

if result.last().as_ref().map(|head| head.header.number()) != Some(&end_number) {
let at_limit = max_fragment_limit.map(|limit| result.len() >= limit).unwrap_or(false);

// add last finalized block if reached and not already included.
if !at_limit && result.last().as_ref().map(|head| head.header.number()) != Some(&end_number) {
let header = blockchain.expect_header(end)?;
if let Some(justification) = blockchain.justification(BlockId::Number(end_number.clone()))? {
result.push(AuthoritySetProofFragment {
Expand All @@ -328,7 +331,7 @@ fn get_warp_sync_proof_fragment<Block: BlockT, B: BlockchainBackend<Block>>(
) -> sp_blockchain::Result<Option<(AuthoritySetProofFragment<Block::Header>, NumberFor<Block>)>> {
if let Some(cache) = cache.as_mut() {
if let Some(result) = cache.get_item(index) {
return Ok(result.clone());
return Ok(result);
}
}

Expand Down Expand Up @@ -541,20 +544,20 @@ impl<Block: BlockT> BlockJustification<Block::Header> for GrandpaJustification<B

/// Simple cache for warp sync queries.
pub struct WarpSyncFragmentCache<Header: HeaderT> {
header_has_proof_fragment: std::collections::HashMap<Header::Number, bool>,
cache: linked_hash_map::LinkedHashMap<
Header::Number,
Option<(AuthoritySetProofFragment<Header>, Header::Number)>,
(AuthoritySetProofFragment<Header>, Header::Number),
>,
headers_with_justification: usize,
limit: usize,
}

impl<Header: HeaderT> WarpSyncFragmentCache<Header> {
/// Instantiate a new cache for the warp sync prover.
pub fn new(size: usize) -> Self {
WarpSyncFragmentCache {
header_has_proof_fragment: Default::default(),
cache: Default::default(),
headers_with_justification: 0,
limit: size,
}
}
Expand All @@ -564,31 +567,32 @@ impl<Header: HeaderT> WarpSyncFragmentCache<Header> {
at: Header::Number,
item: Option<(AuthoritySetProofFragment<Header>, Header::Number)>,
) {
if self.cache.len() == self.limit {
self.pop_one();
}
if item.is_some() {
// we do not check previous value as cached value is always supposed to
// be queried before calling 'new_item'.
self.headers_with_justification += 1;
self.header_has_proof_fragment.insert(at, item.is_some());

if let Some(item) = item {
if self.cache.len() == self.limit {
self.pop_one();
}

self.cache.insert(at, item);
}
self.cache.insert(at, item);
}

fn pop_one(&mut self) {
while let Some(v) = self.cache.pop_front() {
if v.1.is_some() {
self.headers_with_justification -= 1;
break;
}
if let Some((header_number, _)) = self.cache.pop_front() {
self.header_has_proof_fragment.remove(&header_number);
}
}

fn get_item(
&mut self,
block: Header::Number,
) -> Option<&mut Option<(AuthoritySetProofFragment<Header>, Header::Number)>> {
self.cache.get_refresh(&block)
) -> Option<Option<(AuthoritySetProofFragment<Header>, Header::Number)>> {
match self.header_has_proof_fragment.get(&block) {
Some(true) => Some(self.cache.get_refresh(&block).cloned()),
Some(false) => Some(None),
None => None
}
}
}

Expand Down

0 comments on commit cc71cca

Please sign in to comment.