fix(snapshots): drop unwanted column families instead of deleting all keys (#10803)

If the `columns_to_keep` arg of
`checkpoint_hot_storage_and_cleanup_columns()` is `Some`, we currently
delete all the data in every other column and then, if snapshot
compaction is enabled in the configs, rely on that compaction to clean
up the files on disk. Instead, we can just call `drop_cf()` on every
unwanted column family, and the associated SST files are removed
without the need for any compaction.

So this moves the `columns_to_keep` arg to
`near_store::db::Database::create_checkpoint()`, and has the rocksdb
implementation of that trait call `drop_cf()` on unwanted column
families. These column families are then immediately recreated in
`checkpoint_hot_storage_and_cleanup_columns()` by the call to
`StoreOpener::open_in_mode()`, but the data on disk is gone.

This also means we can get rid of the state snapshot compaction options
in the config, since they were only ever intended to clean up the
unwanted files, which aren't there anymore.
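
For illustration, a minimal sketch of that flow, written directly against the rust `rocksdb` crate rather than the `near_store` wrappers. The function name, paths, and string-based column handling here are hypothetical; the real implementation works in terms of `DBCol` and always keeps `DBCol::DbVersion` so the store metadata stays readable.

```rust
// Sketch only: checkpoint a DB, then drop the column families we don't want.
use rocksdb::{checkpoint::Checkpoint, ColumnFamilyDescriptor, Options, DB};

fn checkpoint_and_drop_columns(
    db: &DB,
    checkpoint_path: &std::path::Path,
    columns_to_keep: &[&str],
) -> anyhow::Result<()> {
    // 1. Create a checkpoint of the live DB (hard links to the existing SST files).
    let cp = Checkpoint::new(db)?;
    cp.create_checkpoint(checkpoint_path)?;

    // 2. Reopen the checkpoint with all of the column families it contains.
    let opts = Options::default();
    let cf_names = DB::list_cf(&opts, checkpoint_path)?;
    let cfs = cf_names
        .iter()
        .map(|name| ColumnFamilyDescriptor::new(name, Options::default()));
    let mut checkpoint_db = DB::open_cf_descriptors(&opts, checkpoint_path, cfs)?;

    // 3. Drop every unwanted column family. RocksDB marks the SST files owned
    //    only by a dropped family as obsolete and deletes them, so no
    //    compaction pass is needed to reclaim the space.
    for name in &cf_names {
        if name != "default" && !columns_to_keep.contains(&name.as_str()) {
            checkpoint_db.drop_cf(name)?;
        }
    }
    Ok(())
}
```
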
marcelo-gonzalez authored Mar 15, 2024
1 parent 9a6763b commit 61c67c6
Showing 18 changed files with 130 additions and 134 deletions.
2 changes: 0 additions & 2 deletions chain/chain/src/runtime/mod.rs
@@ -150,7 +150,6 @@ impl NightshadeRuntime {
home_dir: home_dir.to_path_buf(),
hot_store_path: PathBuf::from("data"),
state_snapshot_subdir: PathBuf::from("state_snapshot"),
compaction_enabled: false,
},
)
}
@@ -177,7 +176,6 @@ impl NightshadeRuntime {
home_dir: home_dir.to_path_buf(),
hot_store_path: PathBuf::from("data"),
state_snapshot_subdir: PathBuf::from("state_snapshot"),
compaction_enabled: false,
},
)
}
1 change: 0 additions & 1 deletion chain/chain/src/runtime/tests.rs
@@ -213,7 +213,6 @@ impl TestEnv {
home_dir: PathBuf::from(dir.path()),
hot_store_path: PathBuf::from("data"),
state_snapshot_subdir: PathBuf::from("state_snapshot"),
compaction_enabled: false,
},
);
let state_roots = get_genesis_state_roots(&store).unwrap().unwrap();
38 changes: 7 additions & 31 deletions chain/chain/src/state_snapshot_actor.rs
@@ -13,9 +13,8 @@ use std::sync::Arc;

/// Runs tasks related to state snapshots.
/// There are three main handlers in StateSnapshotActor and they are called in sequence
/// 1. DeleteSnapshotRequest: deletes a snapshot and optionally calls CreateSnapshotRequest.
/// 2. CreateSnapshotRequest: creates a new snapshot and optionally calls CompactSnapshotRequest based on config.
/// 3. CompactSnapshotRequest: compacts a snapshot store.
/// 1. [`DeleteAndMaybeCreateSnapshotRequest`]: deletes a snapshot and optionally calls CreateSnapshotRequest.
/// 2. [`CreateSnapshotRequest`]: creates a new snapshot.
pub struct StateSnapshotActor {
flat_storage_manager: FlatStorageManager,
network_adapter: PeerManagerAdapter,
@@ -56,10 +55,6 @@ struct CreateSnapshotRequest {
block: Block,
}

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
struct CompactSnapshotRequest {}

impl actix::Handler<WithSpanContext<DeleteAndMaybeCreateSnapshotRequest>> for StateSnapshotActor {
type Result = ();

@@ -87,7 +82,11 @@ impl actix::Handler<WithSpanContext<CreateSnapshotRequest>> for StateSnapshotAct
type Result = ();

#[perf]
fn handle(&mut self, msg: WithSpanContext<CreateSnapshotRequest>, context: &mut Context<Self>) {
fn handle(
&mut self,
msg: WithSpanContext<CreateSnapshotRequest>,
_context: &mut Context<Self>,
) {
let (_span, msg) = handler_debug_span!(target: "state_snapshot", msg);
tracing::debug!(target: "state_snapshot", ?msg);

@@ -113,12 +112,6 @@ impl actix::Handler<WithSpanContext<CreateSnapshotRequest>> for StateSnapshotAct
},
));
}

if self.tries.state_snapshot_config().compaction_enabled {
context.address().do_send(CompactSnapshotRequest {}.with_span_context());
} else {
tracing::info!(target: "state_snapshot", "State snapshot ready, not running compaction.");
}
}
Err(err) => {
tracing::error!(target: "state_snapshot", ?err, "State snapshot creation failed")
@@ -127,23 +120,6 @@ impl actix::Handler<WithSpanContext<CreateSnapshotRequest>> for StateSnapshotAct
}
}

/// Runs compaction of the snapshot store.
impl actix::Handler<WithSpanContext<CompactSnapshotRequest>> for StateSnapshotActor {
type Result = ();

#[perf]
fn handle(&mut self, msg: WithSpanContext<CompactSnapshotRequest>, _: &mut Context<Self>) {
let (_span, msg) = handler_debug_span!(target: "state_snapshot", msg);
tracing::debug!(target: "state_snapshot", ?msg);

if let Err(err) = self.tries.compact_state_snapshot() {
tracing::error!(target: "state_snapshot", ?err, "State snapshot compaction failed");
} else {
tracing::info!(target: "state_snapshot", "State snapshot compaction succeeded");
}
}
}

type MakeSnapshotCallback =
Arc<dyn Fn(CryptoHash, EpochHeight, Vec<ShardUId>, Block) -> () + Send + Sync + 'static>;

11 changes: 0 additions & 11 deletions core/store/src/config.rs
@@ -95,21 +95,13 @@ pub struct StoreConfig {

// TODO (#9989): To be phased out in favor of state_snapshot_config
pub state_snapshot_enabled: bool,

// TODO (#9989): To be phased out in favor of state_snapshot_config
pub state_snapshot_compaction_enabled: bool,
}

/// Config used to control state snapshot creation. This is used for state sync and resharding.
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct StateSnapshotConfig {
pub state_snapshot_type: StateSnapshotType,
/// State Snapshot compaction usually is a good thing but is heavy on IO and can take considerable
/// amount of time.
/// It makes state snapshots tiny (10GB) over the course of an epoch.
/// We may want to disable it for archival nodes during resharding
pub compaction_enabled: bool,
}

#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
@@ -274,9 +266,6 @@ impl Default for StoreConfig {

// TODO: To be phased out in favor of state_snapshot_config
state_snapshot_enabled: false,

// TODO: To be phased out in favor of state_snapshot_config
state_snapshot_compaction_enabled: false,
}
}
}
6 changes: 5 additions & 1 deletion core/store/src/db.rs
@@ -230,7 +230,11 @@ pub trait Database: Sync + Send {
fn get_store_statistics(&self) -> Option<StoreStatistics>;

/// Create checkpoint in provided path
fn create_checkpoint(&self, path: &std::path::Path) -> anyhow::Result<()>;
fn create_checkpoint(
&self,
path: &std::path::Path,
columns_to_keep: Option<&[DBCol]>,
) -> anyhow::Result<()>;
}

fn assert_no_overwrite(col: DBCol, key: &[u8], value: &[u8], old_value: &[u8]) {
8 changes: 6 additions & 2 deletions core/store/src/db/colddb.rs
@@ -111,8 +111,12 @@ impl Database for ColdDB {
self.cold.get_store_statistics()
}

fn create_checkpoint(&self, path: &std::path::Path) -> anyhow::Result<()> {
self.cold.create_checkpoint(path)
fn create_checkpoint(
&self,
path: &std::path::Path,
columns_to_keep: Option<&[DBCol]>,
) -> anyhow::Result<()> {
self.cold.create_checkpoint(path, columns_to_keep)
}
}

86 changes: 67 additions & 19 deletions core/store/src/db/rocksdb.rs
@@ -4,6 +4,8 @@ use crate::{metadata, metrics, DBCol, StoreConfig, StoreStatistics, Temperature}
use ::rocksdb::{
BlockBasedOptions, Cache, ColumnFamily, Env, IteratorMode, Options, ReadOptions, WriteBatch, DB,
};
use anyhow::Context;
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::io;
use std::ops::Deref;
@@ -86,7 +88,7 @@ impl RocksDB {
mode: Mode,
temp: Temperature,
) -> io::Result<Self> {
let columns: Vec<DBCol> = DBCol::iter().collect();
let columns = DBCol::iter().collect_vec();
Self::open_with_columns(path, store_config, mode, temp, &columns)
}

@@ -129,20 +131,11 @@ impl RocksDB {
columns: &[DBCol],
) -> io::Result<(DB, Options)> {
let options = rocksdb_options(store_config, mode);
let cf_descriptors = columns
.iter()
.copied()
.map(|col| {
rocksdb::ColumnFamilyDescriptor::new(
col_name(col),
rocksdb_column_options(col, store_config, temp),
)
})
.collect::<Vec<_>>();
let cfs = cf_descriptors(columns, store_config, temp);
let db = if mode.read_only() {
DB::open_cf_descriptors_read_only(&options, path, cf_descriptors, false)
DB::open_cf_descriptors_read_only(&options, path, cfs, false)
} else {
DB::open_cf_descriptors(&options, path, cf_descriptors)
DB::open_cf_descriptors(&options, path, cfs)
}
.map_err(io::Error::other)?;
if cfg!(feature = "single_thread_rocksdb") {
@@ -415,24 +408,72 @@ impl Database for RocksDB {
}
}

fn create_checkpoint(&self, path: &std::path::Path) -> anyhow::Result<()> {
fn create_checkpoint(
&self,
path: &std::path::Path,
columns_to_keep: Option<&[DBCol]>,
) -> anyhow::Result<()> {
let _span =
tracing::info_span!(target: "state_snapshot", "create_checkpoint", ?path).entered();
let cp = ::rocksdb::checkpoint::Checkpoint::new(&self.db)?;
cp.create_checkpoint(path)?;
cp.create_checkpoint(path)
.with_context(|| format!("failed to create checkpoint at {}", path.display()))?;

let Some(columns_to_keep) = columns_to_keep else {
return Ok(());
};
let opts = common_rocksdb_options();
let cfs =
cf_descriptors(&DBCol::iter().collect_vec(), &StoreConfig::default(), Temperature::Hot);
let mut db = DB::open_cf_descriptors(&opts, path, cfs)
.with_context(|| format!("failed to open checkpoint at {}", path.display()))?;
for col in DBCol::iter() {
if !columns_to_keep.contains(&col) {
if col == DBCol::DbVersion {
// We need to keep DbVersion because it's expected to be there when
// we check the metadata in DBOpener::get_metadata()
tracing::debug!(
target: "store",
"create_checkpoint called with columns to keep not including DBCol::DbVersion. Including it anyway."
);
continue;
}
db.drop_cf(col_name(col)).with_context(|| {
format!(
"failed to drop column family {:?} from checkpoint at {}",
col,
path.display()
)
})?;
}
}
Ok(())
}
}

fn cf_descriptors(
columns: &[DBCol],
store_config: &StoreConfig,
temp: Temperature,
) -> Vec<rocksdb::ColumnFamilyDescriptor> {
columns
.iter()
.copied()
.map(|col| {
rocksdb::ColumnFamilyDescriptor::new(
col_name(col),
rocksdb_column_options(col, store_config, temp),
)
})
.collect::<Vec<_>>()
}

/// DB level options
fn rocksdb_options(store_config: &StoreConfig, mode: Mode) -> Options {
fn common_rocksdb_options() -> Options {
let mut opts = Options::default();

set_compression_options(&mut opts);
opts.create_missing_column_families(mode.read_write());
opts.create_if_missing(mode.can_create());
opts.set_use_fsync(false);
opts.set_max_open_files(store_config.max_open_files.try_into().unwrap_or(i32::MAX));
opts.set_keep_log_file_num(1);
opts.set_bytes_per_sync(bytesize::MIB);
opts.set_write_buffer_size(256 * bytesize::MIB as usize);
@@ -450,7 +491,14 @@ fn rocksdb_options(store_config: &StoreConfig, mode: Mode) -> Options {
opts.increase_parallelism(std::cmp::max(1, num_cpus::get() as i32 / 2));
opts.set_max_total_wal_size(bytesize::GIB);
}
opts
}

fn rocksdb_options(store_config: &StoreConfig, mode: Mode) -> Options {
let mut opts = common_rocksdb_options();
opts.create_missing_column_families(mode.read_write());
opts.create_if_missing(mode.can_create());
opts.set_max_open_files(store_config.max_open_files.try_into().unwrap_or(i32::MAX));
// TODO(mina86): Perhaps enable statistics even in read-only mode?
if mode.read_write() && store_config.enable_statistics {
// Rust API doesn't permit choosing stats level. The default stats level
6 changes: 5 additions & 1 deletion core/store/src/db/splitdb.rs
@@ -198,7 +198,11 @@ impl Database for SplitDB {
None
}

fn create_checkpoint(&self, _path: &std::path::Path) -> anyhow::Result<()> {
fn create_checkpoint(
&self,
_path: &std::path::Path,
_columns_to_keep: Option<&[DBCol]>,
) -> anyhow::Result<()> {
log_assert_fail!("create_checkpoint is not allowed - the split storage has two stores");
Ok(())
}
6 changes: 5 additions & 1 deletion core/store/src/db/testdb.rs
@@ -128,7 +128,11 @@ impl Database for TestDB {
self.stats.read().unwrap().clone()
}

fn create_checkpoint(&self, _path: &std::path::Path) -> anyhow::Result<()> {
fn create_checkpoint(
&self,
_path: &std::path::Path,
_columns_to_keep: Option<&[DBCol]>,
) -> anyhow::Result<()> {
Ok(())
}
}
1 change: 1 addition & 0 deletions core/store/src/lib.rs
@@ -40,6 +40,7 @@ pub use crate::trie::{
PartialStorage, PrefetchApi, PrefetchError, RawTrieNode, RawTrieNodeWithSize, ShardTries,
StateSnapshot, StateSnapshotConfig, Trie, TrieAccess, TrieCache, TrieCachingStorage,
TrieChanges, TrieConfig, TrieDBStorage, TrieStorage, WrappedTrieChanges,
STATE_SNAPSHOT_COLUMNS,
};

pub mod cold_storage;
9 changes: 0 additions & 9 deletions core/store/src/metrics.rs
@@ -263,15 +263,6 @@ pub(crate) static DELETE_STATE_SNAPSHOT_ELAPSED: Lazy<Histogram> = Lazy::new(||
.unwrap()
});

pub(crate) static COMPACT_STATE_SNAPSHOT_ELAPSED: Lazy<Histogram> = Lazy::new(|| {
try_create_histogram_with_buckets(
"near_compact_state_snapshot_elapsed_sec",
"Latency of compaction of a state snapshot, in seconds",
exponential_buckets(0.001, 1.6, 40).unwrap(),
)
.unwrap()
});

pub(crate) static MOVE_STATE_SNAPSHOT_FLAT_HEAD_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_move_state_snapshot_flat_head_elapsed_sec",
