Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Notification-based block pinning #13157

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d5c1427
Worker
skunert Dec 22, 2022
6131598
Reorganize and unpin onnotification drop
skunert Jan 6, 2023
ccdf8ed
Pin in state-db, pass block number
skunert Jan 9, 2023
0443235
Pin blocks in blockchain db
skunert Jan 10, 2023
c1a25dc
Switch to reference counted LRU
skunert Jan 10, 2023
9d824d4
Merge branch 'master' into skunert/notification-based-pinning
skunert Jan 10, 2023
2f7f943
Disable pinning when we keep all blocks
skunert Jan 11, 2023
65caa1a
Fix pinning hint for state-db
skunert Jan 12, 2023
cc000a3
Remove pinning from backend layer
skunert Jan 12, 2023
8a22325
Improve readability
skunert Jan 12, 2023
d3d208a
Add justifications to test
skunert Jan 12, 2023
bc2d40c
Fix justification behaviour
skunert Jan 12, 2023
da7a855
Remove debug prints
skunert Jan 12, 2023
7daf11c
Convert channels to tracing_unbounded
skunert Jan 16, 2023
2fb995c
Add comments to the test
skunert Jan 16, 2023
7a072dd
Documentation and Cleanup
skunert Jan 16, 2023
92e3928
Move task start to client
skunert Jan 16, 2023
3ec854e
Simplify cache
skunert Jan 16, 2023
d5e2b1d
Improve test, remove unwanted log
skunert Jan 16, 2023
03051ca
Add tracing logs, remove expect for block number
skunert Jan 16, 2023
c4434bb
Cleanup
skunert Jan 16, 2023
347c80a
Add conversion method for unpin handle to Finalitynotification
skunert Jan 17, 2023
1198568
Revert unwanted changes
skunert Jan 17, 2023
363f01a
Improve naming
skunert Jan 17, 2023
b220553
Make clippy happy
skunert Jan 17, 2023
10dfdf7
Fix docs
skunert Jan 17, 2023
e2ff001
Merge branch 'master' into skunert/notification-based-pinning
skunert Jan 17, 2023
c4608da
Use `NumberFor` instead of u64 in API
skunert Jan 17, 2023
68cc76f
Hand over weak reference to unpin worker task
skunert Jan 17, 2023
fe2d4cc
Unwanted
skunert Jan 17, 2023
f5d6b08
&Hash -> Hash
skunert Jan 17, 2023
946e06a
Remove number from interface, rename `_unpin_handle`, LOG_TARGET
skunert Jan 18, 2023
a48e44b
Move RwLock one layer up
skunert Jan 18, 2023
4014c16
Apply code style suggestions
skunert Jan 18, 2023
d547fc2
Improve comments
skunert Jan 18, 2023
de6aa36
Replace lru crate by schnellru
skunert Jan 18, 2023
81f7493
Merge remote-tracking branch 'origin' into skunert/notification-based…
skunert Jan 19, 2023
6fa6d5c
Only insert values for pinned items + better docs
skunert Jan 19, 2023
a177b50
Apply suggestions from code review
skunert Jan 19, 2023
cbd0868
Improve comments, log target and test
skunert Jan 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
&Hash -> Hash
  • Loading branch information
skunert committed Jan 17, 2023
commit f5d6b08a8ac3a655d4b34a6a5c14f0796e17aae5
4 changes: 2 additions & 2 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,10 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {
/// Pin the block to keep bodies, justification and state available after pruning.
skunert marked this conversation as resolved.
Show resolved Hide resolved
/// Number of pins are reference counted. Users need to make sure to perform
/// one call to `unpin_block` per call to `pin_block`.
skunert marked this conversation as resolved.
Show resolved Hide resolved
fn pin_block(&self, hash: &Block::Hash, number: NumberFor<Block>) -> sp_blockchain::Result<()>;
fn pin_block(&self, hash: Block::Hash, number: NumberFor<Block>) -> sp_blockchain::Result<()>;

/// Unpin the block to allow pruning.
fn unpin_block(&self, hash: &Block::Hash);
fn unpin_block(&self, hash: Block::Hash);

/// Returns true if state for given block is available.
fn have_state_at(&self, hash: Block::Hash, _number: NumberFor<Block>) -> bool {
Expand Down
8 changes: 2 additions & 6 deletions client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,15 +789,11 @@ where
false
}

fn pin_block(
&self,
_: &<Block as BlockT>::Hash,
_: NumberFor<Block>,
) -> blockchain::Result<()> {
fn pin_block(&self, _: <Block as BlockT>::Hash, _: NumberFor<Block>) -> blockchain::Result<()> {
Ok(())
}

fn unpin_block(&self, _: &<Block as BlockT>::Hash) {}
fn unpin_block(&self, _: <Block as BlockT>::Hash) {}
}

impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> where Block::Hash: Ord {}
Expand Down
56 changes: 28 additions & 28 deletions client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,9 @@ impl<Block: BlockT> PinnedBlockCache<Block> {
cache.clear();
}

pub fn contains(&self, hash: &Block::Hash) -> bool {
pub fn contains(&self, hash: Block::Hash) -> bool {
let cache = self.cache.read();
cache.contains(hash)
cache.contains(&hash)
}

pub fn insert_body(&self, hash: Block::Hash, extrinsics: Option<Vec<Block::Extrinsic>>) {
Expand All @@ -548,12 +548,12 @@ impl<Block: BlockT> PinnedBlockCache<Block> {
log::trace!(target: "db-pin", "Cached justification. hash = {}, num_entries = {}", hash, cache.len());
}

pub fn remove(&self, hash: &Block::Hash) {
pub fn remove(&self, hash: Block::Hash) {
let mut cache = self.cache.write();
if let Some(entry) = cache.peek_mut(hash) {
if let Some(entry) = cache.peek_mut(&hash) {
entry.decrease_ref();
if entry.has_no_references() {
cache.pop(hash);
cache.pop(&hash);
log::trace!(target: "db-pin", "Removed pinned cache entry. hash = {}, num_entries = {}", hash, cache.len());
}
}
Expand Down Expand Up @@ -635,7 +635,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
/// Reference count of the item will not be increased. Use this
/// to load values for items into the cache which have already been pinned.
fn insert_justifications_if_pinned(&self, hash: Block::Hash, justification: Justification) {
if !self.pinned_blocks_cache.contains(&hash) {
if !self.pinned_blocks_cache.contains(hash) {
return
}

Expand All @@ -647,7 +647,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
/// Reference count of the item will not be increased. Use this
/// to load values for items into the cache which have already been pinned.
fn insert_persisted_justifications_if_pinned(&self, hash: Block::Hash) -> ClientResult<()> {
if !self.pinned_blocks_cache.contains(&hash) {
if !self.pinned_blocks_cache.contains(hash) {
return Ok(())
}
let justifications = self.justifications(hash)?;
Expand All @@ -659,7 +659,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
/// Reference count of the item will not be increased. Use this
/// to load values for items items into the cache which have already been pinned.
fn insert_persisted_body_if_pinned(&self, hash: Block::Hash) -> ClientResult<()> {
if !self.pinned_blocks_cache.contains(&hash) {
if !self.pinned_blocks_cache.contains(hash) {
return Ok(())
}

Expand All @@ -674,7 +674,7 @@ impl<Block: BlockT> BlockchainDb<Block> {
}

/// Decrease reference count for pinned item and remove if reference count is 0.
fn unpin(&self, hash: &Block::Hash) {
fn unpin(&self, hash: Block::Hash) {
self.pinned_blocks_cache.remove(hash);
}
}
Expand Down Expand Up @@ -2590,11 +2590,11 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {

fn pin_block(
&self,
hash: &<Block as BlockT>::Hash,
hash: <Block as BlockT>::Hash,
number: NumberFor<Block>,
skunert marked this conversation as resolved.
Show resolved Hide resolved
) -> sp_blockchain::Result<()> {
let hint = || {
let header_metadata = self.blockchain.header_metadata(*hash);
let header_metadata = self.blockchain.header_metadata(hash);
header_metadata
.map(|hdr| {
sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref())
Expand All @@ -2605,7 +2605,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
};
self.storage
.state_db
.pin(hash, number.saturated_into::<u64>(), hint)
.pin(&hash, number.saturated_into::<u64>(), hint)
.map_err(|_| {
sp_blockchain::Error::UnknownBlock(format!(
"State already discarded for {:?}",
Expand All @@ -2615,13 +2615,13 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {

if self.blocks_pruning != BlocksPruning::KeepAll {
// Only increase reference count for this hash. Value is loaded once we prune.
self.blockchain.bump_ref(*hash);
self.blockchain.bump_ref(hash);
}
Ok(())
}

fn unpin_block(&self, hash: &<Block as BlockT>::Hash) {
self.storage.state_db.unpin(hash);
fn unpin_block(&self, hash: <Block as BlockT>::Hash) {
self.storage.state_db.unpin(&hash);

if self.blocks_pruning != BlocksPruning::KeepAll {
self.blockchain.unpin(hash);
Expand Down Expand Up @@ -4277,7 +4277,7 @@ pub(crate) mod tests {
.unwrap();
blocks.push(hash);
// Avoid block pruning.
backend.pin_block(&blocks[i as usize], i).unwrap();
backend.pin_block(blocks[i as usize], i).unwrap();

prev_hash = hash;
}
Expand All @@ -4289,8 +4289,8 @@ pub(crate) mod tests {
assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap());

// Block 1 gets pinned three times
backend.pin_block(&blocks[1], 1).unwrap();
backend.pin_block(&blocks[1], 1).unwrap();
backend.pin_block(blocks[1], 1).unwrap();
backend.pin_block(blocks[1], 1).unwrap();

// Finalize all blocks. This will trigger pruning.
let mut op = backend.begin_operation().unwrap();
Expand Down Expand Up @@ -4331,7 +4331,7 @@ pub(crate) mod tests {

// Unpin all blocks. Values should be removed from cache.
for block in &blocks {
backend.unpin_block(&block);
backend.unpin_block(*block);
}

assert!(bc.body(blocks[0]).unwrap().is_none());
Expand All @@ -4344,10 +4344,10 @@ pub(crate) mod tests {
assert!(bc.justifications(blocks[3]).unwrap().is_none());

// After these unpins, block 1 should also be removed
backend.unpin_block(&blocks[1]);
backend.unpin_block(blocks[1]);
assert!(bc.body(blocks[1]).unwrap().is_some());
assert!(bc.justifications(blocks[1]).unwrap().is_some());
backend.unpin_block(&blocks[1]);
backend.unpin_block(blocks[1]);
assert!(bc.body(blocks[1]).unwrap().is_none());
assert!(bc.justifications(blocks[1]).unwrap().is_none());

Expand All @@ -4365,7 +4365,7 @@ pub(crate) mod tests {
.unwrap();
blocks.push(hash);

backend.pin_block(&blocks[4], 4).unwrap();
backend.pin_block(blocks[4], 4).unwrap();
// Mark block 5 as finalized.
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, blocks[5]).unwrap();
Expand All @@ -4384,7 +4384,7 @@ pub(crate) mod tests {
);
assert_eq!(Some(vec![5.into()]), bc.body(blocks[5]).unwrap());

backend.unpin_block(&blocks[4]);
backend.unpin_block(blocks[4]);
assert!(bc.body(blocks[4]).unwrap().is_none());
assert!(bc.justifications(blocks[4]).unwrap().is_none());

Expand All @@ -4397,7 +4397,7 @@ pub(crate) mod tests {
blocks.push(hash);

// Pin block 5 so it gets loaded into the cache on prune
backend.pin_block(&blocks[5], 5).unwrap();
backend.pin_block(blocks[5], 5).unwrap();

// Finalize block 6 so block 5 gets pruned. Since it is pinned both justifications should be
// in memory.
Expand Down Expand Up @@ -4434,7 +4434,7 @@ pub(crate) mod tests {
blocks.push(hash);

// Avoid block pruning.
backend.pin_block(&blocks[i as usize], i).unwrap();
backend.pin_block(blocks[i as usize], i).unwrap();

prev_hash = hash;
}
Expand All @@ -4458,7 +4458,7 @@ pub(crate) mod tests {
.unwrap();

// Do not prune the fork hash.
backend.pin_block(&fork_hash_3, 3).unwrap();
backend.pin_block(fork_hash_3, 3).unwrap();

let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, blocks[4]).unwrap();
Expand All @@ -4484,15 +4484,15 @@ pub(crate) mod tests {

// Unpin all blocks, except the forked one.
for block in &blocks {
backend.unpin_block(&block);
backend.unpin_block(*block);
}
assert!(bc.body(blocks[0]).unwrap().is_none());
assert!(bc.body(blocks[1]).unwrap().is_none());
assert!(bc.body(blocks[2]).unwrap().is_none());
assert!(bc.body(blocks[3]).unwrap().is_none());

assert!(bc.body(fork_hash_3).unwrap().is_some());
backend.unpin_block(&fork_hash_3);
backend.unpin_block(fork_hash_3);
assert!(bc.body(fork_hash_3).unwrap().is_none());
}
}
6 changes: 3 additions & 3 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ where
self.backend.commit_operation(op)?;

if let (Some(ref notification), Some(number)) = (&finality_notification, finality_num) {
if let Err(err) = self.backend.pin_block(&notification.hash, number) {
if let Err(err) = self.backend.pin_block(notification.hash, number) {
error!(
"Unable to pin block for finality notification. hash: {}, Error: {}",
notification.hash, err
Expand All @@ -343,7 +343,7 @@ where
}

if let (Some(ref notification), Some(number)) = (&import_notification, import_num) {
if let Err(err) = self.backend.pin_block(&notification.hash, number) {
if let Err(err) = self.backend.pin_block(notification.hash, number) {
error!(
"Unable to pin block for import notification. hash: {}, Error: {}",
notification.hash, err
Expand Down Expand Up @@ -435,7 +435,7 @@ where
loop {
if let Some(message) = rx.next().await {
if let Some(backend) = task_backend.upgrade() {
backend.unpin_block(&message);
backend.unpin_block(message);
} else {
log::warn!(target: "db", "Terminating unpin-worker, backend reference was dropped.");
return
Expand Down