Skip to content

Commit

Permalink
Add basic kRoundRobin compaction policy (#10107)
Browse files Browse the repository at this point in the history
Summary:
Add `kRoundRobin` as a compaction priority. The implementation is as follows.

- Define a cursor as the smallest Internal key in the successor of the selected file. Add `vector<InternalKey> compact_cursor_` into `VersionStorageInfo` where each element (`InternalKey`) in `compact_cursor_` represents a cursor. In round-robin compaction policy, we just need to select the first file (assuming files are sorted) and also has the smallest InternalKey larger than/equal to the cursor. After a file is chosen, we create a new `Fsize` vector which puts the selected file is placed at the first position in `temp`, the next cursor is then updated as the smallest InternalKey in successor of the selected file (the above logic is implemented in `SortFileByRoundRobin`).
- After a compaction succeeds, typically `InstallCompactionResults()`, we choose the next cursor for the input level and save it to `edit`. When calling `LogAndApply`, we save the next cursor with its level into some local variable and finally apply the change to `vstorage` in `SaveTo` function.
- Cursors are persist pair by pair (<level, InternalKey>) in `EncodeTo` so that they can be reconstructed when reopening. An empty cursor will not be encoded to MANIFEST

Pull Request resolved: facebook/rocksdb#10107

Test Plan: add unit test (`CompactionPriRoundRobin`) in `compaction_picker_test`, add `kRoundRobin` priority in `CompactionPriTest` from `db_compaction_test`, and add `PersistRoundRobinCompactCursor` in `db_compaction_test`

Reviewed By: ajkr

Differential Revision: D37316037

Pulled By: littlepig2013

fbshipit-source-id: 9f481748190ace416079139044e00df2968fb1ee
  • Loading branch information
zczhu authored and facebook-github-bot committed Jun 21, 2022
1 parent b012d23 commit 3014146
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 12 deletions.
10 changes: 10 additions & 0 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2160,6 +2160,16 @@ Status CompactionJob::InstallCompactionResults(
stats.GetBytes());
}

if (compaction->compaction_reason() == CompactionReason::kLevelMaxLevelSize &&
compaction->immutable_options()->compaction_pri == kRoundRobin) {
int start_level = compaction->start_level();
if (start_level > 0) {
auto vstorage = compaction->input_version()->storage_info();
edit->AddCompactCursor(start_level,
vstorage->GetNextCompactCursor(start_level));
}
}

return versions_->LogAndApply(compaction->column_family_data(),
mutable_cf_options, edit, db_mutex_,
db_directory_);
Expand Down
24 changes: 22 additions & 2 deletions db/compaction/compaction_picker_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,14 @@ bool LevelCompactionBuilder::PickFileToCompact() {
// do not pick a file to compact if it is being compacted
// from n-1 level.
if (f->being_compacted) {
if (ioptions_.compaction_pri == kRoundRobin) {
// TODO(zichen): this file may be involved in one compaction from
// an upper level, cannot advance the cursor for round-robin policy.
// Currently, we do not pick any file to compact in this case. We
// should fix this later to ensure a compaction is picked but the
// cursor shall not be advanced.
return false;
}
continue;
}

Expand All @@ -460,6 +468,13 @@ bool LevelCompactionBuilder::PickFileToCompact() {
// A locked (pending compaction) input-level file was pulled in due to
// user-key overlap.
start_level_inputs_.clear();

// To ensure every file is selcted in a round-robin manner, we cannot
// skip the current file. So we return false and wait for the next time
// we can pick this file to compact
if (ioptions_.compaction_pri == kRoundRobin) {
return false;
}
continue;
}

Expand All @@ -479,15 +494,20 @@ bool LevelCompactionBuilder::PickFileToCompact() {
!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&output_level_inputs)) {
start_level_inputs_.clear();
// The same reason as above to ensure the round-robin compaction
if (ioptions_.compaction_pri == kRoundRobin) {
return false;
}
continue;
}
base_index_ = index;
break;
}

// store where to start the iteration in the next call to PickCompaction
vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);

if (ioptions_.compaction_pri != kRoundRobin) {
vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);
}
return start_level_inputs_.size() > 0;
}

Expand Down
38 changes: 38 additions & 0 deletions db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,44 @@ TEST_F(CompactionPickerTest, CompactionPriMinOverlapping4) {
ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber());
}

TEST_F(CompactionPickerTest, CompactionPriRoundRobin) {
std::vector<InternalKey> test_cursors = {InternalKey("249", 100, kTypeValue),
InternalKey("600", 100, kTypeValue),
InternalKey()};
std::vector<uint32_t> selected_files = {8U, 6U, 6U};

ioptions_.compaction_pri = kRoundRobin;
mutable_cf_options_.max_bytes_for_level_base = 10000000;
mutable_cf_options_.max_bytes_for_level_multiplier = 10;
for (size_t i = 0; i < test_cursors.size(); i++) {
// start a brand new version in each test.
NewVersionStorage(6, kCompactionStyleLevel);
vstorage_->ResizeCompactCursors(6);
// Set the cursor
vstorage_->AddCursorForOneLevel(2, test_cursors[i]);
Add(2, 6U, "150", "199", 50000000U); // Overlap with 26U, 27U
Add(2, 7U, "200", "249", 50000000U); // File not overlapping
Add(2, 8U, "300", "600", 50000000U); // Overlap with 28U, 29U

Add(3, 26U, "130", "165", 60000000U);
Add(3, 27U, "166", "170", 60000000U);
Add(3, 28U, "270", "340", 60000000U);
Add(3, 29U, "401", "500", 60000000U);
UpdateVersionStorageInfo();
LevelCompactionPicker local_level_compaction_picker =
LevelCompactionPicker(ioptions_, &icmp_);
std::unique_ptr<Compaction> compaction(
local_level_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction.get() != nullptr);
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(selected_files[i], compaction->input(0, 0)->fd.GetNumber());
// release the version storage
DeleteVersionStorage();
}
}

// This test exhibits the bug where we don't properly reset parent_index in
// PickCompaction()
TEST_F(CompactionPickerTest, ParentIndexResetBug) {
Expand Down
71 changes: 70 additions & 1 deletion db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5219,7 +5219,76 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(CompactionPri::kByCompensatedSize,
CompactionPri::kOldestLargestSeqFirst,
CompactionPri::kOldestSmallestSeqFirst,
CompactionPri::kMinOverlappingRatio));
CompactionPri::kMinOverlappingRatio,
CompactionPri::kRoundRobin));

TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) {
Options options = CurrentOptions();
options.write_buffer_size = 16 * 1024;
options.max_bytes_for_level_base = 64 * 1024;
options.level0_file_num_compaction_trigger = 4;
options.compaction_pri = CompactionPri::kRoundRobin;
options.max_bytes_for_level_multiplier = 4;
options.num_levels = 3;
options.compression = kNoCompression;

DestroyAndReopen(options);

Random rnd(301);

// 30 Files in L0 to trigger compactions between L1 and L2
for (int i = 0; i < 30; i++) {
for (int j = 0; j < 16; j++) {
ASSERT_OK(Put(rnd.RandomString(24), rnd.RandomString(1000)));
}
}

ASSERT_OK(dbfull()->TEST_WaitForCompact());

VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions);

ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);

Version* const current = cfd->current();
ASSERT_NE(current, nullptr);

const VersionStorageInfo* const storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);

const std::vector<InternalKey> compact_cursors =
storage_info->GetCompactCursors();

Reopen(options);

VersionSet* const reopened_versions = dbfull()->GetVersionSet();
assert(reopened_versions);

ColumnFamilyData* const reopened_cfd =
reopened_versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(reopened_cfd, nullptr);

Version* const reopened_current = reopened_cfd->current();
ASSERT_NE(reopened_current, nullptr);

const VersionStorageInfo* const reopened_storage_info =
reopened_current->storage_info();
ASSERT_NE(reopened_storage_info, nullptr);

const std::vector<InternalKey> reopened_compact_cursors =
reopened_storage_info->GetCompactCursors();
const auto icmp = reopened_storage_info->InternalComparator();
ASSERT_EQ(compact_cursors.size(), reopened_compact_cursors.size());
for (size_t i = 0; i < compact_cursors.size(); i++) {
if (compact_cursors[i].Valid()) {
ASSERT_EQ(0,
icmp->Compare(compact_cursors[i], reopened_compact_cursors[i]));
} else {
ASSERT_TRUE(!reopened_compact_cursors[i].Valid());
}
}
}

class NoopMergeOperator : public MergeOperator {
public:
Expand Down
10 changes: 9 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3316,7 +3316,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
moved_bytes += f->fd.GetFileSize();
}
}

if (c->compaction_reason() == CompactionReason::kLevelMaxLevelSize &&
c->immutable_options()->compaction_pri == kRoundRobin) {
int start_level = c->start_level();
if (start_level > 0) {
auto vstorage = c->input_version()->storage_info();
c->edit()->AddCompactCursor(
start_level, vstorage->GetNextCompactCursor(start_level));
}
}
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
Expand Down
37 changes: 37 additions & 0 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ class VersionBuilder::Rep {
bool has_invalid_levels_;
// Current levels of table files affected by additions/deletions.
std::unordered_map<uint64_t, int> table_file_levels_;
// Current compact cursors that should be changed after the last compaction
std::unordered_map<int, InternalKey> updated_compact_cursors_;
NewestFirstBySeqNo level_zero_cmp_;
BySmallestKey level_nonzero_cmp_;

Expand Down Expand Up @@ -809,6 +811,22 @@ class VersionBuilder::Rep {
return Status::OK();
}

Status ApplyCompactCursors(int level,
const InternalKey& smallest_uncompacted_key) {
if (level < 0) {
std::ostringstream oss;
oss << "Cannot add compact cursor (" << level << ","
<< smallest_uncompacted_key.Encode().ToString()
<< " due to invalid level (level = " << level << ")";
return Status::Corruption("VersionBuilder", oss.str());
}
if (level < num_levels_) {
// Omit levels (>= num_levels_) when re-open with shrinking num_levels_
updated_compact_cursors_[level] = smallest_uncompacted_key;
}
return Status::OK();
}

// Apply all of the edits in *edit to the current state.
Status Apply(const VersionEdit* edit) {
{
Expand Down Expand Up @@ -860,6 +878,16 @@ class VersionBuilder::Rep {
}
}

// Populate compact cursors for round-robin compaction, leave
// the cursor to be empty to indicate it is invalid
for (const auto& cursor : edit->GetCompactCursors()) {
const int level = cursor.first;
const InternalKey smallest_uncompacted_key = cursor.second;
const Status s = ApplyCompactCursors(level, smallest_uncompacted_key);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}

Expand Down Expand Up @@ -1142,6 +1170,13 @@ class VersionBuilder::Rep {
}
}

void SaveCompactCursorsTo(VersionStorageInfo* vstorage) const {
for (auto iter = updated_compact_cursors_.begin();
iter != updated_compact_cursors_.end(); iter++) {
vstorage->AddCursorForOneLevel(iter->first, iter->second);
}
}

// Save the current state in *vstorage.
Status SaveTo(VersionStorageInfo* vstorage) const {
Status s;
Expand All @@ -1163,6 +1198,8 @@ class VersionBuilder::Rep {

SaveBlobFilesTo(vstorage);

SaveCompactCursorsTo(vstorage);

s = CheckConsistency(vstorage);
return s;
}
Expand Down
18 changes: 13 additions & 5 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ void VersionEdit::Clear() {
has_max_column_family_ = false;
has_min_log_number_to_keep_ = false;
has_last_sequence_ = false;
compact_cursors_.clear();
deleted_files_.clear();
new_files_.clear();
blob_file_additions_.clear();
Expand Down Expand Up @@ -121,6 +122,13 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (has_last_sequence_) {
PutVarint32Varint64(dst, kLastSequence, last_sequence_);
}
for (size_t i = 0; i < compact_cursors_.size(); i++) {
if (compact_cursors_[i].second.Valid()) {
PutVarint32(dst, kCompactCursor);
PutVarint32(dst, compact_cursors_[i].first); // level
PutLengthPrefixedSlice(dst, compact_cursors_[i].second.Encode());
}
}
for (const auto& deleted : deleted_files_) {
PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
deleted.second /* file number */);
Expand Down Expand Up @@ -512,15 +520,15 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;

case kCompactPointer:
case kCompactCursor:
if (GetLevel(&input, &level, &msg) &&
GetInternalKey(&input, &key)) {
// we don't use compact pointers anymore,
// but we should not fail if they are still
// in manifest
// Here we re-use the output format of compact pointer in LevelDB
// to persist compact_cursors_
compact_cursors_.push_back(std::make_pair(level, key));
} else {
if (!msg) {
msg = "compaction pointer";
msg = "compaction cursor";
}
}
break;
Expand Down
23 changes: 22 additions & 1 deletion db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ enum Tag : uint32_t {
kLogNumber = 2,
kNextFileNumber = 3,
kLastSequence = 4,
kCompactPointer = 5,
kCompactCursor = 5,
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
Expand Down Expand Up @@ -463,6 +463,24 @@ class VersionEdit {
using NewFiles = std::vector<std::pair<int, FileMetaData>>;
const NewFiles& GetNewFiles() const { return new_files_; }

// Retrieve all the compact cursors
using CompactCursors = std::vector<std::pair<int, InternalKey>>;
const CompactCursors& GetCompactCursors() const { return compact_cursors_; }
void AddCompactCursor(int level, const InternalKey& cursor) {
compact_cursors_.push_back(std::make_pair(level, cursor));
}
void SetCompactCursors(
const std::vector<InternalKey>& compact_cursors_by_level) {
compact_cursors_.clear();
compact_cursors_.reserve(compact_cursors_by_level.size());
for (int i = 0; i < (int)compact_cursors_by_level.size(); i++) {
if (compact_cursors_by_level[i].Valid()) {
compact_cursors_.push_back(
std::make_pair(i, compact_cursors_by_level[i]));
}
}
}

// Add a new blob file.
void AddBlobFile(uint64_t blob_file_number, uint64_t total_blob_count,
uint64_t total_blob_bytes, std::string checksum_method,
Expand Down Expand Up @@ -635,6 +653,9 @@ class VersionEdit {
bool has_min_log_number_to_keep_ = false;
bool has_last_sequence_ = false;

// Compaction cursors for round-robin compaction policy
CompactCursors compact_cursors_;

DeletedFiles deleted_files_;
NewFiles new_files_;

Expand Down
Loading

0 comments on commit 3014146

Please sign in to comment.