Skip to content

Commit

Permalink
Make ValidatorConfig contain a BlockstoreOptions (#3452)
Browse files Browse the repository at this point in the history
ValidatorConfig currently contains individual fields that are used to
buildup a BlockstoreOptions. To reduce clutter, this change just makes
ValidatorConfig contain a BlockstoreOptions directly
  • Loading branch information
steviez authored Nov 4, 2024
1 parent 8eb550f commit 990d711
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 92 deletions.
28 changes: 8 additions & 20 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use {
MAX_REPLAY_WAKE_UP_SIGNALS,
},
blockstore_metric_report_service::BlockstoreMetricReportService,
blockstore_options::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions},
blockstore_options::BlockstoreOptions,
blockstore_processor::{self, TransactionStatusSender},
entry_notifier_interface::EntryNotifierArc,
entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
Expand Down Expand Up @@ -230,9 +230,9 @@ pub struct ValidatorConfig {
pub pubsub_config: PubSubConfig,
pub snapshot_config: SnapshotConfig,
pub max_ledger_shreds: Option<u64>,
pub blockstore_options: BlockstoreOptions,
pub broadcast_stage_type: BroadcastStageType,
pub turbine_disabled: Arc<AtomicBool>,
pub enforce_ulimit_nofile: bool,
pub fixed_leader_schedule: Option<FixedSchedule>,
pub wait_for_supermajority: Option<Slot>,
pub new_hard_forks: Option<Vec<Slot>>,
Expand All @@ -242,7 +242,6 @@ pub struct ValidatorConfig {
pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
pub accounts_hash_interval_slots: u64,
pub max_genesis_archive_unpacked_size: u64,
pub wal_recovery_mode: Option<BlockstoreRecoveryMode>,
/// Run PoH, transaction signature and other transaction verifications during blockstore
/// processing.
pub run_verification: bool,
Expand Down Expand Up @@ -270,7 +269,6 @@ pub struct ValidatorConfig {
pub validator_exit: Arc<RwLock<Exit>>,
pub no_wait_for_vote_to_start_leader: bool,
pub wait_to_vote_slot: Option<Slot>,
pub ledger_column_options: LedgerColumnOptions,
pub runtime_config: RuntimeConfig,
pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
pub block_verification_method: BlockVerificationMethod,
Expand Down Expand Up @@ -298,6 +296,7 @@ impl Default for ValidatorConfig {
expected_shred_version: None,
voting_disabled: false,
max_ledger_shreds: None,
blockstore_options: BlockstoreOptions::default(),
account_paths: Vec::new(),
account_snapshot_paths: Vec::new(),
rpc_config: JsonRpcConfig::default(),
Expand All @@ -307,7 +306,6 @@ impl Default for ValidatorConfig {
snapshot_config: SnapshotConfig::new_load_only(),
broadcast_stage_type: BroadcastStageType::Standard,
turbine_disabled: Arc::<AtomicBool>::default(),
enforce_ulimit_nofile: true,
fixed_leader_schedule: None,
wait_for_supermajority: None,
new_hard_forks: None,
Expand All @@ -317,7 +315,6 @@ impl Default for ValidatorConfig {
gossip_validators: None,
accounts_hash_interval_slots: u64::MAX,
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
wal_recovery_mode: None,
run_verification: true,
require_tower: false,
tower_storage: Arc::new(NullTowerStorage::default()),
Expand All @@ -343,7 +340,6 @@ impl Default for ValidatorConfig {
no_wait_for_vote_to_start_leader: true,
accounts_db_config: None,
wait_to_vote_slot: None,
ledger_column_options: LedgerColumnOptions::default(),
runtime_config: RuntimeConfig::default(),
banking_trace_dir_byte_limit: 0,
block_verification_method: BlockVerificationMethod::default(),
Expand All @@ -370,8 +366,8 @@ impl ValidatorConfig {
NonZeroUsize::new(get_max_thread_count()).expect("thread count is non-zero");

Self {
enforce_ulimit_nofile: false,
accounts_db_config: Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
blockstore_options: BlockstoreOptions::default_for_tests(),
rpc_config: JsonRpcConfig::default_for_test(),
block_production_method: BlockProductionMethod::default(),
enable_block_production_forwarding: true, // enable forwarding by default for tests
Expand Down Expand Up @@ -1857,15 +1853,6 @@ fn post_process_restored_tower(
Ok(restored_tower)
}

fn blockstore_options_from_config(config: &ValidatorConfig) -> BlockstoreOptions {
BlockstoreOptions {
recovery_mode: config.wal_recovery_mode.clone(),
column_options: config.ledger_column_options.clone(),
enforce_ulimit_nofile: config.enforce_ulimit_nofile,
..BlockstoreOptions::default()
}
}

fn load_genesis(
config: &ValidatorConfig,
ledger_path: &Path,
Expand Down Expand Up @@ -1926,7 +1913,7 @@ fn load_blockstore(
*start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;

let mut blockstore =
Blockstore::open_with_options(ledger_path, blockstore_options_from_config(config))
Blockstore::open_with_options(ledger_path, config.blockstore_options.clone())
.map_err(|err| format!("Failed to open Blockstore: {err:?}"))?;

let (ledger_signal_sender, ledger_signal_receiver) = bounded(MAX_REPLAY_WAKE_UP_SIGNALS);
Expand Down Expand Up @@ -2353,7 +2340,8 @@ fn cleanup_blockstore_incorrect_shred_versions(
let backup_folder = format!(
"{}_backup_{}_{}_{}",
config
.ledger_column_options
.blockstore_options
.column_options
.shred_storage_type
.blockstore_directory(),
incorrect_shred_version,
Expand All @@ -2362,7 +2350,7 @@ fn cleanup_blockstore_incorrect_shred_versions(
);
match Blockstore::open_with_options(
&blockstore.ledger_path().join(backup_folder),
blockstore_options_from_config(config),
config.blockstore_options.clone(),
) {
Ok(backup_blockstore) => {
info!("Backing up slots from {start_slot} to {end_slot}");
Expand Down
12 changes: 12 additions & 0 deletions ledger/src/blockstore_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
std::path::Path,
};

#[derive(Debug, Clone)]
pub struct BlockstoreOptions {
// The access type of blockstore. Default: Primary
pub access_type: AccessType,
Expand All @@ -28,6 +29,17 @@ impl Default for BlockstoreOptions {
}
}

impl BlockstoreOptions {
pub fn default_for_tests() -> Self {
Self {
access_type: AccessType::Primary,
recovery_mode: None,
enforce_ulimit_nofile: false,
column_options: LedgerColumnOptions::default(),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AccessType {
/// Primary (read/write) access; only one process can have Primary access.
Expand Down
4 changes: 1 addition & 3 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
pubsub_config: config.pubsub_config.clone(),
snapshot_config: config.snapshot_config.clone(),
max_ledger_shreds: config.max_ledger_shreds,
blockstore_options: config.blockstore_options.clone(),
broadcast_stage_type: config.broadcast_stage_type.clone(),
turbine_disabled: config.turbine_disabled.clone(),
enforce_ulimit_nofile: config.enforce_ulimit_nofile,
fixed_leader_schedule: config.fixed_leader_schedule.clone(),
wait_for_supermajority: config.wait_for_supermajority,
new_hard_forks: config.new_hard_forks.clone(),
Expand All @@ -31,7 +31,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
gossip_validators: config.gossip_validators.clone(),
accounts_hash_interval_slots: config.accounts_hash_interval_slots,
max_genesis_archive_unpacked_size: config.max_genesis_archive_unpacked_size,
wal_recovery_mode: config.wal_recovery_mode.clone(),
run_verification: config.run_verification,
require_tower: config.require_tower,
tower_storage: config.tower_storage.clone(),
Expand All @@ -57,7 +56,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,
accounts_db_config: config.accounts_db_config.clone(),
wait_to_vote_slot: config.wait_to_vote_slot,
ledger_column_options: config.ledger_column_options.clone(),
runtime_config: config.runtime_config.clone(),
banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit,
block_verification_method: config.block_verification_method.clone(),
Expand Down
1 change: 0 additions & 1 deletion test-validator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,6 @@ impl TestValidator {
incremental_snapshot_archives_dir: ledger_path.to_path_buf(),
..SnapshotConfig::default()
},
enforce_ulimit_nofile: false,
warp_slot: config.warp_slot,
validator_exit: config.validator_exit.clone(),
max_ledger_shreds: config.max_ledger_shreds,
Expand Down
150 changes: 82 additions & 68 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use {
solana_ledger::{
blockstore_cleanup_service::{DEFAULT_MAX_LEDGER_SHREDS, DEFAULT_MIN_MAX_LEDGER_SHREDS},
blockstore_options::{
BlockstoreCompressionType, BlockstoreRecoveryMode, LedgerColumnOptions,
ShredStorageType,
AccessType, BlockstoreCompressionType, BlockstoreOptions, BlockstoreRecoveryMode,
LedgerColumnOptions, ShredStorageType,
},
use_snapshot_archives_at_startup::{self, UseSnapshotArchivesAtStartup},
},
Expand Down Expand Up @@ -383,13 +383,13 @@ fn set_repair_whitelist(

/// Returns the default fifo shred storage size (include both data and coding
/// shreds) based on the validator config.
fn default_fifo_shred_storage_size(vc: &ValidatorConfig) -> Option<u64> {
fn default_fifo_shred_storage_size(max_ledger_shreds: Option<u64>) -> Option<u64> {
// The max shred size is around 1228 bytes.
// Here we reserve a little bit more than that to give extra storage for FIFO
// to prevent it from purging data that have not yet being marked as obsoleted
// by LedgerCleanupService.
const RESERVED_BYTES_PER_SHRED: u64 = 1500;
vc.max_ledger_shreds.map(|max_ledger_shreds| {
max_ledger_shreds.map(|max_ledger_shreds| {
// x2 as we have data shred and coding shred.
max_ledger_shreds * RESERVED_BYTES_PER_SHRED * 2
})
Expand Down Expand Up @@ -1006,9 +1006,6 @@ pub fn main() {
let tpu_coalesce = value_t!(matches, "tpu_coalesce_ms", u64)
.map(Duration::from_millis)
.unwrap_or(DEFAULT_TPU_COALESCE);
let wal_recovery_mode = matches
.value_of("wal_recovery_mode")
.map(BlockstoreRecoveryMode::from);

// Canonicalize ledger path to avoid issues with symlink creation
let ledger_path = create_and_canonicalize_directories([&ledger_path])
Expand All @@ -1022,6 +1019,82 @@ pub fn main() {
.pop()
.unwrap();

let recovery_mode = matches
.value_of("wal_recovery_mode")
.map(BlockstoreRecoveryMode::from);

let max_ledger_shreds = if matches.is_present("limit_ledger_size") {
let limit_ledger_size = match matches.value_of("limit_ledger_size") {
Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64),
None => DEFAULT_MAX_LEDGER_SHREDS,
};
if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS {
eprintln!(
"The provided --limit-ledger-size value was too small, the minimum value is \
{DEFAULT_MIN_MAX_LEDGER_SHREDS}"
);
exit(1);
}
Some(limit_ledger_size)
} else {
None
};

let column_options = LedgerColumnOptions {
compression_type: match matches.value_of("rocksdb_ledger_compression") {
None => BlockstoreCompressionType::default(),
Some(ledger_compression_string) => match ledger_compression_string {
"none" => BlockstoreCompressionType::None,
"snappy" => BlockstoreCompressionType::Snappy,
"lz4" => BlockstoreCompressionType::Lz4,
"zlib" => BlockstoreCompressionType::Zlib,
_ => panic!("Unsupported ledger_compression: {ledger_compression_string}"),
},
},
shred_storage_type: match matches.value_of("rocksdb_shred_compaction") {
None => ShredStorageType::default(),
Some(shred_compaction_string) => match shred_compaction_string {
"level" => ShredStorageType::RocksLevel,
"fifo" => {
warn!(
"The value \"fifo\" for --rocksdb-shred-compaction has been deprecated. \
Use of \"fifo\" will still work for now, but is planned for full removal \
in v2.1. To update, use \"level\" for --rocksdb-shred-compaction, or \
remove the --rocksdb-shred-compaction argument altogether. Note that the \
entire \"rocksdb_fifo\" subdirectory within the ledger directory will \
need to be manually removed once the validator is running with \"level\"."
);
match matches.value_of("rocksdb_fifo_shred_storage_size") {
None => ShredStorageType::rocks_fifo(default_fifo_shred_storage_size(
max_ledger_shreds,
)),
Some(_) => ShredStorageType::rocks_fifo(Some(value_t_or_exit!(
matches,
"rocksdb_fifo_shred_storage_size",
u64
))),
}
}
_ => panic!("Unrecognized rocksdb-shred-compaction: {shred_compaction_string}"),
},
},
rocks_perf_sample_interval: value_t_or_exit!(
matches,
"rocksdb_perf_sample_interval",
usize
),
};

let blockstore_options = BlockstoreOptions {
recovery_mode,
column_options,
// The validator needs to open many files, check that the process has
// permission to do so in order to fail quickly and give a direct error
enforce_ulimit_nofile: true,
// The validator needs primary (read/write)
access_type: AccessType::Primary,
};

let accounts_hash_cache_path = matches
.value_of("accounts_hash_cache_path")
.map(Into::into)
Expand Down Expand Up @@ -1495,7 +1568,8 @@ pub fn main() {
repair_validators,
repair_whitelist,
gossip_validators,
wal_recovery_mode,
max_ledger_shreds,
blockstore_options,
run_verification: !(matches.is_present("skip_poh_verify")
|| matches.is_present("skip_startup_ledger_verification")),
debug_keys,
Expand Down Expand Up @@ -1790,21 +1864,6 @@ pub fn main() {
exit(1);
}

if matches.is_present("limit_ledger_size") {
let limit_ledger_size = match matches.value_of("limit_ledger_size") {
Some(_) => value_t_or_exit!(matches, "limit_ledger_size", u64),
None => DEFAULT_MAX_LEDGER_SHREDS,
};
if limit_ledger_size < DEFAULT_MIN_MAX_LEDGER_SHREDS {
eprintln!(
"The provided --limit-ledger-size value was too small, the minimum value is \
{DEFAULT_MIN_MAX_LEDGER_SHREDS}"
);
exit(1);
}
validator_config.max_ledger_shreds = Some(limit_ledger_size);
}

configure_banking_trace_dir_byte_limit(&mut validator_config, &matches);
validator_config.block_verification_method = value_t!(
matches,
Expand All @@ -1822,51 +1881,6 @@ pub fn main() {
validator_config.unified_scheduler_handler_threads =
value_t!(matches, "unified_scheduler_handler_threads", usize).ok();

validator_config.ledger_column_options = LedgerColumnOptions {
compression_type: match matches.value_of("rocksdb_ledger_compression") {
None => BlockstoreCompressionType::default(),
Some(ledger_compression_string) => match ledger_compression_string {
"none" => BlockstoreCompressionType::None,
"snappy" => BlockstoreCompressionType::Snappy,
"lz4" => BlockstoreCompressionType::Lz4,
"zlib" => BlockstoreCompressionType::Zlib,
_ => panic!("Unsupported ledger_compression: {ledger_compression_string}"),
},
},
shred_storage_type: match matches.value_of("rocksdb_shred_compaction") {
None => ShredStorageType::default(),
Some(shred_compaction_string) => match shred_compaction_string {
"level" => ShredStorageType::RocksLevel,
"fifo" => {
warn!(
"The value \"fifo\" for --rocksdb-shred-compaction has been deprecated. \
Use of \"fifo\" will still work for now, but is planned for full removal \
in v2.1. To update, use \"level\" for --rocksdb-shred-compaction, or \
remove the --rocksdb-shred-compaction argument altogether. Note that the \
entire \"rocksdb_fifo\" subdirectory within the ledger directory will \
need to be manually removed once the validator is running with \"level\"."
);
match matches.value_of("rocksdb_fifo_shred_storage_size") {
None => ShredStorageType::rocks_fifo(default_fifo_shred_storage_size(
&validator_config,
)),
Some(_) => ShredStorageType::rocks_fifo(Some(value_t_or_exit!(
matches,
"rocksdb_fifo_shred_storage_size",
u64
))),
}
}
_ => panic!("Unrecognized rocksdb-shred-compaction: {shred_compaction_string}"),
},
},
rocks_perf_sample_interval: value_t_or_exit!(
matches,
"rocksdb_perf_sample_interval",
usize
),
};

let public_rpc_addr = matches.value_of("public_rpc_addr").map(|addr| {
solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| {
eprintln!("failed to parse public rpc address: {e}");
Expand Down

0 comments on commit 990d711

Please sign in to comment.