Skip to content

Commit dff9f97

Browse files
committed
feat(deferred-persist): Part 1: unpersisted gauge
Added a gauge to track the number of unpersisted nodes at any time. This count can be used later by the revision manager to decide whether or not to persist a proposal (among other possible strategies). It is also helpful to know how many nodes are still in memory that could be persisted. When increasing this counter, we do it all at once, but we decrement one by one (or by the number of entries drained from the kernel's completion queue). This is because the writes are slower than the adds, and there is some integer-to-floating-point conversion each time the counter is adjusted.
1 parent 4b24aae commit dff9f97

File tree

3 files changed

+98
-20
lines changed

3 files changed

+98
-20
lines changed

storage/src/nodestore/hash.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,16 @@ where
6868
}
6969

7070
/// Hashes `node`, which is at the given `path_prefix`, and its children recursively.
71-
/// Returns the hashed node and its hash.
71+
/// Returns the hashed node, its hash, and the count of nodes processed.
7272
pub(super) fn hash_helper(
7373
#[cfg(feature = "ethhash")] &self,
7474
mut node: Node,
7575
path_prefix: &mut Path,
7676
#[cfg(feature = "ethhash")] fake_root_extra_nibble: Option<u8>,
77-
) -> Result<(MaybePersistedNode, HashType), FileIoError> {
77+
) -> Result<(MaybePersistedNode, HashType, usize), FileIoError> {
7878
// If this is a branch, find all unhashed children and recursively hash them.
7979
trace!("hashing {node:?} at {path_prefix:?}");
80+
let mut nodes_processed = 1usize; // Count this node
8081
if let Node::Branch(ref mut b) = node {
8182
// special case code for ethereum hashes at the account level
8283
#[cfg(feature = "ethhash")]
@@ -157,10 +158,13 @@ where
157158
path_prefix.0.push(nibble as u8);
158159

159160
#[cfg(feature = "ethhash")]
160-
let (child_node, child_hash) =
161+
let (child_node, child_hash, child_count) =
161162
self.hash_helper(child_node, path_prefix, make_fake_root)?;
162163
#[cfg(not(feature = "ethhash"))]
163-
let (child_node, child_hash) = Self::hash_helper(child_node, path_prefix)?;
164+
let (child_node, child_hash, child_count) =
165+
Self::hash_helper(child_node, path_prefix)?;
166+
167+
nodes_processed = nodes_processed.saturating_add(child_count);
164168

165169
*child = Some(Child::MaybePersisted(child_node, child_hash));
166170
trace!("child now {child:?}");
@@ -188,6 +192,6 @@ where
188192
#[cfg(not(feature = "ethhash"))]
189193
let hash = hash_node(&node, path_prefix);
190194

191-
Ok((SharedNode::new(node).into(), hash))
195+
Ok((SharedNode::new(node).into(), hash, nodes_processed))
192196
}
193197
}

storage/src/nodestore/mod.rs

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,11 @@ use crate::logger::trace;
4949
use crate::node::branch::ReadSerializable as _;
5050
use arc_swap::ArcSwap;
5151
use arc_swap::access::DynAccess;
52+
use metrics::gauge;
5253
use smallvec::SmallVec;
5354
use std::fmt::Debug;
5455
use std::io::{Error, ErrorKind};
56+
use std::sync::atomic::AtomicUsize;
5557

5658
// Re-export types from alloc module
5759
pub use alloc::{AreaIndex, LinearAddress, NodeAllocator};
@@ -125,6 +127,7 @@ impl<S: ReadableStorage> NodeStore<Committed, S> {
125127
deleted: Box::default(),
126128
root_hash: None,
127129
root: header.root_address().map(Into::into),
130+
unwritten_nodes: AtomicUsize::new(0),
128131
},
129132
storage,
130133
};
@@ -154,6 +157,7 @@ impl<S: ReadableStorage> NodeStore<Committed, S> {
154157
deleted: Box::default(),
155158
root_hash: None,
156159
root: None,
160+
unwritten_nodes: AtomicUsize::new(0),
157161
},
158162
})
159163
}
@@ -354,11 +358,27 @@ pub trait RootReader {
354358
}
355359

356360
/// A committed revision of a merkle trie.
357-
#[derive(Clone, Debug)]
361+
#[derive(Debug)]
358362
pub struct Committed {
359363
deleted: Box<[MaybePersistedNode]>,
360364
root_hash: Option<TrieHash>,
361365
root: Option<MaybePersistedNode>,
366+
/// TODO: No readers of this variable yet - will be used for tracking unwritten nodes in committed revisions
367+
unwritten_nodes: AtomicUsize,
368+
}
369+
370+
impl Clone for Committed {
371+
fn clone(&self) -> Self {
372+
Self {
373+
deleted: self.deleted.clone(),
374+
root_hash: self.root_hash.clone(),
375+
root: self.root.clone(),
376+
unwritten_nodes: AtomicUsize::new(
377+
self.unwritten_nodes
378+
.load(std::sync::atomic::Ordering::Relaxed),
379+
),
380+
}
381+
}
362382
}
363383

364384
#[derive(Clone, Debug)]
@@ -390,6 +410,8 @@ pub struct ImmutableProposal {
390410
root_hash: Option<TrieHash>,
391411
/// The root node, either in memory or on disk
392412
root: Option<MaybePersistedNode>,
413+
/// The number of unwritten nodes in this proposal
414+
unwritten_nodes: usize,
393415
}
394416

395417
impl ImmutableProposal {
@@ -407,6 +429,17 @@ impl ImmutableProposal {
407429
}
408430
}
409431

432+
impl Drop for ImmutableProposal {
433+
fn drop(&mut self) {
434+
// When an immutable proposal is dropped without being committed,
435+
// decrement the gauge to reflect that these nodes will never be written
436+
if self.unwritten_nodes > 0 {
437+
#[allow(clippy::cast_precision_loss)]
438+
gauge!("firewood.nodes.unwritten").decrement(self.unwritten_nodes as f64);
439+
}
440+
}
441+
}
442+
410443
/// Contains the state of a revision of a merkle trie.
411444
///
412445
/// The first generic parameter is the type of the revision, which supports reading nodes from parent proposals.
@@ -465,14 +498,23 @@ impl<T: Into<NodeStoreParent>, S: ReadableStorage> From<NodeStore<T, S>>
465498
/// Commit a proposal to a new revision of the trie
466499
impl<S: WritableStorage> From<NodeStore<ImmutableProposal, S>> for NodeStore<Committed, S> {
467500
fn from(val: NodeStore<ImmutableProposal, S>) -> Self {
501+
let NodeStore {
502+
header,
503+
kind,
504+
storage,
505+
} = val;
506+
// Use ManuallyDrop to prevent the Drop impl from running since we're committing
507+
let kind = std::mem::ManuallyDrop::new(kind);
508+
468509
NodeStore {
469-
header: val.header,
510+
header,
470511
kind: Committed {
471-
deleted: val.kind.deleted,
472-
root_hash: val.kind.root_hash,
473-
root: val.kind.root,
512+
deleted: kind.deleted.clone(),
513+
root_hash: kind.root_hash.clone(),
514+
root: kind.root.clone(),
515+
unwritten_nodes: AtomicUsize::new(kind.unwritten_nodes),
474516
},
475-
storage: val.storage,
517+
storage,
476518
}
477519
}
478520
}
@@ -499,6 +541,7 @@ impl NodeStore<Arc<ImmutableProposal>, FileBacked> {
499541
deleted: self.kind.deleted.clone(),
500542
root_hash: self.kind.root_hash.clone(),
501543
root: self.kind.root.clone(),
544+
unwritten_nodes: AtomicUsize::new(self.kind.unwritten_nodes),
502545
},
503546
storage: self.storage.clone(),
504547
}
@@ -524,6 +567,7 @@ impl<S: ReadableStorage> TryFrom<NodeStore<MutableProposal, S>>
524567
parent: Arc::new(ArcSwap::new(Arc::new(kind.parent))),
525568
root_hash: None,
526569
root: None,
570+
unwritten_nodes: 0,
527571
}),
528572
storage,
529573
};
@@ -536,20 +580,28 @@ impl<S: ReadableStorage> TryFrom<NodeStore<MutableProposal, S>>
536580

537581
// Hashes the trie and returns the address of the new root.
538582
#[cfg(feature = "ethhash")]
539-
let (root, root_hash) = nodestore.hash_helper(root, &mut Path::new(), None)?;
583+
let (root, root_hash, unwritten_count) =
584+
nodestore.hash_helper(root, &mut Path::new(), None)?;
540585
#[cfg(not(feature = "ethhash"))]
541-
let (root, root_hash) =
586+
let (root, root_hash, unwritten_count) =
542587
NodeStore::<MutableProposal, S>::hash_helper(root, &mut Path::new())?;
543588

544589
let immutable_proposal =
545590
Arc::into_inner(nodestore.kind).expect("no other references to the proposal");
591+
// Use ManuallyDrop to prevent Drop from running since we're replacing the proposal
592+
let immutable_proposal = std::mem::ManuallyDrop::new(immutable_proposal);
546593
nodestore.kind = Arc::new(ImmutableProposal {
547-
deleted: immutable_proposal.deleted,
548-
parent: immutable_proposal.parent,
594+
deleted: immutable_proposal.deleted.clone(),
595+
parent: immutable_proposal.parent.clone(),
549596
root_hash: Some(root_hash.into_triehash()),
550597
root: Some(root),
598+
unwritten_nodes: unwritten_count,
551599
});
552600

601+
// Track unwritten nodes in metrics
602+
#[allow(clippy::cast_precision_loss)]
603+
gauge!("firewood.nodes.unwritten").increment(unwritten_count as f64);
604+
553605
Ok(nodestore)
554606
}
555607
}

storage/src/nodestore/persist.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use std::iter::FusedIterator;
3131

3232
use crate::linear::FileIoError;
3333
use coarsetime::Instant;
34-
use metrics::counter;
34+
use metrics::{counter, gauge};
3535

3636
#[cfg(feature = "io-uring")]
3737
use crate::logger::trace;
@@ -229,6 +229,9 @@ impl NodeStore<Committed, FileBacked> {
229229
.write(persisted_address.get(), serialized.as_slice())?;
230230
node.persist_at(persisted_address);
231231

232+
// Decrement gauge immediately after node is written to storage
233+
gauge!("firewood.nodes.unwritten").decrement(1.0);
234+
232235
// Move the arc to a vector of persisted nodes for caching
233236
// we save them so we don't have to lock the cache while we write them
234237
// If we ever persist out of band, we might have a race condition, so
@@ -272,6 +275,11 @@ impl NodeStore<Committed, FileBacked> {
272275
// Finally persist the header
273276
self.flush_header()?;
274277

278+
// Reset unwritten nodes counter to zero since all nodes are now persisted
279+
self.kind
280+
.unwritten_nodes
281+
.store(0, std::sync::atomic::Ordering::Relaxed);
282+
275283
Ok(())
276284
}
277285

@@ -292,11 +300,13 @@ impl NodeStore<Committed, FileBacked> {
292300
}
293301

294302
/// Helper function to handle completion queue entries and check for errors
303+
/// Returns the number of completed operations
295304
fn handle_completion_queue(
296305
storage: &FileBacked,
297306
completion_queue: io_uring::cqueue::CompletionQueue<'_>,
298307
saved_pinned_buffers: &mut [PinnedBufferEntry],
299-
) -> Result<(), FileIoError> {
308+
) -> Result<usize, FileIoError> {
309+
let mut completed_count = 0;
300310
for entry in completion_queue {
301311
let item = entry.user_data() as usize;
302312
let pbe = saved_pinned_buffers
@@ -322,8 +332,9 @@ impl NodeStore<Committed, FileBacked> {
322332
));
323333
}
324334
pbe.offset = None;
335+
completed_count += 1;
325336
}
326-
Ok(())
337+
Ok(completed_count)
327338
}
328339

329340
const RINGSIZE: usize = FileBacked::RINGSIZE as usize;
@@ -404,11 +415,16 @@ impl NodeStore<Committed, FileBacked> {
404415
})?;
405416
let completion_queue = ring.completion();
406417
trace!("competion queue length: {}", completion_queue.len());
407-
handle_completion_queue(
418+
let completed_writes = handle_completion_queue(
408419
&self.storage,
409420
completion_queue,
410421
&mut saved_pinned_buffers,
411422
)?;
423+
424+
// Decrement gauge for writes that have actually completed
425+
if completed_writes > 0 {
426+
gauge!("firewood.nodes.unwritten").decrement(completed_writes as f64);
427+
}
412428
}
413429

414430
// Mark node as persisted and collect for cache
@@ -424,7 +440,13 @@ impl NodeStore<Committed, FileBacked> {
424440
.file_io_error(e, 0, Some("io-uring final submit_and_wait".to_string()))
425441
})?;
426442

427-
handle_completion_queue(&self.storage, ring.completion(), &mut saved_pinned_buffers)?;
443+
let final_completed_writes =
444+
handle_completion_queue(&self.storage, ring.completion(), &mut saved_pinned_buffers)?;
445+
446+
// Decrement gauge for final batch of writes that completed
447+
if final_completed_writes > 0 {
448+
gauge!("firewood.nodes.unwritten").decrement(final_completed_writes as f64);
449+
}
428450

429451
debug_assert!(
430452
!saved_pinned_buffers.iter().any(|pbe| pbe.offset.is_some()),

0 commit comments

Comments
 (0)