Skip to content

Commit

Permalink
Add simple heuristics for experimental mempurge. (#8583)
Browse files Browse the repository at this point in the history
Summary:
Add `experimental_mempurge_policy` option flag and introduce two new `MemPurge` (Memtable Garbage Collection) policies: 'ALWAYS' and 'ALTERNATE'. Default value: ALTERNATE.
`ALWAYS`: every flush will first go through a `MemPurge` process. If the output is too big to fit into a single memtable, then the mempurge is aborted and a regular flush process carries on. `ALWAYS` is designed for user that need to reduce the number of L0 SST file created to a strict minimum, and can afford a small dent in performance (possibly hits to CPU usage, read efficiency, and maximum burst write throughput).
`ALTERNATE`: a flush is transformed into a `MemPurge` except if one of the memtables being flushed is the product of a previous `MemPurge`. `ALTERNATE` is a good tradeoff between reduction in number of L0 SST files created and performance. `ALTERNATE` perform particularly well for completely random garbage ratios, or garbage ratios anywhere in (0%,50%], and even higher when there is a wild variability in garbage ratios.
This PR also includes support for `experimental_mempurge_policy` in `db_bench`.
Testing was done locally by replacing all the `MemPurge` policies of the unit tests with `ALTERNATE`, as well as local testing with `db_crashtest.py` `whitebox` and `blackbox`. Overall, if an `ALWAYS` mempurge policy passes the tests, there is no reasons why an `ALTERNATE` policy would fail, and therefore the mempurge policy was set to `ALWAYS` for all mempurge unit tests.

Pull Request resolved: #8583

Reviewed By: pdillinger

Differential Revision: D29888050

Pulled By: bjlemaire

fbshipit-source-id: e2cf26646d66679f6f5fb29842624615610759c1
  • Loading branch information
bjlemaire authored and facebook-github-bot committed Jul 26, 2021
1 parent daf7e77 commit 4361d6d
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 14 deletions.
14 changes: 10 additions & 4 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t sst_count = 0;
Expand Down Expand Up @@ -808,7 +809,7 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
// Assert that at least one flush to storage has been performed
ASSERT_GT(sst_count, EXPECTED_SST_COUNT);
// (which will consequently increase the number of mempurges recorded too).
ASSERT_EQ(mempurge_count, mempurge_count_record);
ASSERT_GE(mempurge_count, mempurge_count_record);

// Assert that there is no data corruption, even with
// a flush to storage.
Expand Down Expand Up @@ -842,6 +843,7 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
ASSERT_OK(TryReopen(options));

uint32_t mempurge_count = 0;
Expand Down Expand Up @@ -1045,6 +1047,7 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
ASSERT_OK(TryReopen(options));

uint32_t mempurge_count = 0;
Expand Down Expand Up @@ -1116,10 +1119,11 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
options.inplace_update_support = false;
options.allow_concurrent_memtable_write = true;

// Enforce size of a single MemTable to 1MB.
// Enforce size of a single MemTable to 128KB.
options.write_buffer_size = 128 << 10;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
ASSERT_OK(TryReopen(options));

const size_t KVSIZE = 10;
Expand Down Expand Up @@ -1158,7 +1162,7 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
// more than would fit in maximum allowed memtables.
Random rnd(719);
const size_t NUM_REPEAT = 100;
const size_t RAND_KEY_LENGTH = 8192;
const size_t RAND_KEY_LENGTH = 4096;
const size_t RAND_VALUES_LENGTH = 1024;
std::vector<std::string> values_default(KVSIZE), values_pikachu(KVSIZE);

Expand Down Expand Up @@ -1235,7 +1239,9 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
const uint32_t EXPECTED_SST_COUNT = 0;

EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) {
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
}

ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Check that there was no data corruption anywhere,
Expand Down
93 changes: 86 additions & 7 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,19 @@ void FlushJob::PickMemTable() {
// path 0 for level 0 file.
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

// If mempurge feature is activated, keep track of any potential
// memtables coming from a previous mempurge operation.
// Used for mempurge policy.
if (db_options_.experimental_allow_mempurge) {
contains_mempurge_outcome_ = false;
for (MemTable* mt : mems_) {
if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
contains_mempurge_outcome_ = true;
break;
}
}
}

base_ = cfd_->current();
base_->Ref(); // it is likely that we do not need this reference
}
Expand Down Expand Up @@ -230,7 +243,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
Status mempurge_s = Status::NotFound("No MemPurge.");
if (db_options_.experimental_allow_mempurge &&
(cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
(!mems_.empty())) {
(!mems_.empty()) && MemPurgeDecider()) {
mempurge_s = MemPurge();
if (!mempurge_s.ok()) {
// Mempurge is typically aborted when the new_mem output memtable
Expand Down Expand Up @@ -339,7 +352,15 @@ Status FlushJob::MemPurge() {
db_mutex_->Unlock();
assert(!mems_.empty());

// Measure purging time.
const uint64_t start_micros = clock_->NowMicros();
const uint64_t start_cpu_micros = clock_->CPUNanos() / 1000;

MemTable* new_mem = nullptr;
// For performance/log investigation purposes:
// look at how much useful payload we harvest in the new_mem.
// This value is then printed to the DB log.
double new_mem_capacity = 0.0;

// Create two iterators, one for the memtable data (contains
// info from puts + deletes), and one for the memtable
Expand Down Expand Up @@ -392,8 +413,8 @@ Status FlushJob::MemPurge() {
// or at least range tombstones, copy over the info
// to the new memtable.
if (iter->Valid() || !range_del_agg->IsEmpty()) {
// Arbitrary heuristic: maxSize is 60% cpacity.
size_t maxSize = ((mutable_cf_options_.write_buffer_size + 6U) / 10U);
// MaxSize is the size of a memtable.
size_t maxSize = mutable_cf_options_.write_buffer_size;
std::unique_ptr<CompactionFilter> compaction_filter;
if (ioptions->compaction_filter_factory != nullptr &&
ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
Expand Down Expand Up @@ -480,6 +501,7 @@ Status FlushJob::MemPurge() {
// and destroy new_mem.
if (new_mem->ApproximateMemoryUsage() > maxSize) {
s = Status::Aborted("Mempurge filled more than one memtable.");
new_mem_capacity = 1.0;
break;
}
}
Expand Down Expand Up @@ -524,6 +546,7 @@ Status FlushJob::MemPurge() {
// and destroy new_mem.
if (new_mem->ApproximateMemoryUsage() > maxSize) {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
new_mem_capacity = 1.0;
break;
}
}
Expand All @@ -538,19 +561,35 @@ Status FlushJob::MemPurge() {
new_mem->SetFirstSequenceNumber(new_first_seqno);

// The new_mem is added to the list of immutable memtables
// only if it filled at less than 60% capacity (arbitrary heuristic).
if (new_mem->ApproximateMemoryUsage() < maxSize) {
// only if it filled at less than 100% capacity and isn't flagged
// as in need of being flushed.
if (new_mem->ApproximateMemoryUsage() < maxSize &&
!(new_mem->ShouldFlushNow())) {
db_mutex_->Lock();
uint64_t new_mem_id = mems_[0]->GetID();
// Copy lowest memtable ID
// House keeping work.
for (MemTable* mt : mems_) {
new_mem_id = mt->GetID() < new_mem_id ? mt->GetID() : new_mem_id;
// Note: if m is not a previous mempurge output memtable,
// nothing happens.
cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
}
new_mem->SetID(new_mem_id);
cfd_->imm()->AddMemPurgeOutputID(new_mem_id);
cfd_->imm()->Add(new_mem,
&job_context_->memtables_to_free,
false /* -> trigger_flush=false:
* adding this memtable
* will not trigger a flush.
*/);
new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 /
mutable_cf_options_.write_buffer_size;
new_mem->Ref();
db_mutex_->Unlock();
} else {
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
new_mem_capacity = 1.0;
if (new_mem) {
job_context_->memtables_to_free.push_back(new_mem);
}
Expand All @@ -572,10 +611,32 @@ Status FlushJob::MemPurge() {
} else {
TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
}
const uint64_t micros = clock_->NowMicros() - start_micros;
const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Mempurge lasted %" PRIu64
" microseconds, and %" PRIu64
" cpu "
"microseconds. Status is %s ok. Perc capacity: %f\n",
cfd_->GetName().c_str(), job_context_->job_id, micros,
cpu_micros, s.ok() ? "" : "not", new_mem_capacity);

return s;
}

bool FlushJob::MemPurgeDecider() {
MemPurgePolicy policy = db_options_.experimental_mempurge_policy;
if (policy == MemPurgePolicy::kAlways) {
return true;
} else if (policy == MemPurgePolicy::kAlternate) {
// Note: if at least one of the flushed memtables is
// an output of a previous mempurge process, then flush
// to storage.
return !(contains_mempurge_outcome_);
}
return false;
}

Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0);
Expand Down Expand Up @@ -762,8 +823,16 @@ Status FlushJob::WriteLevel0Table() {

// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = clock_->NowMicros() - start_micros;
stats.cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
const uint64_t micros = clock_->NowMicros() - start_micros;
const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
stats.micros = micros;
stats.cpu_micros = cpu_micros;

ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Flush lasted %" PRIu64
" microseconds, and %" PRIu64 " cpu microseconds.\n",
cfd_->GetName().c_str(), job_context_->job_id, micros,
cpu_micros);

if (has_output) {
stats.bytes_written = meta_.fd.GetFileSize();
Expand All @@ -777,12 +846,22 @@ Status FlushJob::WriteLevel0Table() {

stats.num_output_files_blob = static_cast<int>(blobs.size());

if (db_options_.experimental_allow_mempurge && s.ok()) {
// The db_mutex is held at this point.
for (MemTable* mt : mems_) {
// Note: if m is not a previous mempurge output memtable,
// nothing happens here.
cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
}
}

RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats(
InternalStats::BYTES_FLUSHED,
stats.bytes_written + stats.bytes_written_blob);
RecordFlushIOStats();

return s;
}

Expand Down
4 changes: 4 additions & 0 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class FlushJob {
// recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet.
Status MemPurge();
bool MemPurgeDecider();
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
#endif // !ROCKSDB_LITE
Expand Down Expand Up @@ -190,6 +191,9 @@ class FlushJob {

const std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;

// Used when experimental_allow_mempurge set to true.
bool contains_mempurge_outcome_;
};

} // namespace ROCKSDB_NAMESPACE
6 changes: 3 additions & 3 deletions db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,9 @@ class MemTable {
}
#endif // !ROCKSDB_LITE

// Returns a heuristic flush decision
bool ShouldFlushNow();

private:
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

Expand Down Expand Up @@ -558,9 +561,6 @@ class MemTable {
std::unique_ptr<FlushJobInfo> flush_job_info_;
#endif // !ROCKSDB_LITE

// Returns a heuristic flush decision
bool ShouldFlushNow();

// Updates flush_state_ using ShouldFlushNow()
void UpdateFlushState();

Expand Down
22 changes: 22 additions & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,24 @@ class MemTableList {
// not freed, but put into a vector for future deref and reclamation.
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
void AddMemPurgeOutputID(uint64_t mid) {
if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
mempurged_ids_.insert(mid);
}
}

void RemoveMemPurgeOutputID(uint64_t mid) {
if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
mempurged_ids_.erase(mid);
}
}

bool IsMemPurgeOutput(uint64_t mid) {
if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
return false;
}
return true;
}

private:
friend Status InstallMemtableAtomicFlushResults(
Expand Down Expand Up @@ -433,6 +451,10 @@ class MemTableList {

// Cached value of current_->HasHistory().
std::atomic<bool> current_has_history_;

// Store the IDs of the memtables installed in this
// list that result from a mempurge operation.
std::unordered_set<uint64_t> mempurged_ids_;
};

// Installs memtable atomic flush results.
Expand Down
10 changes: 10 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ struct DbPath {

extern const char* kHostnameForDbHostId;

enum class MemPurgePolicy : char {
kAlternate = 0x00,
kAlways = 0x01,
};

enum class CompactionServiceJobStatus : char {
kSuccess,
kFailure,
Expand Down Expand Up @@ -785,6 +790,11 @@ struct DBOptions {
// If true, allows for memtable purge instead of flush to storage.
// (experimental).
bool experimental_allow_mempurge = false;
// If experimental_allow_mempurge is true, will dictate MemPurge
// policy.
// Default: kAlternate
// (experimental).
MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate;

// Amount of data to build up in memtables across all column
// families before writing to disk.
Expand Down
13 changes: 13 additions & 0 deletions options/db_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ static std::unordered_map<std::string, InfoLogLevel> info_log_level_string_map =
{"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL},
{"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}};

static std::unordered_map<std::string, MemPurgePolicy>
experimental_mempurge_policy_string_map = {
{"kAlternate", MemPurgePolicy::kAlternate},
{"kAlways", MemPurgePolicy::kAlways}};

static std::unordered_map<std::string, OptionTypeInfo>
db_mutable_options_type_info = {
{"allow_os_buffer",
Expand Down Expand Up @@ -196,6 +201,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableDBOptions, experimental_allow_mempurge),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"experimental_mempurge_policy",
OptionTypeInfo::Enum<MemPurgePolicy>(
offsetof(struct ImmutableDBOptions, experimental_mempurge_policy),
&experimental_mempurge_policy_string_map)},
{"is_fd_close_on_exec",
{offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
OptionType::kBoolean, OptionVerificationType::kNormal,
Expand Down Expand Up @@ -546,6 +555,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
is_fd_close_on_exec(options.is_fd_close_on_exec),
advise_random_on_open(options.advise_random_on_open),
experimental_allow_mempurge(options.experimental_allow_mempurge),
experimental_mempurge_policy(options.experimental_mempurge_policy),
db_write_buffer_size(options.db_write_buffer_size),
write_buffer_manager(options.write_buffer_manager),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
Expand Down Expand Up @@ -682,6 +692,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log,
" Options.experimental_allow_mempurge: %d",
experimental_allow_mempurge);
ROCKS_LOG_HEADER(log,
" Options.experimental_mempurge_policy: %d",
static_cast<int>(experimental_mempurge_policy));
ROCKS_LOG_HEADER(
log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
db_write_buffer_size);
Expand Down
1 change: 1 addition & 0 deletions options/db_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct ImmutableDBOptions {
bool is_fd_close_on_exec;
bool advise_random_on_open;
bool experimental_allow_mempurge;
MemPurgePolicy experimental_mempurge_policy;
size_t db_write_buffer_size;
std::shared_ptr<WriteBufferManager> write_buffer_manager;
DBOptions::AccessHint access_hint_on_compaction_start;
Expand Down
Loading

0 comments on commit 4361d6d

Please sign in to comment.