Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: avoid expensive lookup of storage metrics #7495

Merged
merged 2 commits into from
Aug 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 73 additions & 33 deletions core/store/src/trie/trie_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::borrow::Borrow;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::{Arc, Mutex};

use near_metrics::prometheus;
use near_metrics::prometheus::core::{GenericCounter, GenericGauge};
use near_primitives::hash::CryptoHash;

use crate::db::refcount::decode_value_with_rc;
Expand Down Expand Up @@ -73,6 +75,18 @@ pub struct TrieCacheInner {
shard_id: u32,
/// Whether cache is used for view calls execution.
is_view: bool,
// Counters tracking operations happening inside the shard cache.
// Stored here to avoid overhead of looking them up on hot paths.
metrics: TrieCacheMetrics,
}

/// Pre-resolved Prometheus metric handles for shard-cache operations.
///
/// Looked up once per (shard_id, is_view) label pair in `TrieCacheInner::new`
/// so hot paths can call `inc()`/`set()` directly instead of paying for a
/// `with_label_values` lookup (label formatting + hashing) on every cache
/// operation.
struct TrieCacheMetrics {
    // Incremented when a value exceeds the cacheable size limit and is not stored.
    shard_cache_too_large: GenericCounter<prometheus::core::AtomicU64>,
    // Incremented when a key taken from the deletions queue is found in the cache.
    shard_cache_pop_hits: GenericCounter<prometheus::core::AtomicU64>,
    // Incremented when a key taken from the deletions queue is absent from the cache.
    shard_cache_pop_misses: GenericCounter<prometheus::core::AtomicU64>,
    // Incremented when eviction falls back to popping the LRU entry.
    shard_cache_pop_lru: GenericCounter<prometheus::core::AtomicU64>,
    // Incremented when `pop` is called for a key not present in the cache.
    shard_cache_gc_pop_misses: GenericCounter<prometheus::core::AtomicU64>,
    // Tracks the current length of the deletions queue.
    shard_cache_deletions_size: GenericGauge<prometheus::core::AtomicI64>,
}

impl TrieCacheInner {
Expand All @@ -84,13 +98,27 @@ impl TrieCacheInner {
is_view: bool,
) -> Self {
assert!(cache_capacity > 0 && total_size_limit > 0);
let metrics_labels: [&str; 2] = [&format!("{}", shard_id), &format!("{}", is_view as u8)];
let metrics = TrieCacheMetrics {
shard_cache_too_large: metrics::SHARD_CACHE_TOO_LARGE
.with_label_values(&metrics_labels),
shard_cache_pop_hits: metrics::SHARD_CACHE_POP_HITS.with_label_values(&metrics_labels),
shard_cache_pop_misses: metrics::SHARD_CACHE_POP_MISSES
.with_label_values(&metrics_labels),
shard_cache_pop_lru: metrics::SHARD_CACHE_POP_LRU.with_label_values(&metrics_labels),
shard_cache_gc_pop_misses: metrics::SHARD_CACHE_GC_POP_MISSES
.with_label_values(&metrics_labels),
shard_cache_deletions_size: metrics::SHARD_CACHE_DELETIONS_SIZE
.with_label_values(&metrics_labels),
};
Self {
cache: LruCache::new(cache_capacity),
deletions: BoundedQueue::new(deletions_queue_capacity),
total_size: 0,
total_size_limit,
shard_id,
is_view,
metrics,
}
}

Expand All @@ -105,26 +133,24 @@ impl TrieCacheInner {
}

pub(crate) fn put(&mut self, key: CryptoHash, value: Arc<[u8]>) {
let metrics_labels: [&str; 2] =
[&format!("{}", self.shard_id), &format!("{}", self.is_view as u8)];
while self.total_size > self.total_size_limit || self.cache.len() == self.cache.cap() {
// First, try to evict value using the key from deletions queue.
match self.deletions.pop() {
Some(key) => match self.cache.pop(&key) {
Some(value) => {
metrics::SHARD_CACHE_POP_HITS.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_pop_hits.inc();
self.total_size -= value.len() as u64;
continue;
}
None => {
metrics::SHARD_CACHE_POP_MISSES.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_pop_misses.inc();
}
},
None => {}
}

// Second, pop LRU value.
metrics::SHARD_CACHE_POP_LRU.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_pop_lru.inc();
let (_, value) =
self.cache.pop_lru().expect("Cannot fail because total size capacity is > 0");
self.total_size -= value.len() as u64;
Expand All @@ -144,30 +170,26 @@ impl TrieCacheInner {
// Adds key to the deletions queue if it is present in cache.
// Returns key-value pair which are popped if deletions queue is full.
pub(crate) fn pop(&mut self, key: &CryptoHash) -> Option<(CryptoHash, Arc<[u8]>)> {
let metrics_labels: [&str; 2] =
[&format!("{}", self.shard_id), &format!("{}", self.is_view as u8)];
metrics::SHARD_CACHE_DELETIONS_SIZE
.with_label_values(&metrics_labels)
.set(self.deletions.len() as i64);
self.metrics.shard_cache_deletions_size.set(self.deletions.len() as i64);
// Do nothing if key was removed before.
if self.cache.contains(key) {
// Put key to the queue of deletions and possibly remove another key we have to delete.
match self.deletions.put(key.clone()) {
Some(key_to_delete) => match self.cache.pop(&key_to_delete) {
Some(evicted_value) => {
metrics::SHARD_CACHE_POP_HITS.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_pop_hits.inc();
self.total_size -= evicted_value.len() as u64;
Some((key_to_delete, evicted_value))
}
None => {
metrics::SHARD_CACHE_POP_MISSES.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_pop_misses.inc();
None
}
},
None => None,
}
} else {
metrics::SHARD_CACHE_GC_POP_MISSES.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_gc_pop_misses.inc();
None
}
}
Expand Down Expand Up @@ -210,15 +232,13 @@ impl TrieCache {

pub fn update_cache(&self, ops: Vec<(CryptoHash, Option<&Vec<u8>>)>) {
let mut guard = self.0.lock().expect(POISONED_LOCK_ERR);
let metrics_labels: [&str; 2] =
[&format!("{}", guard.shard_id), &format!("{}", guard.is_view as u8)];
for (hash, opt_value_rc) in ops {
if let Some(value_rc) = opt_value_rc {
if let (Some(value), _rc) = decode_value_with_rc(&value_rc) {
if value.len() < TRIE_LIMIT_CACHED_VALUE_SIZE {
guard.put(hash, value.into());
} else {
metrics::SHARD_CACHE_TOO_LARGE.with_label_values(&metrics_labels).inc();
guard.metrics.shard_cache_too_large.inc();
}
} else {
guard.pop(&hash);
Expand Down Expand Up @@ -350,7 +370,6 @@ pub(crate) const TRIE_LIMIT_CACHED_VALUE_SIZE: usize = 1000;
pub struct TrieCachingStorage {
pub(crate) store: Store,
pub(crate) shard_uid: ShardUId,
is_view: bool,

/// Caches ever requested items for the shard `shard_uid`. Used to speed up DB operations, presence of any item is
/// not guaranteed.
Expand All @@ -369,6 +388,20 @@ pub struct TrieCachingStorage {
pub(crate) db_read_nodes: Cell<u64>,
/// Counts trie nodes retrieved from the chunk cache.
pub(crate) mem_read_nodes: Cell<u64>,
    // Counters tracking operations on the chunk and shard caches.
    // Stored here to avoid the overhead of looking them up on hot paths.
metrics: TrieCacheInnerMetrics,
}

/// Pre-resolved Prometheus metric handles used by `TrieCachingStorage`.
///
/// Resolved once per (shard_id, is_view) label pair in
/// `TrieCachingStorage::new` so the hot `retrieve_raw_bytes` path avoids the
/// per-call `with_label_values` lookup.
///
/// NOTE(review): despite the "Inner" in the name, this struct belongs to
/// `TrieCachingStorage`, not `TrieCacheInner` — consider renaming to avoid
/// confusion with `TrieCacheMetrics`.
struct TrieCacheInnerMetrics {
    // Requested node was found in the chunk cache.
    chunk_cache_hits: GenericCounter<prometheus::core::AtomicU64>,
    // Requested node was absent from the chunk cache; shard cache is consulted next.
    chunk_cache_misses: GenericCounter<prometheus::core::AtomicU64>,
    // Requested node was found in the shard cache.
    shard_cache_hits: GenericCounter<prometheus::core::AtomicU64>,
    // Requested node was absent from the shard cache; falls back to a DB read.
    shard_cache_misses: GenericCounter<prometheus::core::AtomicU64>,
    // Value read from storage was too large to be put into the shard cache.
    shard_cache_too_large: GenericCounter<prometheus::core::AtomicU64>,
    // Current number of entries in the shard cache.
    shard_cache_size: GenericGauge<prometheus::core::AtomicI64>,
    // Current number of entries in the chunk cache.
    chunk_cache_size: GenericGauge<prometheus::core::AtomicI64>,
    // Total size accounted to the shard cache, as reported by `current_total_size()`.
    shard_cache_current_total_size: GenericGauge<prometheus::core::AtomicI64>,
}

impl TrieCachingStorage {
Expand All @@ -378,15 +411,29 @@ impl TrieCachingStorage {
shard_uid: ShardUId,
is_view: bool,
) -> TrieCachingStorage {
let metrics_labels: [&str; 2] =
[&format!("{}", shard_uid.shard_id), &format!("{}", is_view as u8)];
let metrics = TrieCacheInnerMetrics {
chunk_cache_hits: metrics::CHUNK_CACHE_HITS.with_label_values(&metrics_labels),
chunk_cache_misses: metrics::CHUNK_CACHE_MISSES.with_label_values(&metrics_labels),
shard_cache_hits: metrics::SHARD_CACHE_HITS.with_label_values(&metrics_labels),
shard_cache_misses: metrics::SHARD_CACHE_MISSES.with_label_values(&metrics_labels),
shard_cache_too_large: metrics::SHARD_CACHE_TOO_LARGE
.with_label_values(&metrics_labels),
shard_cache_size: metrics::SHARD_CACHE_SIZE.with_label_values(&metrics_labels),
chunk_cache_size: metrics::CHUNK_CACHE_SIZE.with_label_values(&metrics_labels),
shard_cache_current_total_size: metrics::SHARD_CACHE_CURRENT_TOTAL_SIZE
.with_label_values(&metrics_labels),
};
TrieCachingStorage {
store,
shard_uid,
is_view,
shard_cache,
cache_mode: Cell::new(TrieCacheMode::CachingShard),
chunk_cache: RefCell::new(Default::default()),
db_read_nodes: Cell::new(0),
mem_read_nodes: Cell::new(0),
metrics,
}
}

Expand Down Expand Up @@ -427,35 +474,28 @@ impl TrieCachingStorage {

impl TrieStorage for TrieCachingStorage {
fn retrieve_raw_bytes(&self, hash: &CryptoHash) -> Result<Arc<[u8]>, StorageError> {
let metrics_labels: [&str; 2] =
[&format!("{}", self.shard_uid.shard_id), &format!("{}", self.is_view as u8)];

metrics::CHUNK_CACHE_SIZE
.with_label_values(&metrics_labels)
.set(self.chunk_cache.borrow().len() as i64);
self.metrics.chunk_cache_size.set(self.chunk_cache.borrow().len() as i64);
// Try to get value from chunk cache containing nodes with cheaper access. We can do it for any `TrieCacheMode`,
// because we charge for reading nodes only when `CachingChunk` mode is enabled anyway.
if let Some(val) = self.chunk_cache.borrow_mut().get(hash) {
metrics::CHUNK_CACHE_HITS.with_label_values(&metrics_labels).inc();
self.metrics.chunk_cache_hits.inc();
self.inc_mem_read_nodes();
return Ok(val.clone());
}
metrics::CHUNK_CACHE_MISSES.with_label_values(&metrics_labels).inc();
self.metrics.chunk_cache_misses.inc();

// Try to get value from shard cache containing most recently touched nodes.
let mut guard = self.shard_cache.0.lock().expect(POISONED_LOCK_ERR);
metrics::SHARD_CACHE_SIZE.with_label_values(&metrics_labels).set(guard.len() as i64);
metrics::SHARD_CACHE_CURRENT_TOTAL_SIZE
.with_label_values(&metrics_labels)
.set(guard.current_total_size() as i64);
self.metrics.shard_cache_size.set(guard.len() as i64);
self.metrics.shard_cache_current_total_size.set(guard.current_total_size() as i64);
let val = match guard.get(hash) {
Some(val) => {
metrics::SHARD_CACHE_HITS.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_hits.inc();
near_o11y::io_trace!(count: "shard_cache_hit");
val.clone()
}
None => {
metrics::SHARD_CACHE_MISSES.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_misses.inc();
near_o11y::io_trace!(count: "shard_cache_miss");
// If value is not present in cache, get it from the storage.
let key = Self::get_key_from_shard_uid_and_hash(self.shard_uid, hash);
Expand All @@ -475,7 +515,7 @@ impl TrieStorage for TrieCachingStorage {
if val.len() < TRIE_LIMIT_CACHED_VALUE_SIZE {
guard.put(*hash, val.clone());
} else {
metrics::SHARD_CACHE_TOO_LARGE.with_label_values(&metrics_labels).inc();
self.metrics.shard_cache_too_large.inc();
near_o11y::io_trace!(count: "shard_cache_too_large");
}

Expand Down