Skip to content

Commit b508b9d

Browse files
committed
Automatic partition store memory manager
This removes the configuration option `worker.storage.num-partitions-to-share-memory-budget` and instead replaces with an automatic memory manager that checks and rebalances memory across partitions every 5 seconds. - Removed rocksdb stall detection and hard-coded allow_stall to false in WBM since WBM almost always does the wrong thing and stalls forever under memory pressure. Removal of stall detection saves a timer on every write batch. - Allowed more flexible configuration of rocksdb per database. - Partition store manager now automatically rebalances its memory budget across open partition stores and runs a garbage collection pass on partitions that have been closed for some time to reclaim memory. - Switched lz4 to zstd to make for consistency across all databases. - Partition store manager reacts to live changes in memory budget - Log server reacts to live memory budget changes by reconfiguring future write buffers to meet the new budget
1 parent a9c0e08 commit b508b9d

File tree

47 files changed

+1670
-1057
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1670
-1057
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use restate_types::config::{
2525
MetadataServerOptionsBuilder, WorkerOptionsBuilder,
2626
};
2727
use restate_types::config_loader::ConfigLoaderBuilder;
28-
use restate_types::live::Constant;
2928
use restate_types::logs::metadata::ProviderKind;
3029
use restate_types::retries::RetryPolicy;
3130
use std::time::Duration;
@@ -108,7 +107,7 @@ pub fn spawn_restate(config: Configuration) -> task_center::Handle {
108107
let live_config = Configuration::live();
109108

110109
tc.block_on(async {
111-
RocksDbManager::init(Constant::new(config.common));
110+
RocksDbManager::init();
112111
prometheus.start_upkeep_task();
113112

114113
TaskCenter::spawn(TaskKind::SystemBoot, "restate", async move {

crates/bifrost/src/bifrost.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -776,8 +776,6 @@ mod tests {
776776
use restate_core::TestCoreEnvBuilder;
777777
use restate_core::{TaskCenter, TaskKind, TestCoreEnv};
778778
use restate_rocksdb::RocksDbManager;
779-
use restate_types::config::CommonOptions;
780-
use restate_types::live::Constant;
781779
use restate_types::logs::SequenceNumber;
782780
use restate_types::logs::metadata::{SegmentIndex, new_single_node_loglet_params};
783781
use restate_types::metadata::Precondition;
@@ -890,7 +888,7 @@ mod tests {
890888
.set_provider_kind(ProviderKind::Local)
891889
.build()
892890
.await;
893-
RocksDbManager::init(Constant::new(CommonOptions::default()));
891+
RocksDbManager::init();
894892

895893
let bifrost = Bifrost::init_local(node_env.metadata_writer).await;
896894

@@ -1389,7 +1387,7 @@ mod tests {
13891387
.set_provider_kind(ProviderKind::Local)
13901388
.build()
13911389
.await;
1392-
RocksDbManager::init(Constant::new(CommonOptions::default()));
1390+
RocksDbManager::init();
13931391
let bifrost = Bifrost::init_local(node_env.metadata_writer).await;
13941392

13951393
// create an appender

crates/bifrost/src/providers/local_loglet/log_store.rs

Lines changed: 134 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010

1111
use std::sync::Arc;
1212

13-
use restate_types::errors::MaybeRetryableError;
14-
use rocksdb::{BoundColumnFamily, DB, DBCompressionType, SliceTransform};
13+
use rocksdb::{BlockBasedOptions, BoundColumnFamily, Cache, DB, DBCompressionType, SliceTransform};
1514
use static_assertions::const_assert;
1615

1716
use restate_rocksdb::{
1817
CfExactPattern, CfName, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
1918
};
20-
use restate_types::config::LocalLogletOptions;
21-
use restate_types::live::{LiveLoad, LiveLoadExt};
19+
use restate_types::config::{Configuration, LocalLogletOptions};
20+
use restate_types::errors::MaybeRetryableError;
21+
use restate_types::live::LiveLoad;
2222
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
2323

2424
use super::keys::{DATA_KEY_PREFIX_LENGTH, MetadataKey, MetadataKind};
@@ -74,24 +74,16 @@ impl RocksDbLogStore {
7474
let opts = options.live_load();
7575
let data_dir = opts.data_dir();
7676

77-
let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), data_dir, db_options())
78-
.add_cf_pattern(
79-
CfExactPattern::new(DATA_CF),
80-
cf_data_options(opts.rocksdb_memory_budget()),
81-
)
82-
.add_cf_pattern(
83-
CfExactPattern::new(METADATA_CF),
84-
cf_metadata_options(opts.rocksdb_memory_budget()),
85-
)
77+
let db_spec = DbSpecBuilder::new(DbName::new(DB_NAME), data_dir, RocksConfigurator)
78+
.add_cf_pattern(CfExactPattern::new(DATA_CF), RocksConfigurator)
79+
.add_cf_pattern(CfExactPattern::new(METADATA_CF), RocksConfigurator)
8680
// not very important but it's to reduce the number of merges by flushing.
8781
// it's also a small cf so it should be quick.
8882
.add_to_flush_on_shutdown(CfExactPattern::new(METADATA_CF))
8983
.ensure_column_families(cfs)
9084
.build()
9185
.expect("valid spec");
92-
let rocksdb = db_manager
93-
.open_db(options.map(|options| &options.rocksdb).boxed(), db_spec)
94-
.await?;
86+
let rocksdb = db_manager.open_db(db_spec).await?;
9587
Ok(Self { rocksdb })
9688
}
9789

@@ -132,62 +124,107 @@ impl RocksDbLogStore {
132124
}
133125
}
134126

135-
fn db_options() -> rocksdb::Options {
136-
let mut opts = rocksdb::Options::default();
137-
138-
// Enable atomic flushes.
139-
// If WAL is disabled, this ensure we do not persist inconsistent data.
140-
// If WAL is enabled, this ensures that flushing either cf flushes both.
141-
// This is valuable because otherwise the metadata cf will flush rarely, and that would keep the WAL around
142-
// until shutdown, full of data cf bytes that have already been flushed, wasting disk space.
143-
opts.set_atomic_flush(true);
144-
145-
// This is Rocksdb's default, it's added here for clarity.
146-
//
147-
// Rationale: If WAL tail is corrupted, it's likely that it has failed during write, that said,
148-
// we can use absolute consistency but on a single-node setup, we don't have a way to recover
149-
// from it, so it's not useful for us.
150-
opts.set_wal_recovery_mode(rocksdb::DBRecoveryMode::TolerateCorruptedTailRecords);
151-
opts.set_wal_compression_type(DBCompressionType::Zstd);
152-
// most reads are sequential
153-
opts.set_advise_random_on_open(false);
154-
155-
opts
127+
struct RocksConfigurator;
128+
129+
impl restate_rocksdb::configuration::DbConfigurator for RocksConfigurator {
130+
fn get_db_options(
131+
&self,
132+
_db_name: &str,
133+
env: &rocksdb::Env,
134+
write_buffer_manager: &rocksdb::WriteBufferManager,
135+
) -> rocksdb::Options {
136+
let mut db_options = restate_rocksdb::configuration::create_default_db_options(
137+
env,
138+
true, /* create_db_if_missing */
139+
write_buffer_manager,
140+
);
141+
let local_loglet_config = &Configuration::pinned().bifrost.local;
142+
// amend default options from rocksdb_manager
143+
self.apply_db_opts_from_config(&mut db_options, &local_loglet_config.rocksdb);
144+
// local loglet customizations
145+
146+
// Enable atomic flushes.
147+
// If WAL is disabled, this ensure we do not persist inconsistent data.
148+
// If WAL is enabled, this ensures that flushing either cf flushes both.
149+
// This is valuable because otherwise the metadata cf will flush rarely, and that would keep the WAL around
150+
// until shutdown, full of data cf bytes that have already been flushed, wasting disk space.
151+
db_options.set_atomic_flush(true);
152+
153+
// This is Rocksdb's default, it's added here for clarity.
154+
//
155+
// Rationale: If WAL tail is corrupted, it's likely that it has failed during write, that said,
156+
// we can use absolute consistency but on a single-node setup, we don't have a way to recover
157+
// from it, so it's not useful for us.
158+
db_options.set_wal_recovery_mode(rocksdb::DBRecoveryMode::TolerateCorruptedTailRecords);
159+
db_options.set_wal_compression_type(DBCompressionType::Zstd);
160+
// most reads are sequential
161+
db_options.set_advise_random_on_open(false);
162+
163+
db_options
164+
}
156165
}
157166

158-
fn cf_data_options(
159-
memory_budget: usize,
160-
) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static {
161-
move |mut opts| {
162-
// memory budget is in bytes. We divide the budget between the data cf and metadata cf.
163-
// data 10% to metadata 90% to data.
164-
let memtables_budget = (memory_budget as f64 * DATA_CF_BUDGET_RATIO).floor() as usize;
165-
assert!(
166-
memtables_budget > 0,
167-
"memory budget should be greater than 0"
167+
impl restate_rocksdb::configuration::CfConfigurator for RocksConfigurator {
168+
fn get_cf_options(
169+
&self,
170+
_db_name: &str,
171+
cf_name: &str,
172+
global_cache: &Cache,
173+
write_buffer_manager: &rocksdb::WriteBufferManager,
174+
) -> rocksdb::Options {
175+
let config = &Configuration::pinned().bifrost.local;
176+
let mut cf_options =
177+
restate_rocksdb::configuration::create_default_cf_options(Some(write_buffer_manager));
178+
let block_options = restate_rocksdb::configuration::create_default_block_options(
179+
&config.rocksdb,
180+
Some(global_cache),
168181
);
169182

170-
set_memory_related_opts(&mut opts, memtables_budget);
171-
opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
172-
opts.set_num_levels(7);
173-
174-
opts.set_compression_per_level(&[
175-
DBCompressionType::None,
176-
DBCompressionType::None,
177-
DBCompressionType::Lz4,
178-
DBCompressionType::Lz4,
179-
DBCompressionType::Lz4,
180-
DBCompressionType::Lz4,
181-
DBCompressionType::Zstd,
182-
]);
183-
184-
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(DATA_KEY_PREFIX_LENGTH));
185-
opts.set_memtable_prefix_bloom_ratio(0.2);
186-
//
187-
opts
183+
if cf_name == DATA_CF {
184+
cf_data_options(&mut cf_options, &block_options, config);
185+
} else if cf_name == METADATA_CF {
186+
cf_metadata_options(&mut cf_options, &block_options, config);
187+
}
188+
189+
cf_options
188190
}
189191
}
190192

193+
fn cf_data_options(
194+
opts: &mut rocksdb::Options,
195+
block_options: &BlockBasedOptions,
196+
local_loglet_config: &LocalLogletOptions,
197+
) {
198+
opts.set_block_based_table_factory(block_options);
199+
200+
let memory_budget = local_loglet_config.rocksdb_memory_budget();
201+
202+
// memory budget is in bytes. We divide the budget between the data cf and metadata cf.
203+
// data 10% to metadata 90% to data.
204+
let memtables_budget = (memory_budget as f64 * DATA_CF_BUDGET_RATIO).floor() as usize;
205+
assert!(
206+
memtables_budget > 0,
207+
"memory budget should be greater than 0"
208+
);
209+
210+
set_memory_related_opts(opts, memtables_budget);
211+
opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
212+
opts.set_num_levels(7);
213+
214+
opts.set_compression_per_level(&[
215+
DBCompressionType::None,
216+
DBCompressionType::None,
217+
DBCompressionType::Zstd,
218+
DBCompressionType::Zstd,
219+
DBCompressionType::Zstd,
220+
DBCompressionType::Zstd,
221+
DBCompressionType::Zstd,
222+
]);
223+
224+
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(DATA_KEY_PREFIX_LENGTH));
225+
opts.set_memtable_prefix_bloom_ratio(0.2);
226+
}
227+
191228
fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize) {
192229
// We set the budget to allow 1 mutable + 3 immutable.
193230
opts.set_write_buffer_size(memtables_budget / 4);
@@ -206,34 +243,36 @@ fn set_memory_related_opts(opts: &mut rocksdb::Options, memtables_budget: usize)
206243
}
207244

208245
fn cf_metadata_options(
209-
memory_budget: usize,
210-
) -> impl Fn(rocksdb::Options) -> rocksdb::Options + Send + Sync + 'static {
211-
move |mut opts| {
212-
let memtables_budget =
213-
(memory_budget as f64 * (1.0 - DATA_CF_BUDGET_RATIO)).floor() as usize;
214-
assert!(
215-
memtables_budget > 0,
216-
"memory budget should be greater than 0"
217-
);
218-
set_memory_related_opts(&mut opts, memtables_budget);
219-
//
220-
// Set compactions per level
221-
//
222-
opts.set_num_levels(3);
223-
opts.set_compression_per_level(&[
224-
DBCompressionType::None,
225-
DBCompressionType::None,
226-
DBCompressionType::Lz4,
227-
]);
228-
opts.set_memtable_whole_key_filtering(true);
229-
opts.set_max_write_buffer_number(4);
230-
opts.set_max_successive_merges(10);
231-
// Merge operator for log state updates
232-
opts.set_merge_operator(
233-
"LogStateMerge",
234-
log_state_full_merge,
235-
log_state_partial_merge,
236-
);
237-
opts
238-
}
246+
opts: &mut rocksdb::Options,
247+
block_options: &BlockBasedOptions,
248+
local_loglet_config: &LocalLogletOptions,
249+
) {
250+
opts.set_block_based_table_factory(block_options);
251+
252+
let memory_budget = local_loglet_config.rocksdb_memory_budget();
253+
254+
let memtables_budget = (memory_budget as f64 * (1.0 - DATA_CF_BUDGET_RATIO)).floor() as usize;
255+
assert!(
256+
memtables_budget > 0,
257+
"memory budget should be greater than 0"
258+
);
259+
set_memory_related_opts(opts, memtables_budget);
260+
//
261+
// Set compactions per level
262+
//
263+
opts.set_num_levels(3);
264+
opts.set_compression_per_level(&[
265+
DBCompressionType::None,
266+
DBCompressionType::None,
267+
DBCompressionType::Zstd,
268+
]);
269+
opts.set_memtable_whole_key_filtering(true);
270+
opts.set_max_write_buffer_number(4);
271+
opts.set_max_successive_merges(10);
272+
// Merge operator for log state updates
273+
opts.set_merge_operator(
274+
"LogStateMerge",
275+
log_state_full_merge,
276+
log_state_partial_merge,
277+
);
239278
}

crates/bifrost/src/providers/local_loglet/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ mod tests {
323323
.await;
324324

325325
let config = Live::from_value(Configuration::default());
326-
RocksDbManager::init(config.clone().map(|c| &c.common));
326+
RocksDbManager::init();
327327
let params = LogletParams::from("42".to_string());
328328

329329
let local_loglet_config = config.map(|config| &config.bifrost.local);
@@ -356,7 +356,7 @@ mod tests {
356356
.await;
357357

358358
let config = Live::from_value(Configuration::default());
359-
RocksDbManager::init(config.clone().map(|c| &c.common));
359+
RocksDbManager::init();
360360

361361
let local_loglet_config = config.map(|config| &config.bifrost.local);
362362
let log_store = RocksDbLogStore::create(local_loglet_config.clone()).await?;

crates/bifrost/src/providers/replicated_loglet/loglet.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ mod tests {
442442
use restate_rocksdb::RocksDbManager;
443443
use restate_types::config::{Configuration, set_current_config};
444444
use restate_types::health::HealthStatus;
445-
use restate_types::live::{Live, LiveLoadExt};
445+
use restate_types::live::Live;
446446
use restate_types::logs::{Keys, LogletId};
447447
use restate_types::replication::{NodeSet, ReplicationProperty};
448448
use restate_types::{GenerationalNodeId, PlainNodeId};
@@ -467,7 +467,7 @@ mod tests {
467467
set_current_config(config.clone());
468468
let config = Live::from_value(config);
469469

470-
RocksDbManager::init(config.clone().map(|c| &c.common));
470+
RocksDbManager::init();
471471

472472
let mut node_env =
473473
TestCoreEnvBuilder::with_incoming_only_connector().add_mock_nodes_config();

0 commit comments

Comments
 (0)