Skip to content

Commit

Permalink
Introduce Notification block pinning limit (paritytech#2935)
Browse files Browse the repository at this point in the history
While investigating some pruning issues I found some room for
improvement in the notification pin handling.

**Problem:** It was not possible to define an upper limit on
notification pins. The block pinning cache has a limit, but only handles
bodies and justifications.

After this PR, bookkeeping for notifications is managed in the pinning
worker. A limit can be defined in the worker. If that limit is crossed,
blocks that were pinned for that notification are unpinned, which now
affects the state as well as bodies and justifications. The pinned
blocks cache still has a limit, but should never be hit.

closes #19

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 26, 2024
1 parent 0893ca1 commit 6c5a42a
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 20 additions & 7 deletions substrate/client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl fmt::Display for UsageInfo {
pub struct UnpinHandleInner<Block: BlockT> {
/// Hash of the block pinned by this handle
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
}

impl<Block: BlockT> Debug for UnpinHandleInner<Block> {
Expand All @@ -291,20 +291,33 @@ impl<Block: BlockT> UnpinHandleInner<Block> {
/// Create a new [`UnpinHandleInner`]
pub fn new(
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> Self {
Self { hash, unpin_worker_sender }
}
}

impl<Block: BlockT> Drop for UnpinHandleInner<Block> {
fn drop(&mut self) {
if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) {
if let Err(err) =
self.unpin_worker_sender.unbounded_send(UnpinWorkerMessage::Unpin(self.hash))
{
log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err);
};
}
}

/// Message that signals notification-based pinning actions to the pinning-worker.
///
/// When the notification is dropped, an `Unpin` message should be sent to the worker.
#[derive(Debug)]
pub enum UnpinWorkerMessage<Block: BlockT> {
/// Should be sent when a import or finality notification is created.
AnnouncePin(Block::Hash),
/// Should be sent when a import or finality notification is dropped.
Unpin(Block::Hash),
}

/// Keeps a specific block pinned while the handle is alive.
/// Once the last handle instance for a given block is dropped, the
/// block is unpinned in the [`Backend`](crate::backend::Backend::unpin_block).
Expand All @@ -315,7 +328,7 @@ impl<Block: BlockT> UnpinHandle<Block> {
/// Create a new [`UnpinHandle`]
pub fn new(
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> UnpinHandle<Block> {
UnpinHandle(Arc::new(UnpinHandleInner::new(hash, unpin_worker_sender)))
}
Expand Down Expand Up @@ -353,7 +366,7 @@ impl<Block: BlockT> BlockImportNotification<Block> {
header: Block::Header,
is_new_best: bool,
tree_route: Option<Arc<sp_blockchain::TreeRoute<Block>>>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> Self {
Self {
hash,
Expand Down Expand Up @@ -412,7 +425,7 @@ impl<Block: BlockT> FinalityNotification<Block> {
/// Create finality notification from finality summary.
pub fn from_summary(
mut summary: FinalizeSummary<Block>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> FinalityNotification<Block> {
let hash = summary.finalized.pop().unwrap_or_default();
FinalityNotification {
Expand All @@ -436,7 +449,7 @@ impl<Block: BlockT> BlockImportNotification<Block> {
/// Create finality notification from finality summary.
pub fn from_summary(
summary: ImportSummary<Block>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> BlockImportNotification<Block> {
let hash = summary.hash;
BlockImportNotification {
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
self.storage.state_db.pin(&hash, number.saturated_into::<u64>(), hint).map_err(
|_| {
sp_blockchain::Error::UnknownBlock(format!(
"State already discarded for `{:?}`",
"Unable to pin: state already discarded for `{:?}`",
hash
))
},
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/db/src/pinned_blocks_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use schnellru::{Limiter, LruMap};
use sp_runtime::{traits::Block as BlockT, Justifications};

const LOG_TARGET: &str = "db::pin";
const PINNING_CACHE_SIZE: usize = 1024;
const PINNING_CACHE_SIZE: usize = 2048;

/// Entry for pinned blocks cache.
struct PinnedBlockCacheEntry<Block: BlockT> {
Expand Down
1 change: 1 addition & 0 deletions substrate/client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ tokio = { version = "1.22.0", features = ["parking_lot", "rt-multi-thread", "tim
tempfile = "3.1.0"
directories = "5.0.1"
static_init = "1.0.3"
schnellru = "0.2.1"

[dev-dependencies]
substrate-test-runtime-client = { path = "../../test-utils/runtime/client" }
Expand Down
53 changes: 28 additions & 25 deletions substrate/client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
//! Substrate Client
use super::block_rules::{BlockRules, LookupResult as BlockLookupResult};
use futures::{FutureExt, StreamExt};
use log::{error, info, trace, warn};
use crate::client::notification_pinning::NotificationPinningWorker;
use log::{debug, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use prometheus_endpoint::Registry;
use rand::Rng;
Expand All @@ -38,7 +38,7 @@ use sc_client_api::{
execution_extensions::ExecutionExtensions,
notifications::{StorageEventStream, StorageNotifications},
CallExecutor, ExecutorProvider, KeysIter, OnFinalityAction, OnImportAction, PairsIter,
ProofProvider, UsageProvider,
ProofProvider, UnpinWorkerMessage, UsageProvider,
};
use sc_consensus::{
BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction,
Expand Down Expand Up @@ -114,7 +114,7 @@ where
block_rules: BlockRules<Block>,
config: ClientConfig<Block>,
telemetry: Option<TelemetryHandle>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
_phantom: PhantomData<RA>,
}

Expand Down Expand Up @@ -326,19 +326,35 @@ where
// dropped, the block will be unpinned automatically.
if let Some(ref notification) = finality_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
error!(
debug!(
"Unable to pin block for finality notification. hash: {}, Error: {}",
notification.hash, err
);
};
} else {
let _ = self
.unpin_worker_sender
.unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash))
.map_err(|e| {
log::error!(
"Unable to send AnnouncePin worker message for finality: {e}"
)
});
}
}

if let Some(ref notification) = import_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
error!(
debug!(
"Unable to pin block for import notification. hash: {}, Error: {}",
notification.hash, err
);
} else {
let _ = self
.unpin_worker_sender
.unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash))
.map_err(|e| {
log::error!("Unable to send AnnouncePin worker message for import: {e}")
});
};
}

Expand Down Expand Up @@ -416,25 +432,12 @@ where
backend.commit_operation(op)?;
}

let (unpin_worker_sender, mut rx) =
tracing_unbounded::<Block::Hash>("unpin-worker-channel", 10_000);
let task_backend = Arc::downgrade(&backend);
spawn_handle.spawn(
"unpin-worker",
None,
async move {
while let Some(message) = rx.next().await {
if let Some(backend) = task_backend.upgrade() {
backend.unpin_block(message);
} else {
log::debug!("Terminating unpin-worker, backend reference was dropped.");
return
}
}
log::debug!("Terminating unpin-worker, stream terminated.")
}
.boxed(),
let (unpin_worker_sender, rx) = tracing_unbounded::<UnpinWorkerMessage<Block>>(
"notification-pinning-worker-channel",
10_000,
);
let unpin_worker = NotificationPinningWorker::new(rx, backend.clone());
spawn_handle.spawn("notification-pinning-worker", None, Box::pin(unpin_worker.run()));

Ok(Client {
backend,
Expand Down
1 change: 1 addition & 0 deletions substrate/client/service/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
mod block_rules;
mod call_executor;
mod client;
mod notification_pinning;
mod wasm_override;
mod wasm_substitutes;

Expand Down
Loading

0 comments on commit 6c5a42a

Please sign in to comment.