diff --git a/core/src/validator.rs b/core/src/validator.rs index a631cf36a8fe0d..d6a1c672a299c4 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -65,7 +65,7 @@ use { MAX_REPLAY_WAKE_UP_SIGNALS, }, blockstore_metric_report_service::BlockstoreMetricReportService, - blockstore_options::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions}, + blockstore_options::BlockstoreOptions, blockstore_processor::{self, TransactionStatusSender}, entry_notifier_interface::EntryNotifierArc, entry_notifier_service::{EntryNotifierSender, EntryNotifierService}, @@ -230,9 +230,9 @@ pub struct ValidatorConfig { pub pubsub_config: PubSubConfig, pub snapshot_config: SnapshotConfig, pub max_ledger_shreds: Option, + pub blockstore_options: BlockstoreOptions, pub broadcast_stage_type: BroadcastStageType, pub turbine_disabled: Arc, - pub enforce_ulimit_nofile: bool, pub fixed_leader_schedule: Option, pub wait_for_supermajority: Option, pub new_hard_forks: Option>, @@ -242,7 +242,6 @@ pub struct ValidatorConfig { pub gossip_validators: Option>, // None = gossip with all pub accounts_hash_interval_slots: u64, pub max_genesis_archive_unpacked_size: u64, - pub wal_recovery_mode: Option, /// Run PoH, transaction signature and other transaction verifications during blockstore /// processing. pub run_verification: bool, @@ -270,7 +269,6 @@ pub struct ValidatorConfig { pub validator_exit: Arc>, pub no_wait_for_vote_to_start_leader: bool, pub wait_to_vote_slot: Option, - pub ledger_column_options: LedgerColumnOptions, pub runtime_config: RuntimeConfig, pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit, pub block_verification_method: BlockVerificationMethod, @@ -298,6 +296,7 @@ impl Default for ValidatorConfig { expected_shred_version: None, voting_disabled: false, max_ledger_shreds: None, + blockstore_options: BlockstoreOptions::default(), account_paths: Vec::new(), account_snapshot_paths: Vec::new(), rpc_config: JsonRpcConfig::default(), @@ -307,7 +306,6 @@ impl Default for ValidatorConfig { snapshot_config: SnapshotConfig::new_load_only(), broadcast_stage_type: BroadcastStageType::Standard, turbine_disabled: Arc::::default(), - enforce_ulimit_nofile: true, fixed_leader_schedule: None, wait_for_supermajority: None, new_hard_forks: None, @@ -317,7 +315,6 @@ impl Default for ValidatorConfig { gossip_validators: None, accounts_hash_interval_slots: u64::MAX, max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, - wal_recovery_mode: None, run_verification: true, require_tower: false, tower_storage: Arc::new(NullTowerStorage::default()), @@ -343,7 +340,6 @@ impl Default for ValidatorConfig { no_wait_for_vote_to_start_leader: true, accounts_db_config: None, wait_to_vote_slot: None, - ledger_column_options: LedgerColumnOptions::default(), runtime_config: RuntimeConfig::default(), banking_trace_dir_byte_limit: 0, block_verification_method: BlockVerificationMethod::default(), @@ -370,8 +366,8 @@ impl ValidatorConfig { NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero"); Self { - enforce_ulimit_nofile: false, accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + blockstore_options: BlockstoreOptions::default_for_tests(), rpc_config: JsonRpcConfig::default_for_test(), block_production_method: BlockProductionMethod::default(), enable_block_production_forwarding: true, // enable forwarding by default for tests @@ -1857,15 +1853,6 @@ fn post_process_restored_tower( Ok(restored_tower) } -fn blockstore_options_from_config(config: &ValidatorConfig) -> BlockstoreOptions { - BlockstoreOptions { - recovery_mode: config.wal_recovery_mode.clone(), - column_options: config.ledger_column_options.clone(), - enforce_ulimit_nofile: config.enforce_ulimit_nofile, - ..BlockstoreOptions::default() - } -} - fn load_genesis( config: &ValidatorConfig, ledger_path: &Path, @@ -1926,7 +1913,7 @@ fn load_blockstore( *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger; let mut blockstore = - Blockstore::open_with_options(ledger_path, blockstore_options_from_config(config)) + Blockstore::open_with_options(ledger_path, config.blockstore_options.clone()) .map_err(|err| format!("Failed to open Blockstore: {err:?}"))?; let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS); @@ -2353,7 +2340,8 @@ fn cleanup_blockstore_incorrect_shred_versions( let backup_folder = format!( "{}_backup_{}_{}_{}", config - .ledger_column_options + .blockstore_options + .column_options .shred_storage_type .blockstore_directory(), incorrect_shred_version, @@ -2362,7 +2350,7 @@ fn cleanup_blockstore_incorrect_shred_versions( ); match Blockstore::open_with_options( &blockstore.ledger_path().join(backup_folder), - blockstore_options_from_config(config), + config.blockstore_options.clone(), ) { Ok(backup_blockstore) => { info!("Backing up slots from {start_slot} to {end_slot}"); diff --git a/ledger/src/blockstore_options.rs b/ledger/src/blockstore_options.rs index 04432dd2ac92d9..fb8718a66a5e86 100644 --- a/ledger/src/blockstore_options.rs +++ b/ledger/src/blockstore_options.rs @@ -3,6 +3,7 @@ use { std::path::Path, }; +#[derive(Debug, Clone)] pub struct BlockstoreOptions { // The access type of blockstore. Default: Primary pub access_type: AccessType, @@ -28,6 +29,17 @@ impl Default for BlockstoreOptions { } } +impl BlockstoreOptions { + pub fn default_for_tests() -> Self { + Self { + access_type: AccessType::Primary, + recovery_mode: None, + enforce_ulimit_nofile: false, + column_options: LedgerColumnOptions::default(), + } + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub enum AccessType { /// Primary (read/write) access; only one process can have Primary access. diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 6031b637d01e50..78a7acd681ef75 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -19,9 +19,9 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { pubsub_config: config.pubsub_config.clone(), snapshot_config: config.snapshot_config.clone(), max_ledger_shreds: config.max_ledger_shreds, + blockstore_options: config.blockstore_options.clone(), broadcast_stage_type: config.broadcast_stage_type.clone(), turbine_disabled: config.turbine_disabled.clone(), - enforce_ulimit_nofile: config.enforce_ulimit_nofile, fixed_leader_schedule: config.fixed_leader_schedule.clone(), wait_for_supermajority: config.wait_for_supermajority, new_hard_forks: config.new_hard_forks.clone(), @@ -31,7 +31,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { gossip_validators: config.gossip_validators.clone(), accounts_hash_interval_slots: config.accounts_hash_interval_slots, max_genesis_archive_unpacked_size: config.max_genesis_archive_unpacked_size, - wal_recovery_mode: config.wal_recovery_mode.clone(), run_verification: config.run_verification, require_tower: config.require_tower, tower_storage: config.tower_storage.clone(), @@ -57,7 +56,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader, accounts_db_config: config.accounts_db_config.clone(), wait_to_vote_slot: config.wait_to_vote_slot, - ledger_column_options: config.ledger_column_options.clone(), runtime_config: config.runtime_config.clone(), banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit, block_verification_method: config.block_verification_method.clone(), diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 8f25086d2f6ef1..bc31cced56ffbf 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -1019,7 +1019,6 @@ impl TestValidator { incremental_snapshot_archives_dir: ledger_path.to_path_buf(), ..SnapshotConfig::default() }, - enforce_ulimit_nofile: false, warp_slot: config.warp_slot, validator_exit: config.validator_exit.clone(), max_ledger_shreds: config.max_ledger_shreds, diff --git a/validator/src/main.rs b/validator/src/main.rs index 0a2ec83d921590..5c0105ba6c76b0 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -44,8 +44,8 @@ use { solana_ledger::{ blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS}, blockstore_options::{ - BlockstoreCompressionType, BlockstoreRecoveryMode, LedgerColumnOptions, - ShredStorageType, + AccessType, BlockstoreCompressionType, BlockstoreOptions, BlockstoreRecoveryMode, + LedgerColumnOptions, ShredStorageType, }, use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup}, }, @@ -383,13 +383,13 @@ fn set_repair_whitelist( /// Returns the default fifo shred storage size (include both data and coding /// shreds) based on the validator config. -fn default_fifo_shred_storage_size(vc: &ValidatorConfig) -> Option { +fn default_fifo_shred_storage_size(max_ledger_shreds: Option) -> Option { // The max shred size is around 1228 bytes. // Here we reserve a little bit more than that to give extra storage for FIFO // to prevent it from purging data that have not yet being marked as obsoleted // by LedgerCleanupService. const RESERVED_BYTES_PER_SHRED: u64 = 1500; - vc.max_ledger_shreds.map(|max_ledger_shreds| { + max_ledger_shreds.map(|max_ledger_shreds| { // x2 as we have data shred and coding shred. max_ledger_shreds * RESERVED_BYTES_PER_SHRED * 2 }) @@ -1006,9 +1006,6 @@ pub fn main() { let tpu_coalesce = value_t!(matches, "tpu_coalesce_ms", u64) .map(Duration::from_millis) .unwrap_or(DEFAULT_TPU_COALESCE); - let wal_recovery_mode = matches - .value_of("wal_recovery_mode") - .map(BlockstoreRecoveryMode::from); // Canonicalize ledger path to avoid issues with symlink creation let ledger_path = create_and_canonicalize_directories([&ledger_path]) @@ -1022,6 +1019,82 @@ pub fn main() { .pop() .unwrap(); + let recovery_mode = matches + .value_of("wal_recovery_mode") + .map(BlockstoreRecoveryMode::from); + + let max_ledger_shreds = if matches.is_present("limit_ledger_size") { + let limit_ledger_size = match matches.value_of("limit_ledger_size") { + Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64), + None => DEFAULT_MAX_LEDGER_SHREDS, + }; + if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS { + eprintln!( + "The provided --limit-ledger-size value was too small, the minimum value is \ + {DEFAULT_MIN_MAX_LEDGER_SHREDS}" + ); + exit(1); + } + Some(limit_ledger_size) + } else { + None + }; + + let column_options = LedgerColumnOptions { + compression_type: match matches.value_of("rocksdb_ledger_compression") { + None => BlockstoreCompressionType::default(), + Some(ledger_compression_string) => match ledger_compression_string { + "none" => BlockstoreCompressionType::None, + "snappy" => BlockstoreCompressionType::Snappy, + "lz4" => BlockstoreCompressionType::Lz4, + "zlib" => BlockstoreCompressionType::Zlib, + _ => panic!("Unsupported ledger_compression: {ledger_compression_string}"), + }, + }, + shred_storage_type: match matches.value_of("rocksdb_shred_compaction") { + None => ShredStorageType::default(), + Some(shred_compaction_string) => match shred_compaction_string { + "level" => ShredStorageType::RocksLevel, + "fifo" => { + warn!( + "The value \"fifo\" for --rocksdb-shred-compaction has been deprecated. \ + Use of \"fifo\" will still work for now, but is planned for full removal \ + in v2.1. To update, use \"level\" for --rocksdb-shred-compaction, or \ + remove the --rocksdb-shred-compaction argument altogether. Note that the \ + entire \"rocksdb_fifo\" subdirectory within the ledger directory will \ + need to be manually removed once the validator is running with \"level\"." + ); + match matches.value_of("rocksdb_fifo_shred_storage_size") { + None => ShredStorageType::rocks_fifo(default_fifo_shred_storage_size( + max_ledger_shreds, + )), + Some(_) => ShredStorageType::rocks_fifo(Some(value_t_or_exit!( + matches, + "rocksdb_fifo_shred_storage_size", + u64 + ))), + } + } + _ => panic!("Unrecognized rocksdb-shred-compaction: {shred_compaction_string}"), + }, + }, + rocks_perf_sample_interval: value_t_or_exit!( + matches, + "rocksdb_perf_sample_interval", + usize + ), + }; + + let blockstore_options = BlockstoreOptions { + recovery_mode, + column_options, + // The validator needs to open many files, check that the process has + // permission to do so in order to fail quickly and give a direct error + enforce_ulimit_nofile: true, + // The validator needs primary (read/write) + access_type: AccessType::Primary, + }; + let accounts_hash_cache_path = matches .value_of("accounts_hash_cache_path") .map(Into::into) @@ -1495,7 +1568,8 @@ pub fn main() { repair_validators, repair_whitelist, gossip_validators, - wal_recovery_mode, + max_ledger_shreds, + blockstore_options, run_verification: !(matches.is_present("skip_poh_verify") || matches.is_present("skip_startup_ledger_verification")), debug_keys, @@ -1790,21 +1864,6 @@ pub fn main() { exit(1); } - if matches.is_present("limit_ledger_size") { - let limit_ledger_size = match matches.value_of("limit_ledger_size") { - Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64), - None => DEFAULT_MAX_LEDGER_SHREDS, - }; - if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS { - eprintln!( - "The provided --limit-ledger-size value was too small, the minimum value is \ - {DEFAULT_MIN_MAX_LEDGER_SHREDS}" - ); - exit(1); - } - validator_config.max_ledger_shreds = Some(limit_ledger_size); - } - configure_banking_trace_dir_byte_limit(&mut validator_config, &matches); validator_config.block_verification_method = value_t!( matches, @@ -1822,51 +1881,6 @@ pub fn main() { validator_config.unified_scheduler_handler_threads = value_t!(matches, "unified_scheduler_handler_threads", usize).ok(); - validator_config.ledger_column_options = LedgerColumnOptions { - compression_type: match matches.value_of("rocksdb_ledger_compression") { - None => BlockstoreCompressionType::default(), - Some(ledger_compression_string) => match ledger_compression_string { - "none" => BlockstoreCompressionType::None, - "snappy" => BlockstoreCompressionType::Snappy, - "lz4" => BlockstoreCompressionType::Lz4, - "zlib" => BlockstoreCompressionType::Zlib, - _ => panic!("Unsupported ledger_compression: {ledger_compression_string}"), - }, - }, - shred_storage_type: match matches.value_of("rocksdb_shred_compaction") { - None => ShredStorageType::default(), - Some(shred_compaction_string) => match shred_compaction_string { - "level" => ShredStorageType::RocksLevel, - "fifo" => { - warn!( - "The value \"fifo\" for --rocksdb-shred-compaction has been deprecated. \ - Use of \"fifo\" will still work for now, but is planned for full removal \ - in v2.1. To update, use \"level\" for --rocksdb-shred-compaction, or \ - remove the --rocksdb-shred-compaction argument altogether. Note that the \ - entire \"rocksdb_fifo\" subdirectory within the ledger directory will \ - need to be manually removed once the validator is running with \"level\"." - ); - match matches.value_of("rocksdb_fifo_shred_storage_size") { - None => ShredStorageType::rocks_fifo(default_fifo_shred_storage_size( - &validator_config, - )), - Some(_) => ShredStorageType::rocks_fifo(Some(value_t_or_exit!( - matches, - "rocksdb_fifo_shred_storage_size", - u64 - ))), - } - } - _ => panic!("Unrecognized rocksdb-shred-compaction: {shred_compaction_string}"), - }, - }, - rocks_perf_sample_interval: value_t_or_exit!( - matches, - "rocksdb_perf_sample_interval", - usize - ), - }; - let public_rpc_addr = matches.value_of("public_rpc_addr").map(|addr| { solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| { eprintln!("failed to parse public rpc address: {e}");