Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Optimize level monitor reconstruction #2461

Merged
merged 3 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 23 additions & 27 deletions client/consensus/common/src/level_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _};
use sp_blockchain::{HashAndNumber, TreeRoute};
use sp_blockchain::{HashAndNumber, HeaderMetadata, TreeRoute};
use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero};
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -48,17 +48,17 @@ pub enum LevelLimit {

/// Support structure to constrain the number of leaves at each level.
pub struct LevelMonitor<Block: BlockT, BE> {
// Max number of leaves for each level.
/// Max number of leaves for each level.
level_limit: usize,
// Monotonic counter used to keep track of block freshness.
/// Monotonic counter used to keep track of block freshness.
pub(crate) import_counter: NumberFor<Block>,
// Map between blocks hashes and freshness.
/// Map between blocks hashes and freshness.
pub(crate) freshness: HashMap<Block::Hash, NumberFor<Block>>,
// Blockchain levels cache.
/// Blockchain levels cache.
pub(crate) levels: HashMap<NumberFor<Block>, HashSet<Block::Hash>>,
// Lower level number stored by the levels map.
/// Lower level number stored by the levels map.
lowest_level: NumberFor<Block>,
// Backend reference to remove blocks on level saturation.
/// Backend reference to remove blocks on level saturation.
backend: Arc<BE>,
}

Expand Down Expand Up @@ -96,7 +96,9 @@ where
///
/// Level limits are not enforced during this phase.
fn restore(&mut self) {
const ERR_MSG: &str = "route from finalized to leaf should be available; qed";
let info = self.backend.blockchain().info();

log::debug!(
target: "parachain",
"Restoring chain level monitor from last finalized block: {} {}",
Expand All @@ -105,30 +107,24 @@ where

self.lowest_level = info.finalized_number;
self.import_counter = info.finalized_number;
self.block_imported(info.finalized_number, info.finalized_hash);

let mut counter_max = info.finalized_number;

for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
let route =
sp_blockchain::tree_route(self.backend.blockchain(), info.finalized_hash, leaf)
.expect("Route from finalized to leaf should be available; qed");
if !route.retracted().is_empty() {
continue
}
route.enacted().iter().for_each(|elem| {
if !self.freshness.contains_key(&elem.hash) {
// Use the block height value as the freshness.
self.import_counter = elem.number;
self.block_imported(elem.number, elem.hash);
let mut meta = self.backend.blockchain().header_metadata(leaf).expect(ERR_MSG);

self.import_counter = self.import_counter.max(meta.number);

// Populate the monitor until we don't hit an already imported branch
while !self.freshness.contains_key(&meta.hash) {
self.freshness.insert(meta.hash, meta.number);
self.levels.entry(meta.number).or_default().insert(meta.hash);
if meta.number <= self.lowest_level {
break
}
});
counter_max = std::cmp::max(self.import_counter, counter_max);
meta = self.backend.blockchain().header_metadata(meta.parent).expect(ERR_MSG);
}
}

log::debug!(target: "parachain", "Restored chain level monitor up to height {}", counter_max);

self.import_counter = counter_max;
log::debug!(target: "parachain", "Restored chain level monitor up to height {}", self.import_counter);
}

/// Check and enforce the limit bound at the given height.
Expand Down Expand Up @@ -355,9 +351,9 @@ where

/// Add a new imported block information to the monitor.
pub fn block_imported(&mut self, number: NumberFor<Block>, hash: Block::Hash) {
self.import_counter += One::one();
self.freshness.insert(hash, self.import_counter);
self.levels.entry(number).or_default().insert(hash);
self.import_counter += One::one();

// Do cleanup once in a while, we are allowed to have some obsolete information.
let finalized_num = self.backend.blockchain().info().finalized_number;
Expand Down
13 changes: 9 additions & 4 deletions client/consensus/common/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,12 @@ fn restore_limit_monitor() {
LevelLimit::Some(LEVEL_LIMIT),
);

let monitor_sd = para_import.monitor.clone().unwrap();

let monitor = monitor_sd.shared_data();
assert_eq!(monitor.import_counter, 3);
std::mem::drop(monitor);

let block13 = build_and_import_block_ext(
&*client,
BlockOrigin::Own,
Expand All @@ -783,14 +789,13 @@ fn restore_limit_monitor() {
let expected = vec![blocks1[1].header.hash(), block13.header.hash()];
assert_eq!(leaves, expected);

let monitor = para_import.monitor.unwrap();
let monitor = monitor.shared_data();
assert_eq!(monitor.import_counter, 5);
let monitor = monitor_sd.shared_data();
assert_eq!(monitor.import_counter, 4);
assert!(monitor.levels.iter().all(|(number, hashes)| {
hashes
.iter()
.filter(|hash| **hash != block13.header.hash())
.all(|hash| *number == *monitor.freshness.get(hash).unwrap())
}));
assert_eq!(*monitor.freshness.get(&block13.header.hash()).unwrap(), monitor.import_counter - 1);
assert_eq!(*monitor.freshness.get(&block13.header.hash()).unwrap(), monitor.import_counter);
}