feat: improved shard cache (#7429)
Co-authored-by: firatNEAR <firat@near.org>
Co-authored-by: firatNEAR <102993450+firatNEAR@users.noreply.github.com>
3 people authored Aug 26, 2022
1 parent a65bc1a commit a8da6a0
Showing 9 changed files with 523 additions and 44 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,9 @@
* `network.external_address` field in `config.json` file is
deprecated. In fact it has never been used and only served to
confuse everyone [#7300](https://github.com/near/nearcore/pull/7300)
* Due to increasing state size, the shard cache for Trie nodes was improved to
hold more nodes in memory. Requires 3 GB more RAM
[#7429](https://github.com/near/nearcore/pull/7429)

## 1.28.0 [2022-07-27]

8 changes: 8 additions & 0 deletions chain/chain/src/store.rs
@@ -2775,8 +2775,14 @@ impl<'a> ChainStoreUpdate<'a> {
block_hash,
)?;
}

// Convert trie changes to database ops for trie nodes.
// Create separate store update for deletions, because we want to update cache and don't want to remove nodes
// from the store.
let mut deletions_store_update = self.store().store_update();
for mut wrapped_trie_changes in self.trie_changes.drain(..) {
wrapped_trie_changes.insertions_into(&mut store_update);
wrapped_trie_changes.deletions_into(&mut deletions_store_update);
wrapped_trie_changes.state_changes_into(&mut store_update);

if self.chain_store.save_trie_changes {
@@ -2785,6 +2791,8 @@ impl<'a> ChainStoreUpdate<'a> {
.map_err(|err| Error::Other(err.to_string()))?;
}
}
deletions_store_update.update_cache()?;

for ((block_hash, shard_id), state_changes) in
self.add_state_changes_for_split_states.drain()
{
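The change above routes trie-node deletions into a separate `StoreUpdate` that is never committed: its only effect is the later `deletions_store_update.update_cache()?` call, so the cache sees the deletions while the nodes themselves are not removed from the on-disk store, as the comment explains. Below is a self-contained sketch of that batching pattern; `Store`, `Cache`, and `StoreUpdate` here are simplified stand-ins, not nearcore's types, and the real code additionally tracks reference counts for trie nodes.

```rust
use std::collections::HashMap;

/// Simplified stand-ins: an on-disk "store" and an in-memory cache that
/// mirrors a subset of it (both are just HashMaps here).
#[derive(Default)]
struct Cache(HashMap<Vec<u8>, Vec<u8>>);

#[derive(Default)]
struct Store {
    data: HashMap<Vec<u8>, Vec<u8>>,
}

/// A batch of pending writes. `commit` persists the batch, while
/// `update_cache` only applies it to the in-memory cache.
#[derive(Default)]
struct StoreUpdate {
    inserts: Vec<(Vec<u8>, Vec<u8>)>,
    deletes: Vec<Vec<u8>>,
}

impl StoreUpdate {
    fn set(&mut self, key: &[u8], value: &[u8]) {
        self.inserts.push((key.to_vec(), value.to_vec()));
    }

    fn delete(&mut self, key: &[u8]) {
        self.deletes.push(key.to_vec());
    }

    /// Apply the batch to the cache only; nothing is persisted.
    fn update_cache(&self, cache: &mut Cache) {
        for (k, v) in &self.inserts {
            cache.0.insert(k.clone(), v.clone());
        }
        for k in &self.deletes {
            cache.0.remove(k);
        }
    }

    /// Apply the batch to the cache, then persist it to the store.
    fn commit(self, store: &mut Store, cache: &mut Cache) {
        self.update_cache(cache);
        for (k, v) in self.inserts {
            store.data.insert(k, v);
        }
        for k in self.deletes {
            store.data.remove(&k);
        }
    }
}

fn main() {
    let (mut store, mut cache) = (Store::default(), Cache::default());

    // Insertions go into the batch that will be committed.
    let mut store_update = StoreUpdate::default();
    store_update.set(b"node-a", b"trie node a");

    // Deletions go into a *separate* batch that is never committed, so the
    // cache is updated but the nodes are not removed from the store.
    let mut deletions_store_update = StoreUpdate::default();
    deletions_store_update.delete(b"node-b");

    deletions_store_update.update_cache(&mut cache);
    store_update.commit(&mut store, &mut cache);

    println!("persisted: {}, cached: {}", store.data.len(), cache.0.len());
}
```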
4 changes: 2 additions & 2 deletions core/store/src/config.rs
@@ -43,7 +43,7 @@ pub struct StoreConfig {
pub block_size: bytesize::ByteSize,

/// Trie cache capacities
/// Default value: ShardUId {version: 1, shard_id: 3} -> 2_000_000. TODO: clarify
/// Default value: ShardUId {version: 1, shard_id: 3} -> 45_000_000
/// We're still experimenting with this parameter and it seems decreasing its value can improve
/// the performance of the storage
pub trie_cache_capacities: Vec<(ShardUId, usize)>,
@@ -108,7 +108,7 @@ impl Default for StoreConfig {
// we use it since then.
block_size: bytesize::ByteSize::kib(16),

trie_cache_capacities: Default::default(),
trie_cache_capacities: vec![(ShardUId { version: 1, shard_id: 3 }, 45_000_000)],
}
}
}
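The new default replaces the empty list with a single override that gives shard 3 (`ShardUId { version: 1, shard_id: 3 }`) a cache of 45 million entries, which lines up with the changelog's "3 GB more RAM" note. A small sketch of declaring and extending such a default; `ShardUId` is a simplified stand-in and the override in `main` is a made-up example, not a recommended value.

```rust
/// Simplified stand-in for nearcore's ShardUId; illustration only.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ShardUId {
    version: u32,
    shard_id: u32,
}

#[derive(Debug)]
struct StoreConfig {
    /// Per-shard trie cache capacities, in number of cached entries.
    trie_cache_capacities: Vec<(ShardUId, usize)>,
}

impl Default for StoreConfig {
    fn default() -> Self {
        // Mirrors the new default in the diff: shard 3 of shard layout
        // version 1 gets a 45M-entry cache; other shards keep the generic
        // cache default.
        Self { trie_cache_capacities: vec![(ShardUId { version: 1, shard_id: 3 }, 45_000_000)] }
    }
}

fn main() {
    // A hypothetical operator-style override on top of the default.
    let mut config = StoreConfig::default();
    config.trie_cache_capacities.push((ShardUId { version: 1, shard_id: 0 }, 10_000_000));
    println!("{:?}", config);
}
```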
19 changes: 12 additions & 7 deletions core/store/src/lib.rs
@@ -358,6 +358,17 @@ impl StoreUpdate {
self.transaction.merge(other.transaction)
}

pub fn update_cache(&self) -> io::Result<()> {
if let Some(tries) = &self.shard_tries {
// Note: avoid comparing wide pointers here to work-around
// https://github.com/rust-lang/rust/issues/69757
let addr = |arc| Arc::as_ptr(arc) as *const u8;
assert_eq!(addr(&tries.get_store().storage), addr(&self.storage),);
tries.update_cache(&self.transaction)?;
}
Ok(())
}

pub fn commit(self) -> io::Result<()> {
debug_assert!(
{
@@ -378,13 +389,7 @@
"Transaction overwrites itself: {:?}",
self
);
if let Some(tries) = self.shard_tries {
// Note: avoid comparing wide pointers here to work-around
// https://github.com/rust-lang/rust/issues/69757
let addr = |arc| Arc::as_ptr(arc) as *const u8;
assert_eq!(addr(&tries.get_store().storage), addr(&self.storage),);
tries.update_cache(&self.transaction)?;
}
self.update_cache()?;
let _span = tracing::trace_span!(target: "store", "commit").entered();
for op in &self.transaction.ops {
match op {
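The pre-existing pointer check moves into the new `update_cache` method unchanged. The `as *const u8` cast is there because `storage` is an `Arc` of a trait object: its raw pointer is a wide pointer that also carries a vtable address, and vtable addresses can differ for the same underlying object across codegen units (rust-lang/rust#69757), so only the data address is compared. A standalone sketch of the same workaround, with a hypothetical `Storage` trait standing in for the real storage trait:

```rust
use std::sync::Arc;

/// Hypothetical trait standing in for the storage trait behind `Arc<dyn ...>`.
trait Storage {
    fn name(&self) -> &'static str;
}

struct RocksDbLike;

impl Storage for RocksDbLike {
    fn name(&self) -> &'static str {
        "rocksdb-like"
    }
}

fn main() {
    let a: Arc<dyn Storage> = Arc::new(RocksDbLike);
    let b: Arc<dyn Storage> = Arc::clone(&a);

    // `Arc::as_ptr` on a trait object yields a wide pointer (data pointer +
    // vtable pointer). Vtable addresses can differ for the same object
    // (rust-lang/rust#69757), so compare only the data address by casting
    // to a thin pointer, exactly as the `addr` closure in the diff does.
    let addr = |arc: &Arc<dyn Storage>| Arc::as_ptr(arc) as *const u8;
    assert_eq!(addr(&a), addr(&b));

    println!("{} shares one storage instance", a.name());
}
```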
134 changes: 133 additions & 1 deletion core/store/src/metrics.rs
@@ -1,4 +1,7 @@
use near_metrics::{try_create_histogram_vec, HistogramVec};
use near_metrics::{
try_create_histogram_vec, try_create_int_counter_vec, try_create_int_gauge_vec, HistogramVec,
IntCounterVec, IntGaugeVec,
};
use once_cell::sync::Lazy;

pub(crate) static DATABASE_OP_LATENCY_HIST: Lazy<HistogramVec> = Lazy::new(|| {
@@ -10,3 +13,132 @@ pub(crate) static DATABASE_OP_LATENCY_HIST: Lazy<HistogramVec> = Lazy::new(|| {
)
.unwrap()
});

pub static CHUNK_CACHE_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_chunk_cache_hits",
"Chunk cache hits",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static CHUNK_CACHE_MISSES: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_chunk_cache_misses",
"Chunk cache misses",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_hits",
"Shard cache hits",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_MISSES: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_misses",
"Shard cache misses",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_TOO_LARGE: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_too_large",
"Number of values to be inserted into shard cache is too large",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec("near_shard_cache_size", "Shard cache size", &["shard_id", "is_view"])
.unwrap()
});

pub static CHUNK_CACHE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec("near_chunk_cache_size", "Chunk cache size", &["shard_id", "is_view"])
.unwrap()
});

pub static SHARD_CACHE_CURRENT_TOTAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_shard_cache_current_total_size",
"Shard cache current total size",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_POP_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_pop_hits",
"Shard cache pop hits",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static SHARD_CACHE_POP_MISSES: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_pop_misses",
"Shard cache pop misses",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static SHARD_CACHE_POP_LRU: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_pop_lru",
"Shard cache LRU pops",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static SHARD_CACHE_GC_POP_MISSES: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_gc_pop_misses",
"Shard cache gc pop misses",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static SHARD_CACHE_DELETIONS_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_shard_cache_deletions_size",
"Shard cache deletions size",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static APPLIED_TRIE_DELETIONS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_applied_trie_deletions",
"Trie deletions applied to store",
&["shard_id"],
)
.unwrap()
});
pub static APPLIED_TRIE_INSERTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_applied_trie_insertions",
"Trie insertions applied to store",
&["shard_id"],
)
.unwrap()
});
pub static REVERTED_TRIE_INSERTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_reverted_trie_insertions",
"Trie insertions reverted due to GC of forks",
&["shard_id"],
)
.unwrap()
});
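The new metrics all follow the same Prometheus pattern: a lazily registered counter or gauge vector labelled by `shard_id` (plus `is_view` for the cache metrics), bumped at the call sites added in `shard_tries.rs` below. A minimal standalone sketch using the `prometheus` and `once_cell` crates directly; it assumes the `near_metrics::try_create_*` helpers are thin wrappers over the same registration calls, and the metric name here is hypothetical.

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounterVec};

// Hypothetical demo metric, registered in the default registry on first use.
static APPLIED_TRIE_INSERTIONS_DEMO: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "demo_applied_trie_insertions",
        "Trie insertions applied to store (demo)",
        &["shard_id"]
    )
    .unwrap()
});

fn record_insertions(shard_id: u32, count: u64) {
    // Same call pattern as in the diff: label by shard id, bump by batch size.
    APPLIED_TRIE_INSERTIONS_DEMO
        .with_label_values(&[&shard_id.to_string()])
        .inc_by(count);
}

fn main() {
    record_insertions(3, 42);
    let value = APPLIED_TRIE_INSERTIONS_DEMO.with_label_values(&["3"]).get();
    println!("demo_applied_trie_insertions{{shard_id=\"3\"}} = {}", value);
}
```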
44 changes: 32 additions & 12 deletions core/store/src/trie/shard_tries.rs
@@ -21,7 +21,7 @@ use near_primitives::state_record::is_delayed_receipt_key;
use crate::flat_state::FlatState;
use crate::trie::trie_storage::{TrieCache, TrieCachingStorage};
use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR};
use crate::{DBCol, DBOp, DBTransaction};
use crate::{metrics, DBCol, DBOp, DBTransaction};
use crate::{Store, StoreUpdate, Trie, TrieChanges, TrieUpdate};

/// Responsible for creation of trie caches, stores necessary configuration for it.
@@ -42,20 +42,24 @@ impl TrieCacheFactory {
}

/// Create new cache for the given shard uid.
pub fn create_cache(&self, shard_uid: &ShardUId) -> TrieCache {
match self.capacities.get(shard_uid) {
Some(capacity) => TrieCache::with_capacity(*capacity),
None => TrieCache::new(),
pub fn create_cache(&self, shard_uid: &ShardUId, is_view: bool) -> TrieCache {
let capacity = if is_view { None } else { self.capacities.get(shard_uid) };
match capacity {
Some(capacity) => TrieCache::with_capacities(*capacity, shard_uid.shard_id, is_view),
None => TrieCache::new(shard_uid.shard_id, is_view),
}
}

/// Create caches on the initialization of storage structures.
pub fn create_initial_caches(&self) -> HashMap<ShardUId, TrieCache> {
pub fn create_initial_caches(&self, is_view: bool) -> HashMap<ShardUId, TrieCache> {
assert_ne!(self.num_shards, 0);
let shards: Vec<_> = (0..self.num_shards)
.map(|shard_id| ShardUId { version: self.shard_version, shard_id: shard_id as u32 })
.collect();
shards.iter().map(|&shard_uid| (shard_uid, self.create_cache(&shard_uid))).collect()
shards
.iter()
.map(|&shard_uid| (shard_uid, self.create_cache(&shard_uid, is_view)))
.collect()
}
}
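`create_cache` now takes an `is_view` flag, and view caches skip the per-shard capacity overrides, so the large shard-3 cache is only allocated for the caches used in regular trie access. A minimal sketch of that selection logic; the types and the fallback capacity are simplified stand-ins for the real `TrieCache` and its default.

```rust
use std::collections::HashMap;

/// Simplified stand-ins for illustration; not the real nearcore types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ShardUId {
    version: u32,
    shard_id: u32,
}

#[derive(Debug)]
struct TrieCache {
    capacity: usize,
    shard_id: u32,
    is_view: bool,
}

/// Hypothetical fallback capacity used when no per-shard override applies.
const DEFAULT_CAPACITY: usize = 50_000;

impl TrieCache {
    fn new(shard_id: u32, is_view: bool) -> Self {
        Self::with_capacity(DEFAULT_CAPACITY, shard_id, is_view)
    }

    fn with_capacity(capacity: usize, shard_id: u32, is_view: bool) -> Self {
        TrieCache { capacity, shard_id, is_view }
    }
}

struct TrieCacheFactory {
    capacities: HashMap<ShardUId, usize>,
}

impl TrieCacheFactory {
    /// View caches ignore the overrides, so the extra memory is only spent
    /// on the caches used for regular (non-view) trie access.
    fn create_cache(&self, shard_uid: &ShardUId, is_view: bool) -> TrieCache {
        let capacity = if is_view { None } else { self.capacities.get(shard_uid) };
        match capacity {
            Some(capacity) => TrieCache::with_capacity(*capacity, shard_uid.shard_id, is_view),
            None => TrieCache::new(shard_uid.shard_id, is_view),
        }
    }
}

fn main() {
    let shard3 = ShardUId { version: 1, shard_id: 3 };
    let factory = TrieCacheFactory { capacities: HashMap::from([(shard3, 45_000_000)]) };

    println!("{:?}", factory.create_cache(&shard3, false)); // 45M-entry cache
    println!("{:?}", factory.create_cache(&shard3, true)); // default-capacity view cache
}
```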

Expand All @@ -73,8 +77,8 @@ pub struct ShardTries(Arc<ShardTriesInner>);

impl ShardTries {
pub fn new(store: Store, trie_cache_factory: TrieCacheFactory) -> Self {
let caches = trie_cache_factory.create_initial_caches();
let view_caches = trie_cache_factory.create_initial_caches();
let caches = trie_cache_factory.create_initial_caches(false);
let view_caches = trie_cache_factory.create_initial_caches(true);
ShardTries(Arc::new(ShardTriesInner {
store,
trie_cache_factory,
@@ -112,10 +116,11 @@ impl ShardTries {
let mut caches = caches_to_use.write().expect(POISONED_LOCK_ERR);
caches
.entry(shard_uid)
.or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid))
.or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid, is_view))
.clone()
};
let storage = Box::new(TrieCachingStorage::new(self.0.store.clone(), cache, shard_uid));
let storage =
Box::new(TrieCachingStorage::new(self.0.store.clone(), cache, shard_uid, is_view));
let flat_state = {
#[cfg(feature = "protocol_feature_flat_state")]
if use_flat_state {
@@ -177,7 +182,7 @@ impl ShardTries {
for (shard_uid, ops) in shards {
let cache = caches
.entry(shard_uid)
.or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid))
.or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid, false))
.clone();
cache.update_cache(ops);
}
@@ -238,6 +243,9 @@ impl ShardTries {
shard_uid: ShardUId,
store_update: &mut StoreUpdate,
) {
metrics::APPLIED_TRIE_INSERTIONS
.with_label_values(&[&format!("{}", shard_uid.shard_id)])
.inc_by(trie_changes.insertions.len() as u64);
self.apply_insertions_inner(&trie_changes.insertions, shard_uid, store_update)
}

@@ -247,6 +255,9 @@
shard_uid: ShardUId,
store_update: &mut StoreUpdate,
) {
metrics::APPLIED_TRIE_DELETIONS
.with_label_values(&[&format!("{}", shard_uid.shard_id)])
.inc_by(trie_changes.deletions.len() as u64);
self.apply_deletions_inner(&trie_changes.deletions, shard_uid, store_update)
}

@@ -256,6 +267,9 @@
shard_uid: ShardUId,
store_update: &mut StoreUpdate,
) {
metrics::REVERTED_TRIE_INSERTIONS
.with_label_values(&[&format!("{}", shard_uid.shard_id)])
.inc_by(trie_changes.insertions.len() as u64);
self.apply_deletions_inner(&trie_changes.insertions, shard_uid, store_update)
}

@@ -321,10 +335,16 @@ impl WrappedTrieChanges {
&self.state_changes
}

/// Save insertions of trie nodes into Store.
pub fn insertions_into(&self, store_update: &mut StoreUpdate) {
self.tries.apply_insertions(&self.trie_changes, self.shard_uid, store_update)
}

/// Save deletions of trie nodes into Store.
pub fn deletions_into(&self, store_update: &mut StoreUpdate) {
self.tries.apply_deletions(&self.trie_changes, self.shard_uid, store_update)
}

/// Save state changes into Store.
///
/// NOTE: the changes are drained from `self`.
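For context on the `apply_*` and `revert_insertions` methods instrumented earlier in this file's diff: trie nodes are reference counted (note the `TrieRefcountChange` import), so applying insertions increments counts, applying deletions decrements them, and reverting a fork's insertions during GC is just applying those insertions as deletions, which is why `revert_insertions` calls `apply_deletions_inner`. A self-contained sketch of that counting scheme, using a plain in-memory map rather than nearcore's refcounted storage column:

```rust
use std::collections::HashMap;

/// Minimal refcounted node store: a node written by several blocks is kept
/// until every block that references it has been deleted or garbage collected.
#[derive(Default)]
struct RcNodeStore {
    // hash -> (serialized node, reference count)
    nodes: HashMap<[u8; 32], (Vec<u8>, u64)>,
}

impl RcNodeStore {
    /// Applying insertions increments refcounts, creating the node if needed.
    fn apply_insertions(&mut self, insertions: &[([u8; 32], Vec<u8>)]) {
        for (hash, value) in insertions {
            let entry = self.nodes.entry(*hash).or_insert_with(|| (value.clone(), 0));
            entry.1 += 1;
        }
    }

    /// Applying deletions decrements refcounts and drops nodes that hit zero.
    fn apply_deletions(&mut self, deletions: &[[u8; 32]]) {
        for hash in deletions {
            if let Some(entry) = self.nodes.get_mut(hash) {
                entry.1 -= 1;
                if entry.1 == 0 {
                    self.nodes.remove(hash);
                }
            }
        }
    }

    /// Reverting a fork's insertions is just applying them as deletions.
    fn revert_insertions(&mut self, insertions: &[([u8; 32], Vec<u8>)]) {
        let hashes: Vec<[u8; 32]> = insertions.iter().map(|(hash, _)| *hash).collect();
        self.apply_deletions(&hashes);
    }
}

fn main() {
    let mut store = RcNodeStore::default();
    let node = ([1u8; 32], b"trie node".to_vec());

    store.apply_insertions(std::slice::from_ref(&node)); // canonical block
    store.apply_insertions(std::slice::from_ref(&node)); // fork block
    store.revert_insertions(std::slice::from_ref(&node)); // GC of the fork
    assert!(store.nodes.contains_key(&node.0)); // still referenced

    store.apply_deletions(&[node.0]); // last reference removed
    assert!(!store.nodes.contains_key(&node.0));
}
```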