From 1f97b2365f8e58a86f70e175895e402d04d71aaf Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 28 May 2021 16:42:56 +0900 Subject: [PATCH] Avoid full-range compactions with periodic filtered b.g. ones (#16697) * Update rocksdb to v0.16.0 * Promote the infrequent and important log to info! * Force background compaction by ttl without manual compaction * Fix test * Support no compaction mode in test_ledger_cleanup_compaction * Fix comment * Make compaction_interval customizable * Avoid major compaction with periodic filtering... * Adress lazy_static, special cfs and range check * Clean up a bit and add comment * Add comment * More comments... * Config code cleanup * Add comment * Use .conflicts_with() * Nullify unneeded delete_range ops for special CFs * Some clean ups * Clarify the locking intention * Ensure special CFs' consistency with PurgeType::CompactionFilter * Fix comment * Fix bad copy paste * Fix various types... * Don't use tuples * Add a unit test for compaction_filter * Fix typo... * Remove flag and just use new behavior always * Fix wrong condition negation... * Doc. about no set_last_purged_slot in purge_slots * Write a test and fix off-by-one bug.... * Apply suggestions from code review Co-authored-by: Tyera Eulberg * Follow up to github review suggestions * Fix line-wrapping * Fix conflict Co-authored-by: Tyera Eulberg --- Cargo.lock | 86 ++--- core/src/ledger_cleanup_service.rs | 16 +- core/src/validator.rs | 2 + core/tests/ledger_cleanup.rs | 20 +- ledger/Cargo.toml | 2 +- ledger/src/blockstore.rs | 239 +++++++++++++- ledger/src/blockstore/blockstore_purge.rs | 38 ++- ledger/src/blockstore_db.rs | 362 +++++++++++++++++++--- validator/src/main.rs | 4 +- 9 files changed, 638 insertions(+), 131 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f044ae5febb3d..5189d2266ab193 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,26 +261,21 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.54.0" +version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66c0bb6167449588ff70803f4127f0684f9063097eca5016f37eb52b92c2cf36" +checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d" dependencies = [ "bitflags", "cexpr", - "cfg-if 0.1.10", "clang-sys", - "clap", - "env_logger 0.7.1", "lazy_static", "lazycell", - "log 0.4.11", "peeking_take_while", "proc-macro2 1.0.24", "quote 1.0.6", "regex", "rustc-hash", "shlex", - "which", ] [[package]] @@ -583,13 +578,13 @@ dependencies = [ [[package]] name = "clang-sys" -version = "0.29.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a" +checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c" dependencies = [ "glob", "libc", - "libloading 0.5.2", + "libloading 0.7.0", ] [[package]] @@ -1198,19 +1193,6 @@ dependencies = [ "syn 1.0.60", ] -[[package]] -name = "env_logger" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" -dependencies = [ - "atty", - "humantime 1.3.0", - "log 0.4.11", - "regex", - "termcolor", -] - [[package]] name = "env_logger" version = "0.8.3" @@ -1218,7 +1200,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f" dependencies = [ "atty", - "humantime 2.0.1", + "humantime", "log 0.4.11", "regex", "termcolor", @@ 
-1778,15 +1760,6 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" -[[package]] -name = "humantime" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" -dependencies = [ - "quick-error", -] - [[package]] name = "humantime" version = "2.0.1" @@ -2186,28 +2159,28 @@ checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" [[package]] name = "libloading" -version = "0.5.2" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753" +checksum = "2cadb8e769f070c45df05c78c7520eb4cd17061d4ab262e43cfc68b4d00ac71c" dependencies = [ - "cc", "winapi 0.3.8", ] [[package]] name = "libloading" -version = "0.6.2" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cadb8e769f070c45df05c78c7520eb4cd17061d4ab262e43cfc68b4d00ac71c" +checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a" dependencies = [ + "cfg-if 1.0.0", "winapi 0.3.8", ] [[package]] name = "librocksdb-sys" -version = "6.11.4" +version = "6.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb5b56f651c204634b936be2f92dbb42c36867e00ff7fe2405591f3b9fa66f09" +checksum = "5da125e1c0f22c7cae785982115523a0738728498547f415c9054cb17c7e89f9" dependencies = [ "bindgen", "cc", @@ -3059,12 +3032,6 @@ dependencies = [ "percent-encoding 2.1.0", ] -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - [[package]] name = "quote" version = "0.6.13" @@ -3461,9 +3428,9 @@ dependencies = [ [[package]] name = "rocksdb" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23d83c02c429044d58474eaf5ae31e062d0de894e21125b47437ec0edc1397e6" +checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3" dependencies = [ "libc", "librocksdb-sys", @@ -4180,7 +4147,7 @@ dependencies = [ "criterion-stats", "ctrlc", "dirs-next", - "humantime 2.0.1", + "humantime", "indicatif", "log 0.4.11", "num-traits", @@ -4234,7 +4201,7 @@ dependencies = [ "base64 0.13.0", "chrono", "console 0.14.1", - "humantime 2.0.1", + "humantime", "indicatif", "serde", "serde_derive", @@ -4811,7 +4778,7 @@ version = "1.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fa7bddd7b89c26c6e3ef4af9b47d6bc8d60888559affb5160f5ade18c0cd058" dependencies = [ - "env_logger 0.8.3", + "env_logger", "lazy_static", "log 0.4.11", ] @@ -4820,7 +4787,7 @@ dependencies = [ name = "solana-logger" version = "1.8.0" dependencies = [ - "env_logger 0.8.3", + "env_logger", "lazy_static", "log 0.4.11", ] @@ -4861,7 +4828,7 @@ dependencies = [ name = "solana-metrics" version = "1.8.0" dependencies = [ - "env_logger 0.8.3", + "env_logger", "gethostname", "lazy_static", "log 0.4.11", @@ -5555,7 +5522,7 @@ name = "solana-watchtower" version = "1.8.0" dependencies = [ "clap", - "humantime 2.0.1", + "humantime", "log 0.4.11", "solana-clap-utils", "solana-cli-config", @@ -5803,7 +5770,7 @@ dependencies = [ "anyhow", "fnv", "futures 0.3.8", - "humantime 2.0.1", + "humantime", "log 0.4.11", "pin-project 1.0.1", "rand 0.7.3", @@ 
-6849,15 +6816,6 @@ dependencies = [ "tokio-tls", ]
-[[package]]
-name = "which"
-version = "3.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d011071ae14a2f6671d0b74080ae0cd8ebf3a6f8c9589a2cd45f23126fe29724"
-dependencies = [
- "libc",
-]
-
[[package]] name = "winapi" version = "0.2.8"
diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 9d081b277f58f6..8b1a1fc203638c 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -207,11 +207,25 @@ impl LedgerCleanupService {
);
let mut purge_time = Measure::start("purge_slots");
+
blockstore.purge_slots(
purge_first_slot,
lowest_cleanup_slot,
- PurgeType::PrimaryIndex,
+ PurgeType::CompactionFilter,
);
+ // Update only after the purge operation.
+ // Safety: This value can be used by compaction_filters shared via Arc<AtomicU64>.
+ // Compactions are async and run as a multi-threaded background job. However, this
+ // shouldn't cause consistency issues for iterators and getters because we have
+ // already expired all affected keys (older than or equal to lowest_cleanup_slot)
+ // by the above `purge_slots`. According to the general RocksDB design where SST
+ // files are immutable, even running iterators aren't affected; the database grabs
+ // a snapshot of the live set of SST files at the iterator's creation.
+ // Also, we passed PurgeType::CompactionFilter, meaning no delete_range for the
+ // transaction_status and address_signatures CFs. These are fine because they
+ // don't require a strongly consistent view for their operation.
+ blockstore.set_max_expired_slot(lowest_cleanup_slot);
+
purge_time.stop();
info!("{}", purge_time);
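For orientation, here is a condensed sketch of the ordering this hunk establishes: purge first, then publish the watermark that the background compaction filters will read. The method names (purge_slots, set_max_expired_slot, PurgeType::CompactionFilter) come from this patch; the wrapper function is hypothetical scaffolding:

```rust
use solana_ledger::blockstore::{Blockstore, PurgeType};
use solana_sdk::clock::Slot;

// Hypothetical condensation of LedgerCleanupService's new cleanup flow.
fn cleanup(blockstore: &Blockstore, purge_first_slot: Slot, lowest_cleanup_slot: Slot) {
    // Expire the keys first; CompactionFilter skips delete_range for the
    // special TransactionStatus/AddressSignatures column families.
    blockstore.purge_slots(purge_first_slot, lowest_cleanup_slot, PurgeType::CompactionFilter);
    // Only afterwards expose the new watermark to the compaction filters,
    // so they never remove a key that readers could still legitimately see.
    blockstore.set_max_expired_slot(lowest_cleanup_slot);
}
```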
diff --git a/core/src/validator.rs b/core/src/validator.rs index df80c95d1929d9..7b729fa949bace 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -1636,9 +1636,11 @@ mod tests {
}
drop(blockstore);
+ // this purges and compacts all slots greater than or equal to 5
backup_and_clear_blockstore(&blockstore_path, 5, 2);
let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ // assert that slots less than 5 aren't affected
assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
for i in 5..10 {
assert!(blockstore
diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index 5e6258415ff187..69778edefa6152 100644
--- a/core/tests/ledger_cleanup.rs
+++ b/core/tests/ledger_cleanup.rs
@@ -39,6 +39,8 @@ mod tests {
pub cleanup_blockstore: bool,
pub emit_cpu_info: bool,
pub assert_compaction: bool,
+ pub compaction_interval: Option<u64>,
+ pub no_compaction: bool,
}
#[derive(Clone, Copy, Debug)]
@@ -154,6 +156,11 @@ mod tests {
let emit_cpu_info = read_env("EMIT_CPU_INFO", true);
// set default to `true` once compaction is merged
let assert_compaction = read_env("ASSERT_COMPACTION", false);
+ let compaction_interval = match read_env("COMPACTION_INTERVAL", 0) {
+ maybe_zero if maybe_zero == 0 => None,
+ non_zero => Some(non_zero),
+ };
+ let no_compaction = read_env("NO_COMPACTION", false);
BenchmarkConfig {
benchmark_slots,
@@ -166,6 +173,8 @@ mod tests {
cleanup_blockstore,
emit_cpu_info,
assert_compaction,
+ compaction_interval,
+ no_compaction,
}
}
@@ -211,8 +220,13 @@ mod tests {
fn test_ledger_cleanup_compaction() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
- let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
+ let mut blockstore = Blockstore::open(&blockstore_path).unwrap();
let config = get_benchmark_config();
+ if config.no_compaction {
+ blockstore.set_no_compaction(true);
+ }
+ let blockstore = Arc::new(blockstore);
+
eprintln!("BENCHMARK CONFIG: {:?}", config);
eprintln!("LEDGER_PATH: {:?}", &blockstore_path);
@@ -223,6 +237,8 @@ mod tests {
let stop_size_bytes = config.stop_size_bytes;
let stop_size_iterations = config.stop_size_iterations;
let pre_generate_data = config.pre_generate_data;
+ let compaction_interval = config.compaction_interval;
+
let batches = benchmark_slots / batch_size;
let (sender, receiver) = channel();
@@ -232,7 +248,7 @@ mod tests {
blockstore.clone(),
max_ledger_shreds,
&exit,
- None,
+ compaction_interval,
None,
);
diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index c2a80b919010ad..15fc4a6c9c84a6 100644
--- a/ledger/Cargo.toml
+++ b/ledger/Cargo.toml
@@ -59,7 +59,7 @@ trees = "0.2.1"
[dependencies.rocksdb]
# Avoid the vendored bzip2 within rocksdb-sys that can cause linker conflicts
# when also using the bzip2 crate
-version = "0.15.0"
+version = "0.16.0"
default-features = false
features = ["lz4"]
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 420aad696719c3..d64d7e81d3a3ed 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -54,7 +54,7 @@ use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
- Arc, Mutex, RwLock,
+ Arc, Mutex, RwLock, RwLockWriteGuard,
},
};
use thiserror::Error;
@@ -92,6 +92,7 @@ type CompletedRanges = Vec<(u32, u32)>;
pub enum PurgeType {
Exact,
PrimaryIndex,
+ CompactionFilter,
}
#[derive(Error, Debug)]
@@ -144,7 +145,7 @@ pub struct Blockstore {
insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>,
pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
- pub lowest_cleanup_slot: Arc<RwLock<u64>>,
+ pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
no_compaction: bool,
}
@@ -1956,18 +1957,24 @@ impl Blockstore {
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
Ok(None)
} else {
- let result = if index0.frozen && to_slot > index0.max_slot {
- debug!("Pruning transaction index 0 at slot {}", index0.max_slot);
+ let purge_target_primary_index = if index0.frozen && to_slot > index0.max_slot {
+ info!(
+ "Pruning expired primary index 0 up to slot {} (max requested: {})",
+ index0.max_slot, to_slot
+ );
Some(0)
} else if index1.frozen && to_slot > index1.max_slot {
- debug!("Pruning transaction index 1 at slot {}", index1.max_slot);
+ info!(
+ "Pruning expired primary index 1 up to slot {} (max requested: {})",
+ index1.max_slot, to_slot
+ );
Some(1)
} else {
None
};
- if result.is_some() {
- *w_active_transaction_status_index = if index0.frozen { 0 } else { 1 };
+ if let Some(purge_target_primary_index) = purge_target_primary_index {
+ *w_active_transaction_status_index = purge_target_primary_index;
if index0.frozen {
index0.max_slot = 0
};
@@ -1980,16 +1987,17 @@ impl Blockstore {
batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
}
- Ok(result)
+ Ok(purge_target_primary_index)
}
}
- fn get_primary_index(
+ fn get_primary_index_to_write(
&self,
slot: Slot,
- w_active_transaction_status_index: &mut u64,
+ // take WriteGuard to require critical section semantics at call site
+ w_active_transaction_status_index: &RwLockWriteGuard<u64>,
) -> Result<u64> {
- let i = *w_active_transaction_status_index;
+ let i = **w_active_transaction_status_index;
let mut index_meta = self.transaction_status_index_cf.get(i)?.unwrap();
if slot > index_meta.max_slot {
assert!(!index_meta.frozen);
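The renamed get_primary_index_to_write now takes the RwLockWriteGuard itself instead of a bare &mut u64, so holding the lock is enforced by the type system rather than by convention. A self-contained sketch of that idiom (types and names here are illustrative, not from the patch):

```rust
use std::sync::{RwLock, RwLockWriteGuard};

struct Store {
    active_index: RwLock<u64>,
}

impl Store {
    // Requiring the guard proves at compile time that the caller is inside
    // the critical section; a plain &mut u64 could have come from anywhere.
    fn index_to_write(&self, guard: &RwLockWriteGuard<u64>) -> u64 {
        **guard
    }

    fn write_entry(&self) {
        let guard = self.active_index.write().unwrap();
        let _index = self.index_to_write(&guard);
        // ... perform writes gated by `guard` here ...
    }
}

fn main() {
    Store { active_index: RwLock::new(0) }.write_entry();
}
```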
@@ -2028,9 +2036,10 @@ impl Blockstore {
let status = status.into();
// This write lock prevents interleaving issues with the transaction_status_index_cf by gating
// writes to that column
- let mut w_active_transaction_status_index =
+ let w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
- let primary_index = self.get_primary_index(slot, &mut w_active_transaction_status_index)?;
+ let primary_index =
+ self.get_primary_index_to_write(slot, &w_active_transaction_status_index)?;
self.transaction_status_cf
.put_protobuf((primary_index, signature, slot), &status)?;
for address in writable_keys {
@@ -2048,6 +2057,21 @@ impl Blockstore {
Ok(())
}
+ fn ensure_lowest_cleanup_slot(&self) -> (std::sync::RwLockReadGuard<Slot>, Slot) {
+ // Ensures a consistent result by using lowest_cleanup_slot as the lower bound
+ // for reading columns that do not employ strong read consistency with slot-based
+ // delete_range
+ let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
+ let lowest_available_slot = (*lowest_cleanup_slot)
+ .checked_add(1)
+ .expect("overflow from trusted value");
+
+ // The caller must hold this lock for the duration of its read; otherwise
+ // LedgerCleanupService could purge/compact still-needed slots at any given moment.
+ // Blockstore callers, like rpc, can process concurrent read queries
+ (lowest_cleanup_slot, lowest_available_slot)
+ }
+
// Returns a transaction status, as well as a loop counter for unit testing
fn get_transaction_status_with_counter(
&self,
@@ -2055,9 +2079,15 @@ impl Blockstore {
confirmed_unrooted_slots: &[Slot],
) -> Result<(Option<(Slot, TransactionStatusMeta)>, u64)> {
let mut counter = 0;
+ let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
+
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.transaction_status_cf.iter(IteratorMode::From(
- (transaction_status_cf_primary_index, signature, 0),
+ (
+ transaction_status_cf_primary_index,
+ signature,
+ lowest_available_slot,
+ ),
IteratorDirection::Forward,
))?;
for ((i, sig, slot), _data) in index_iterator {
@@ -2076,6 +2106,8 @@ impl Blockstore {
return Ok((status, counter));
}
}
+ drop(lock);
+
Ok((None, counter))
}
@@ -2199,13 +2231,15 @@ impl Blockstore {
start_slot: Slot,
end_slot: Slot,
) -> Result<Vec<(Slot, Signature)>> {
+ let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
+
let mut signatures: Vec<(Slot, Signature)> = vec![];
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(
transaction_status_cf_primary_index,
pubkey,
- start_slot,
+ start_slot.max(lowest_available_slot),
Signature::default(),
),
IteratorDirection::Forward,
))?;
@@ -2220,6 +2254,7 @@ impl Blockstore {
}
}
}
+ drop(lock);
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1)));
Ok(signatures)
}
@@ -2232,13 +2267,14 @@ impl Blockstore {
pubkey: Pubkey,
slot: Slot,
) -> Result<Vec<(Slot, Signature)>> {
+ let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
let mut signatures: Vec<(Slot, Signature)> = vec![];
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(
transaction_status_cf_primary_index,
pubkey,
- slot,
+ slot.max(lowest_available_slot),
Signature::default(),
),
IteratorDirection::Forward,
))?;
@@ -2253,6 +2289,7 @@ impl Blockstore {
signatures.push((slot, signature));
}
}
+ drop(lock);
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1)));
Ok(signatures)
}
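Each reader above follows the same shape: take the read guard, clamp the iterator's start slot to lowest_available_slot, and keep the guard alive until the scan finishes. A reduced, self-contained sketch of that pattern (the in-memory map stands in for the RocksDB column; this is an assumption for illustration only):

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

struct Reader {
    lowest_cleanup_slot: RwLock<u64>,
    entries: RwLock<BTreeMap<u64, String>>, // slot -> payload, stand-in for a column
}

impl Reader {
    fn read_consistently(&self, start_slot: u64) -> Vec<String> {
        // Hold the read guard for the whole scan; a cleaner taking the write
        // lock must wait, so nothing can expire mid-iteration.
        let guard = self.lowest_cleanup_slot.read().unwrap();
        let lowest_available_slot = (*guard).checked_add(1).expect("overflow from trusted value");
        let map = self.entries.read().unwrap();
        let result = map
            .range(start_slot.max(lowest_available_slot)..)
            .map(|(_, v)| v.clone())
            .collect();
        drop(map);
        drop(guard); // release only after the scan has completed
        result
    }
}
```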
@@ -6676,6 +6713,176 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
+ fn do_test_lowest_cleanup_slot_and_special_cfs(
+ simulate_compaction: bool,
+ simulate_ledger_cleanup_service: bool,
+ ) {
+ solana_logger::setup();
+
+ let blockstore_path = get_tmp_ledger_path!();
+ {
+ let blockstore = Blockstore::open(&blockstore_path).unwrap();
+ // TransactionStatus column opens initialized with one entry at index 2
+ let transaction_status_cf = blockstore.db.column::<cf::TransactionStatus>();
+
+ let pre_balances_vec = vec![1, 2, 3];
+ let post_balances_vec = vec![3, 2, 1];
+ let status = TransactionStatusMeta {
+ status: solana_sdk::transaction::Result::<()>::Ok(()),
+ fee: 42u64,
+ pre_balances: pre_balances_vec,
+ post_balances: post_balances_vec,
+ inner_instructions: Some(vec![]),
+ log_messages: Some(vec![]),
+ pre_token_balances: Some(vec![]),
+ post_token_balances: Some(vec![]),
+ rewards: Some(vec![]),
+ }
+ .into();
+
+ let signature1 = Signature::new(&[2u8; 64]);
+ let signature2 = Signature::new(&[3u8; 64]);
+
+ // Insert rooted slots 0..=3 with no fork
+ let meta0 = SlotMeta::new(0, 0);
+ blockstore.meta_cf.put(0, &meta0).unwrap();
+ let meta1 = SlotMeta::new(1, 0);
+ blockstore.meta_cf.put(1, &meta1).unwrap();
+ let meta2 = SlotMeta::new(2, 1);
+ blockstore.meta_cf.put(2, &meta2).unwrap();
+ let meta3 = SlotMeta::new(3, 2);
+ blockstore.meta_cf.put(3, &meta3).unwrap();
+
+ blockstore.set_roots(&[0, 1, 2, 3]).unwrap();
+
+ let lowest_cleanup_slot = 1;
+ let lowest_available_slot = lowest_cleanup_slot + 1;
+
+ transaction_status_cf
+ .put_protobuf((0, signature1, lowest_cleanup_slot), &status)
+ .unwrap();
+
+ transaction_status_cf
+ .put_protobuf((0, signature2, lowest_available_slot), &status)
+ .unwrap();
+
+ let address0 = solana_sdk::pubkey::new_rand();
+ let address1 = solana_sdk::pubkey::new_rand();
+ blockstore
+ .write_transaction_status(
+ lowest_cleanup_slot,
+ signature1,
+ vec![&address0],
+ vec![],
+ TransactionStatusMeta::default(),
+ )
+ .unwrap();
+ blockstore
+ .write_transaction_status(
+ lowest_available_slot,
+ signature2,
+ vec![&address1],
+ vec![],
+ TransactionStatusMeta::default(),
+ )
+ .unwrap();
+
+ let check_for_missing = || {
+ (
+ blockstore
+ .get_transaction_status_with_counter(signature1, &[])
+ .unwrap()
+ .0
+ .is_none(),
+ blockstore
+ .find_address_signatures_for_slot(address0, lowest_cleanup_slot)
+ .unwrap()
+ .is_empty(),
+ blockstore
+ .find_address_signatures(address0, lowest_cleanup_slot, lowest_cleanup_slot)
+ .unwrap()
+ .is_empty(),
+ )
+ };
+
+ let assert_existing_always = || {
+ let are_existing_always = (
+ blockstore
+ .get_transaction_status_with_counter(signature2, &[])
+ .unwrap()
+ .0
+ .is_some(),
+ !blockstore
+ .find_address_signatures_for_slot(address1, lowest_available_slot)
+ .unwrap()
+ .is_empty(),
+ !blockstore
+ .find_address_signatures(
+ address1,
+ lowest_available_slot,
+ lowest_available_slot,
+ )
+ .unwrap()
+ .is_empty(),
+ );
+ assert_eq!(are_existing_always, (true, true, true));
+ };
+
+ let are_missing = check_for_missing();
+ // should never be missing before the conditional compaction & simulation...
+ assert_eq!(are_missing, (false, false, false));
+ assert_existing_always();
+
+ if simulate_compaction {
+ blockstore.set_max_expired_slot(lowest_cleanup_slot);
+ // force compaction filters to run across whole key range.
+ blockstore
+ .compact_storage(Slot::min_value(), Slot::max_value())
+ .unwrap();
+ }
+
+ if simulate_ledger_cleanup_service {
+ *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
+ }
+
+ let are_missing = check_for_missing();
+ if simulate_compaction || simulate_ledger_cleanup_service {
+ // ...
when either simulation (or both) is in effect, we should consistently
+ // observe the entries to be missing
+ assert_eq!(are_missing, (true, true, true));
+ } else {
+ // ... otherwise, we should observe the entries to still exist...
+ assert_eq!(are_missing, (false, false, false));
+ }
+ assert_existing_always();
+ }
+ Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+ }
+
+ #[test]
+ fn test_lowest_cleanup_slot_and_special_cfs_with_compact_with_ledger_cleanup_service_simulation( ) {
+ do_test_lowest_cleanup_slot_and_special_cfs(true, true);
+ }
+
+ #[test]
+ fn test_lowest_cleanup_slot_and_special_cfs_with_compact_without_ledger_cleanup_service_simulation( ) {
+ do_test_lowest_cleanup_slot_and_special_cfs(true, false);
+ }
+
+ #[test]
+ fn test_lowest_cleanup_slot_and_special_cfs_without_compact_with_ledger_cleanup_service_simulation( ) {
+ do_test_lowest_cleanup_slot_and_special_cfs(false, true);
+ }
+
+ #[test]
+ fn test_lowest_cleanup_slot_and_special_cfs_without_compact_without_ledger_cleanup_service_simulation( ) {
+ do_test_lowest_cleanup_slot_and_special_cfs(false, false);
+ }
+
#[test]
fn test_get_rooted_transaction() {
let slot = 2;
diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 4fe38a3f4f824a..fec06319b08f48 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -32,6 +32,19 @@ impl Blockstore {
}
}
+ /// Usually this is paired with .purge_slots(), but we can't call this unconditionally
+ /// inside that function. That's because set_max_expired_slot()
+ /// expects older slots to be purged in successive chronological order, while .purge_slots()
+ /// can also be used to purge *future* slots for the --hard-fork case, preserving older
+ /// slots. It'd be quite dangerous to purge older slots in that case.
+ /// So, the only legitimate user of this function at present is LedgerCleanupService.
+ pub fn set_max_expired_slot(&self, to_slot: Slot) {
+ // convert here from the inclusive end of the purged range to the inclusive start of
+ // the alive range, to align with Slot::default() for consistent initial compaction
+ // filter behavior
+ let to_slot = to_slot.checked_add(1).unwrap();
+ self.db.set_oldest_slot(to_slot);
+ }
+
pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots(from_slot, to_slot, PurgeType::Exact);
if let Err(e) = self.compact_storage(from_slot, to_slot) {
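The checked_add(1) above converts the inclusive end of the purged range into the first slot that must stay alive. A tiny worked example of that boundary (values arbitrary):

```rust
fn main() {
    // If slots 0..=7 were purged (to_slot == 7), the oldest slot that must
    // survive compaction is 8; slot 7 itself is now fair game for removal.
    let to_slot: u64 = 7;
    let oldest_slot = to_slot.checked_add(1).unwrap();
    assert_eq!(oldest_slot, 8);
    // A filter keeps keys whose slot >= oldest_slot:
    assert!(8 >= oldest_slot); // kept
    assert!(!(7 >= oldest_slot)); // removed
}
```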
@@ -180,6 +193,13 @@ impl Blockstore {
to_slot,
)?;
}
+ PurgeType::CompactionFilter => {
+ // No explicit action is required here because this purge type relies entirely and
+ // indefinitely on the proper working of the compaction filter for those
+ // special column families, never toggling the primary index from the current
+ // one. Overall, this keeps writes uniformly distributed, with no periodic
+ // spikes from huge delete_range calls for them.
+ }
}
delete_range_timer.stop();
let mut write_timer = Measure::start("write_batch");
@@ -193,6 +213,10 @@ impl Blockstore {
write_timer.stop();
purge_stats.delete_range += delete_range_timer.as_us();
purge_stats.write_batch += write_timer.as_us();
+ // only drop w_active_transaction_status_index after we do db.write(write_batch);
+ // otherwise, readers might be confused by inconsistent state between
+ // self.active_transaction_status_index and RocksDB's TransactionStatusIndex contents
+ drop(w_active_transaction_status_index);
Ok(columns_purged)
}
@@ -323,18 +347,26 @@ impl Blockstore {
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<()> {
- if let Some(index) = self.toggle_transaction_status_index(
+ if let Some(purged_index) = self.toggle_transaction_status_index(
write_batch,
w_active_transaction_status_index,
to_slot,
)? {
*columns_purged &= self
.db
- .delete_range_cf::<cf::TransactionStatus>(write_batch, index, index + 1)
+ .delete_range_cf::<cf::TransactionStatus>(
+ write_batch,
+ purged_index,
+ purged_index + 1,
+ )
.is_ok()
& self
.db
- .delete_range_cf::<cf::AddressSignatures>(write_batch, index, index + 1)
+ .delete_range_cf::<cf::AddressSignatures>(
+ write_batch,
+ purged_index,
+ purged_index + 1,
+ )
.is_ok();
}
Ok(())
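The explicit drop(w_active_transaction_status_index) in the hunk above pins the unlock to after db.write(write_batch), so readers can never observe the in-memory index disagreeing with RocksDB. A minimal sketch of the idiom (names are illustrative):

```rust
use std::sync::Mutex;

// Apply a batch while the index lock is held; unlock only after the write
// lands, so index and data become visible to readers together.
fn commit_batch(index_lock: &Mutex<u64>, apply_batch: impl FnOnce()) {
    let w_index = index_lock.lock().unwrap();
    apply_batch(); // e.g. db.write(write_batch) in the patch
    drop(w_index); // made explicit so a refactor can't accidentally move the unlock earlier
}

fn main() {
    let lock = Mutex::new(0);
    commit_batch(&lock, || println!("batch written"));
}
```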
diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index bfef4ed3201b75..7504005d9c90b6 100644
--- a/ledger/src/blockstore_db.rs
+++ b/ledger/src/blockstore_db.rs
@@ -5,9 +5,13 @@ use log::*;
use prost::Message;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
- self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode,
- IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
+ self,
+ compaction_filter::CompactionFilter,
+ compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory},
+ ColumnFamily, ColumnFamilyDescriptor, CompactionDecision, DBIterator, DBRawIterator,
+ DBRecoveryMode, IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
+
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_runtime::hardened_unpack::UnpackError;
@@ -17,7 +21,17 @@ use solana_sdk::{
signature::Signature,
};
use solana_storage_proto::convert::generated;
-use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc};
+use std::{
+ collections::HashMap,
+ ffi::{CStr, CString},
+ fs,
+ marker::PhantomData,
+ path::Path,
+ sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc,
+ },
+};
use thiserror::Error;
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
@@ -58,6 +72,9 @@ const PERF_SAMPLES_CF: &str = "perf_samples";
/// Column family for BlockHeight
const BLOCK_HEIGHT_CF: &str = "block_height";
+// 1 day is chosen for the same reasoning as DEFAULT_COMPACTION_SLOT_INTERVAL
+const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;
+
#[derive(Error, Debug)]
pub enum BlockstoreError {
ShredForIndexExists,
@@ -208,8 +225,30 @@ impl From<BlockstoreRecoveryMode> for DBRecoveryMode {
}
}
+#[derive(Default, Clone, Debug)]
+struct OldestSlot(Arc<AtomicU64>);
+
+impl OldestSlot {
+ pub fn set(&self, oldest_slot: Slot) {
+ // this is independently used for compaction_filter without any data dependency.
+ // also, compaction_filters are created via its factories, creating short-lived copies of
+ // this atomic value for the single job of compaction. So, a Relaxed store can be
+ // justified overall
+ self.0.store(oldest_slot, Ordering::Relaxed);
+ }
+
+ pub fn get(&self) -> Slot {
+ // copy out of the AtomicU64 as a general precaution so that the oldest_slot cannot mutate
+ // across a single run of compaction, for simpler reasoning, although this isn't a strict
+ // requirement at the moment
+ // also, the eventually-propagated (very Relaxed) load is OK, because compaction by nature
+ // doesn't require strictly synchronized semantics in this regard
+ self.0.load(Ordering::Relaxed)
+ }
+}
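OldestSlot's Relaxed orderings are safe because the u64 itself is the entire message: nothing else is published alongside it, and each compaction filter snapshots it once at creation. A standalone sketch of that pattern (assumption: a single writer and many short-lived readers):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

fn main() {
    let oldest_slot = Arc::new(AtomicU64::new(0));

    // Writer side (LedgerCleanupService): publish the new watermark.
    // Relaxed suffices; no other memory needs to become visible with it.
    oldest_slot.store(42, Ordering::Relaxed);

    // Reader side (a freshly created compaction filter): copy the value once
    // so it can't change mid-compaction, then use the local copy throughout.
    let snapshot = oldest_slot.load(Ordering::Relaxed);
    assert_eq!(snapshot, 42);
}
```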
#[derive(Debug)]
-struct Rocks(rocksdb::DB, ActualAccessType);
+struct Rocks(rocksdb::DB, ActualAccessType, OldestSlot);
impl Rocks {
fn open(
@@ -234,39 +273,73 @@ impl Rocks {
db_options.set_wal_recovery_mode(recovery_mode.into());
}
+ let oldest_slot = OldestSlot::default();
+
// Column family names
- let meta_cf_descriptor =
- ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(&access_type));
- let dead_slots_cf_descriptor =
- ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(&access_type));
- let duplicate_slots_cf_descriptor =
- ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options(&access_type));
- let erasure_meta_cf_descriptor =
- ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(&access_type));
- let orphans_cf_descriptor =
- ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(&access_type));
- let root_cf_descriptor =
- ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(&access_type));
- let index_cf_descriptor =
- ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(&access_type));
- let shred_data_cf_descriptor =
- ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(&access_type));
- let shred_code_cf_descriptor =
- ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(&access_type));
- let transaction_status_cf_descriptor =
- ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options(&access_type));
- let address_signatures_cf_descriptor =
- ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options(&access_type));
- let transaction_status_index_cf_descriptor =
- ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options(&access_type));
- let rewards_cf_descriptor =
- ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options(&access_type));
- let blocktime_cf_descriptor =
- ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type));
- let perf_samples_cf_descriptor =
- ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type));
- let block_height_cf_descriptor =
- ColumnFamilyDescriptor::new(BlockHeight::NAME, get_cf_options(&access_type));
+ let meta_cf_descriptor = ColumnFamilyDescriptor::new(
+ SlotMeta::NAME,
+ get_cf_options::<SlotMeta>(&access_type, &oldest_slot),
+ );
+ let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(
+ DeadSlots::NAME,
+ get_cf_options::<DeadSlots>(&access_type, &oldest_slot),
+ );
+ let duplicate_slots_cf_descriptor = ColumnFamilyDescriptor::new(
+ DuplicateSlots::NAME,
+ get_cf_options::<DuplicateSlots>(&access_type, &oldest_slot),
+ );
+ let erasure_meta_cf_descriptor = ColumnFamilyDescriptor::new(
+ ErasureMeta::NAME,
+ get_cf_options::<ErasureMeta>(&access_type, &oldest_slot),
+ );
+ let orphans_cf_descriptor = ColumnFamilyDescriptor::new(
+ Orphans::NAME,
+ get_cf_options::<Orphans>(&access_type, &oldest_slot),
+ );
+ let root_cf_descriptor = ColumnFamilyDescriptor::new(
+ Root::NAME,
+ get_cf_options::<Root>(&access_type, &oldest_slot),
+ );
+ let index_cf_descriptor = ColumnFamilyDescriptor::new(
+ Index::NAME,
+ get_cf_options::<Index>(&access_type, &oldest_slot),
+ );
+ let shred_data_cf_descriptor = ColumnFamilyDescriptor::new(
+ ShredData::NAME,
+ get_cf_options::<ShredData>(&access_type, &oldest_slot),
+ );
+ let shred_code_cf_descriptor = ColumnFamilyDescriptor::new(
+ ShredCode::NAME,
+ get_cf_options::<ShredCode>(&access_type, &oldest_slot),
+ );
+ let transaction_status_cf_descriptor = ColumnFamilyDescriptor::new(
+ TransactionStatus::NAME,
+ get_cf_options::<TransactionStatus>(&access_type, &oldest_slot),
+ );
+ let address_signatures_cf_descriptor = ColumnFamilyDescriptor::new(
+ AddressSignatures::NAME,
+ get_cf_options::<AddressSignatures>(&access_type, &oldest_slot),
+ );
+ let transaction_status_index_cf_descriptor = ColumnFamilyDescriptor::new(
+ TransactionStatusIndex::NAME,
+ get_cf_options::<TransactionStatusIndex>(&access_type, &oldest_slot),
+ );
+ let rewards_cf_descriptor = ColumnFamilyDescriptor::new(
+ Rewards::NAME,
+ get_cf_options::<Rewards>(&access_type, &oldest_slot),
+ );
+ let blocktime_cf_descriptor = ColumnFamilyDescriptor::new(
+ Blocktime::NAME,
+ get_cf_options::<Blocktime>(&access_type, &oldest_slot),
+ );
+ let perf_samples_cf_descriptor = ColumnFamilyDescriptor::new(
+ PerfSamples::NAME,
+ get_cf_options::<PerfSamples>(&access_type, &oldest_slot),
+ );
+ let block_height_cf_descriptor = ColumnFamilyDescriptor::new(
+ BlockHeight::NAME,
+ get_cf_options::<BlockHeight>(&access_type, &oldest_slot),
+ );
let cfs = vec![
(SlotMeta::NAME, meta_cf_descriptor),
@@ -289,18 +362,18 @@ impl Rocks {
(PerfSamples::NAME, perf_samples_cf_descriptor),
(BlockHeight::NAME, block_height_cf_descriptor),
];
+ let cf_names: Vec<_> = cfs.iter().map(|c| c.0).collect();
// Open the database
let db = match access_type {
AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
ActualAccessType::Primary,
+ oldest_slot,
),
AccessType::TryPrimaryThenSecondary => {
- let names: Vec<_> = cfs.iter().map(|c| c.0).collect();
-
match DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1)) {
- Ok(db) => Rocks(db, ActualAccessType::Primary),
+ Ok(db) => Rocks(db, ActualAccessType::Primary, oldest_slot),
Err(err) => {
let secondary_path = path.join("solana-secondary");
@@ -312,13 +385,75 @@ impl Rocks {
db_options.set_max_open_files(-1);
Rocks(
- DB::open_cf_as_secondary(&db_options, path, &secondary_path, names)?,
+ DB::open_cf_as_secondary(
+ &db_options,
+ path,
+ &secondary_path,
+ cf_names.clone(),
+ )?,
ActualAccessType::Secondary,
+ oldest_slot,
)
}
}
}
};
+ // This is only needed for LedgerCleanupService, so guard it with PrimaryOnly (i.e. running solana-validator)
+ if matches!(access_type, AccessType::PrimaryOnly) {
+ for cf_name in cf_names {
+ // this special column family must be excluded from LedgerCleanupService's rocksdb
+ // compactions
+ if cf_name == TransactionStatusIndex::NAME {
+ continue;
+ }
+
+ // This is the crux of our write-stall-free storage cleaning strategy with a consistent
+ // state view for higher layers
+ //
+ // For the consistent view, LedgerCleanupService commits a delete_range on the pruned
+ // slot range; a simple story here.
+ //
+ // For actual storage cleaning, we employ RocksDB compaction. But default RocksDB compaction
+ // settings don't work well for us. That's because we're using it rather like a really big
+ // (100 GBs) ring-buffer. RocksDB basically assumes uniform data writes over the key space for
+ // efficient compaction, which isn't true for our use as a ring buffer.
+ //
+ // So, we customize the compaction strategy with 2 combined tweaks:
+ // (1) compaction_filter and (2) shortening its periodic cycles.
+ //
+ // Via the compaction_filter, we finally reclaim previously delete_range()-ed storage occupied
+ // by pruned slots. When a compaction_filter is set, each SST file is re-compacted periodically
+ // to hunt for keys newly expired by the compaction_filter re-evaluation. But RocksDB's default
+ // `periodic_compaction_seconds` is 30 days, which is too long for our case. So, we
+ // shorten it to a day (24 hours).
+ //
+ // As we write newer SST files over time at a rather consistent rate, this
+ // effectively makes each newly-created SST file be re-compacted for the filter at
+ // well-dispersed timings.
+ // As a whole, we rewrite the whole dataset at every PERIODIC_COMPACTION_SECONDS,
+ // slowly over the duration of PERIODIC_COMPACTION_SECONDS. So, this results in
+ // amortization.
+ // There is a bit of inefficiency here because we'll rewrite not-so-old SST files
+ // too. But a longer period would introduce higher variance in ledger storage size over
+ // time. And it's much better than the daily IO spike caused by the previous
+ // implementation's compact_range().
+ //
+ // Pairing `ttl` with `compact_range` (`ManualCompaction`) doesn't work nicely. That's
+ // because the latter's original intention there is to merge delete_range()s to reclaim disk
+ // space. So it tries to merge them with N+1 SST files all the way down to the bottommost SSTs,
+ // often invalidating a vastly large amount (= all) of SST files, when combined with newer writes
+ // happening at the opposite edge of the key space. This causes long and heavy disk IO and
+ // possible write stalls and, ultimately, the deadly Replay/Banking stage stall at higher layers.
+ db.0.set_options_cf(
+ db.cf_handle(cf_name),
+ &[(
+ "periodic_compaction_seconds",
+ &format!("{}", PERIODIC_COMPACTION_SECONDS),
+ )],
+ )
+ .unwrap();
+ }
+ }
Ok(db)
}
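A freestanding sketch of the knob being tuned here, assuming rust-rocksdb's DB::set_options — the whole-database sibling of the set_options_cf call used above (path and value are illustrative):

```rust
use rocksdb::{Options, DB};

fn open_with_daily_recompaction(path: &str) -> DB {
    let mut options = Options::default();
    options.create_if_missing(true);
    let db = DB::open(&options, path).unwrap();
    // Re-compact (and thus re-run any compaction filter over) each SST file
    // at least once a day, instead of RocksDB's 30-day default.
    db.set_options(&[("periodic_compaction_seconds", "86400")])
        .unwrap();
    db
}

fn main() {
    let _db = open_with_daily_recompaction("/tmp/example-db");
}
```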
@@ -415,9 +550,13 @@ pub trait Column {
fn key(index: Self::Index) -> Vec<u8>;
fn index(key: &[u8]) -> Self::Index;
- fn primary_index(index: Self::Index) -> Slot;
+ // this returns a Slot or some other u64
+ fn primary_index(index: Self::Index) -> u64;
#[allow(clippy::wrong_self_convention)]
fn as_index(slot: Slot) -> Self::Index;
+ fn slot(index: Self::Index) -> Slot {
+ Self::primary_index(index)
+ }
}
pub trait ColumnName {
@@ -491,6 +630,10 @@ impl Column for columns::TransactionStatus {
index.0
}
+ fn slot(index: Self::Index) -> Slot {
+ index.2
+ }
+
#[allow(clippy::wrong_self_convention)]
fn as_index(index: u64) -> Self::Index {
(index, Signature::default(), 0)
@@ -528,6 +671,10 @@ impl Column for columns::AddressSignatures {
index.0
}
+ fn slot(index: Self::Index) -> Slot {
+ index.2
+ }
+
#[allow(clippy::wrong_self_convention)]
fn as_index(index: u64) -> Self::Index {
(index, Pubkey::default(), 0, Signature::default())
@@ -555,6 +702,10 @@ impl Column for columns::TransactionStatusIndex {
index
}
+ fn slot(_index: Self::Index) -> Slot {
+ unimplemented!()
+ }
+
#[allow(clippy::wrong_self_convention)]
fn as_index(slot: u64) -> u64 {
slot
@@ -855,6 +1006,10 @@ impl Database {
pub fn is_primary_access(&self) -> bool {
self.backend.is_primary_access()
}
+
+ pub fn set_oldest_slot(&self, oldest_slot: Slot) {
+ self.backend.2.set(oldest_slot);
+ }
}
impl<C> LedgerColumn<C>
@@ -1032,7 +1187,63 @@ impl<'a> WriteBatch<'a> {
}
}
-fn get_cf_options(access_type: &AccessType) -> Options {
+struct PurgedSlotFilter<C: Column + ColumnName> {
+ oldest_slot: Slot,
+ name: CString,
+ _phantom: PhantomData<C>,
+}
+
+impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
+ fn filter(&mut self, _level: u32, key: &[u8], _value: &[u8]) -> CompactionDecision {
+ use rocksdb::CompactionDecision::*;
+
+ let slot_in_key = C::slot(C::index(key));
+ // Refer to a comment about periodic_compaction_seconds, especially regarding implicit
+ // periodic execution of compaction_filters
+ if slot_in_key >= self.oldest_slot {
+ Keep
+ } else {
+ Remove
+ }
+ }
+
+ fn name(&self) -> &CStr {
+ &self.name
+ }
+}
+
+struct PurgedSlotFilterFactory<C: Column + ColumnName> {
+ oldest_slot: OldestSlot,
+ name: CString,
+ _phantom: PhantomData<C>,
+}
+
+impl<C: Column + ColumnName> CompactionFilterFactory for PurgedSlotFilterFactory<C> {
+ type Filter = PurgedSlotFilter<C>;
+
+ fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
+ let copied_oldest_slot = self.oldest_slot.get();
+ PurgedSlotFilter::<C> {
+ oldest_slot: copied_oldest_slot,
+ name: CString::new(format!(
+ "purged_slot_filter({}, {:?})",
+ C::NAME,
+ copied_oldest_slot
+ ))
+ .unwrap(),
+ _phantom: PhantomData::default(),
+ }
+ }
+
+ fn name(&self) -> &CStr {
+ &self.name
+ }
+}
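The filter's whole job is comparing the slot embedded in each key against the snapshot taken at creation. A reduced standalone version over raw 8-byte big-endian slot prefixes — the layout Blockstore uses for its slot-keyed columns, though the real code goes through C::index/C::slot rather than assuming it:

```rust
use std::convert::TryInto;

enum Decision {
    Keep,
    Remove,
}

// Assumes the slot is the first 8 bytes of the key, big-endian; an
// illustration of the comparison, not the patch's generic implementation.
fn filter_key(oldest_slot: u64, key: &[u8]) -> Decision {
    let slot = u64::from_be_bytes(key[..8].try_into().unwrap());
    if slot >= oldest_slot {
        Decision::Keep
    } else {
        Decision::Remove
    }
}

fn main() {
    let key_new = 10u64.to_be_bytes();
    let key_old = 3u64.to_be_bytes();
    assert!(matches!(filter_key(5, &key_new), Decision::Keep));
    assert!(matches!(filter_key(5, &key_old), Decision::Remove));
}
```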
+
+fn get_cf_options<C: 'static + Column + ColumnName>(
+ access_type: &AccessType,
+ oldest_slot: &OldestSlot,
+) -> Options {
let mut options = Options::default();
// 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
options.set_max_write_buffer_number(8);
@@ -1046,6 +1257,19 @@ fn get_cf_options(access_type: &AccessType) -> Options {
options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
options.set_max_bytes_for_level_base(total_size_base);
options.set_target_file_size_base(file_size_base);
+
+ // TransactionStatusIndex must be excluded from LedgerCleanupService's rocksdb
+ // compactions....
+ if matches!(access_type, AccessType::PrimaryOnly)
+ && C::NAME != columns::TransactionStatusIndex::NAME
+ {
+ options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
+ oldest_slot: oldest_slot.clone(),
+ name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
+ _phantom: PhantomData::default(),
+ });
+ }
+
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}
@@ -1077,3 +1301,57 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}
+
+#[cfg(test)]
+pub mod tests {
+ use super::*;
+ use crate::blockstore_db::columns::ShredData;
+
+ #[test]
+ fn test_compaction_filter() {
+ // CompactionFilterContext doesn't implement Clone, so create a fresh one each time...
+ let dummy_compaction_filter_context = || CompactionFilterContext {
+ is_full_compaction: true,
+ is_manual_compaction: true,
+ };
+ let oldest_slot = OldestSlot::default();
+
+ let mut factory = PurgedSlotFilterFactory::<ShredData> {
+ oldest_slot: oldest_slot.clone(),
+ name: CString::new("test compaction filter").unwrap(),
+ _phantom: PhantomData::default(),
+ };
+ let mut compaction_filter = factory.create(dummy_compaction_filter_context());
+
+ let dummy_level = 0;
+ let key = ShredData::key(ShredData::as_index(0));
+ let dummy_value = vec![];
+
+ // we can't use assert_matches! because CompactionDecision doesn't implement Debug
+ assert!(matches!(
+ compaction_filter.filter(dummy_level, &key, &dummy_value),
+ CompactionDecision::Keep
+ ));
+
+ // mutating oldest_slot doesn't affect existing compaction filters...
+ oldest_slot.set(1);
+ assert!(matches!(
+ compaction_filter.filter(dummy_level, &key, &dummy_value),
+ CompactionDecision::Keep
+ ));
+
+ // recreating the compaction filter starts to expire the key
+ let mut compaction_filter = factory.create(dummy_compaction_filter_context());
+ assert!(matches!(
+ compaction_filter.filter(dummy_level, &key, &dummy_value),
+ CompactionDecision::Remove
+ ));
+
+ // a newer key shouldn't be removed
+ let key = ShredData::key(ShredData::as_index(1));
+ assert!(matches!(
+ compaction_filter.filter(dummy_level, &key, &dummy_value),
+ CompactionDecision::Keep
+ ));
+ }
+}
diff --git a/validator/src/main.rs b/validator/src/main.rs index 5cd3cbada2d27e..a1dc8c2c8ebcd2 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -1500,7 +1500,7 @@ pub fn main() {
Arg::with_name("no_rocksdb_compaction")
.long("no-rocksdb-compaction")
.takes_value(false)
- .help("Disable manual compaction of the ledger database. May increase storage requirements.")
+ .help("Disable manual compaction of the ledger database (this is ignored).")
)
.arg(
Arg::with_name("rocksdb_compaction_interval")
@@ -2016,7 +2016,7 @@ pub fn main() {
let private_rpc = matches.is_present("private_rpc");
let no_port_check = matches.is_present("no_port_check");
- let no_rocksdb_compaction = matches.is_present("no_rocksdb_compaction");
+ let no_rocksdb_compaction = true;
let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok();
let rocksdb_max_compaction_jitter = value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok();