Skip to content

Commit

Permalink
Tiered compaction: integrate Seqno time mapping with per key placemen…
Browse files Browse the repository at this point in the history
…t (#10370)

Summary:
Use the sequence-number-to-time mapping to decide during compaction whether a
key is hot, and place it in the corresponding level.

Note: the feature is not complete; level compaction will run indefinitely until
all penultimate level data is cold and small enough to not trigger compaction.

Pull Request resolved: facebook/rocksdb#10370

Test Plan:
CI
* Run basic db_bench for universal compaction manually

Reviewed By: siying

Differential Revision: D37892338

Pulled By: jay-zhuang

fbshipit-source-id: 792bbd91b1ccc2f62b5d14c53118434bcaac4bbe
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Jul 16, 2022
1 parent 7506c1a commit faa0f97
Show file tree
Hide file tree
Showing 17 changed files with 330 additions and 80 deletions.
13 changes: 9 additions & 4 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,11 @@ bool Compaction::IsTrivialMove() const {
}
}

// PerKeyPlacement compaction should never be trivial move.
if (SupportsPerKeyPlacement()) {
return false;
}

return true;
}

Expand Down Expand Up @@ -741,10 +746,10 @@ int Compaction::EvaluatePenultimateLevel(
return kInvalidLevel;
}

// TODO: will add public like `options.preclude_last_level_data_seconds` for
// per_key_placement feature, will check that option here. Currently, only
// set by unittest
bool supports_per_key_placement = false;
bool supports_per_key_placement =
immutable_options.preclude_last_level_data_seconds > 0;

// it could be overridden by unittest
TEST_SYNC_POINT_CALLBACK("Compaction::SupportsPerKeyPlacement:Enabled",
&supports_per_key_placement);
if (!supports_per_key_placement) {
Expand Down
20 changes: 10 additions & 10 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low,
const SequenceNumber max_seqno_allow_zero_out)
const SequenceNumber penultimate_level_cutoff_seqno)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
Expand All @@ -44,7 +44,7 @@ CompactionIterator::CompactionIterator(
std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr),
compaction_filter, shutting_down, info_log, full_history_ts_low,
max_seqno_allow_zero_out) {}
penultimate_level_cutoff_seqno) {}

CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
Expand All @@ -61,7 +61,7 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low,
const SequenceNumber max_seqno_allow_zero_out)
const SequenceNumber penultimate_level_cutoff_seqno)
: input_(input, cmp,
!compaction || compaction->DoesInputReferenceBlobFiles()),
cmp_(cmp),
Expand Down Expand Up @@ -96,7 +96,7 @@ CompactionIterator::CompactionIterator(
current_key_committed_(false),
cmp_with_history_ts_low_(0),
level_(compaction_ == nullptr ? 0 : compaction_->level()),
max_seqno_allow_zero_out_(max_seqno_allow_zero_out) {
penultimate_level_cutoff_seqno_(penultimate_level_cutoff_seqno) {
assert(snapshots_ != nullptr);
bottommost_level_ = compaction_ == nullptr
? false
Expand Down Expand Up @@ -1081,18 +1081,18 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {

void CompactionIterator::DecideOutputLevel() {
#ifndef NDEBUG
// TODO: will be set by sequence number or key range, for now, it will only be
// set by unittest
// Could be overridden by unittest
PerKeyPlacementContext context(level_, ikey_.user_key, value_,
ikey_.sequence);
TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context",
&context);
output_to_penultimate_level_ = context.output_to_penultimate_level;
#endif /* !NDEBUG */

// if the key is within the earliest snapshot, it has to output to the
// penultimate level.
if (ikey_.sequence > earliest_snapshot_) {
// if the key is newer than the cutoff sequence or within the earliest
// snapshot, it should output to the penultimate level.
if (ikey_.sequence > penultimate_level_cutoff_seqno_ ||
ikey_.sequence > earliest_snapshot_) {
output_to_penultimate_level_ = true;
}

Expand Down Expand Up @@ -1153,7 +1153,7 @@ void CompactionIterator::PrepareOutput() {
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_ &&
!output_to_penultimate_level_ &&
ikey_.sequence < max_seqno_allow_zero_out_) {
ikey_.sequence < penultimate_level_cutoff_seqno_) {
if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
ROCKS_LOG_FATAL(
Expand Down
11 changes: 5 additions & 6 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class CompactionIterator {
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr,
const SequenceNumber max_seqno_allow_zero_out = kMaxSequenceNumber);
const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber);

// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(
Expand All @@ -214,7 +214,7 @@ class CompactionIterator {
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr,
const SequenceNumber max_seqno_allow_zero_out = kMaxSequenceNumber);
const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber);

~CompactionIterator();

Expand Down Expand Up @@ -444,10 +444,9 @@ class CompactionIterator {
// output to.
bool output_to_penultimate_level_{false};

// any key later than this sequence number, need to keep the sequence number
// and not zeroed out. The sequence number is kept to track it's approximate
// time.
const SequenceNumber max_seqno_allow_zero_out_ = kMaxSequenceNumber;
// any key later than this sequence number should have
// output_to_penultimate_level_ set to true
const SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber;

void AdvanceInputIter() { input_.Next(); }

Expand Down
7 changes: 4 additions & 3 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,9 @@ void CompactionJob::Prepare() {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to get current time in compaction: Status: %s",
status.ToString().c_str());
max_seqno_allow_zero_out_ = 0;
penultimate_level_cutoff_seqno_ = 0;
} else {
max_seqno_allow_zero_out_ =
penultimate_level_cutoff_seqno_ =
seqno_time_mapping_.TruncateOldEntries(_current_time);
}
}
Expand Down Expand Up @@ -1026,7 +1026,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
blob_file_builder.get(), db_options_.allow_data_in_errors,
db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
sub_compact->compaction, compaction_filter, shutting_down_,
db_options_.info_log, full_history_ts_low, max_seqno_allow_zero_out_);
db_options_.info_log, full_history_ts_low,
penultimate_level_cutoff_seqno_);
c_iter->SeekToFirst();

// Assign range delete aggregator to the target output level, which makes sure
Expand Down
9 changes: 6 additions & 3 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,12 @@ class CompactionJob {
// it also collects the smallest_seqno -> oldest_ancester_time from the SST.
SeqnoToTimeMapping seqno_time_mapping_;

// If a sequence number larger than max_seqno_allow_zero_out_, it won't be
// zeroed out. The sequence number is kept to get approximate time of the key.
SequenceNumber max_seqno_allow_zero_out_ = kMaxSequenceNumber;
// cutoff sequence number for penultimate level, only set when
// per_key_placement feature is enabled.
// If a key's sequence number is larger than penultimate_level_cutoff_seqno_,
// it will be placed on the penultimate_level and its sequence number won't be
// zeroed out.
SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber;

// Get table file name in where it's outputting to, which should also be in
// `output_directory_`.
Expand Down
30 changes: 13 additions & 17 deletions db/compaction/tiered_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,26 +53,17 @@ class TieredCompactionTest : public DBTestBase {
InternalStats::CompactionOutputsStats kBasicPerLevelStats;
InternalStats::CompactionStats kBasicFlushStats;

std::atomic_bool enable_per_key_placement = true;

void SetUp() override {
SyncPoint::GetInstance()->SetCallBack(
"Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
auto supports_per_key_placement = static_cast<bool*>(arg);
*supports_per_key_placement = true;
*supports_per_key_placement = enable_per_key_placement;
});
SyncPoint::GetInstance()->EnableProcessing();
}

#ifndef ROCKSDB_LITE
uint64_t GetSstSizeHelper(Temperature temperature) {
std::string prop;
EXPECT_TRUE(dbfull()->GetProperty(
DB::Properties::kLiveSstFilesSizeAtTemperature +
std::to_string(static_cast<uint8_t>(temperature)),
&prop));
return static_cast<uint64_t>(std::atoi(prop.c_str()));
}
#endif // ROCKSDB_LITE

const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions);
Expand Down Expand Up @@ -1054,12 +1045,14 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);

latest_cold_seq = seq_history[2];

MoveFilesToLevel(kLastLevel);

// move forward the cold_seq again with range delete, take a snapshot to keep
// the range dels in bottommost
auto snap = db_->GetSnapshot();
latest_cold_seq = seq_history[2];

std::string start = Key(25), end = Key(35);
ASSERT_OK(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end));
Expand Down Expand Up @@ -1104,9 +1097,12 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {

db_->ReleaseSnapshot(snap);

// TODO: it should push the data to last level, but penultimate level file is
// already bottommost, it's a conflict between bottommost_temperature and
// tiered compaction which only applies to last level compaction.
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);

// 3 range dels dropped, the first one is double counted as expected, which is
Expand All @@ -1123,8 +1119,8 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) {
// input range
latest_cold_seq = seq_history[1];
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
}

Expand Down
11 changes: 0 additions & 11 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,6 @@ class DBCompactionTest : public DBTestBase {
: DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {}

protected:
#ifndef ROCKSDB_LITE
uint64_t GetSstSizeHelper(Temperature temperature) {
std::string prop;
EXPECT_TRUE(dbfull()->GetProperty(
DB::Properties::kLiveSstFilesSizeAtTemperature +
std::to_string(static_cast<uint8_t>(temperature)),
&prop));
return static_cast<uint64_t>(std::atoi(prop.c_str()));
}
#endif // ROCKSDB_LITE

/*
* Verifies compaction stats of cfd are valid.
*
Expand Down
2 changes: 2 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2594,6 +2594,8 @@ class DBImpl : public DB {
// Pointer to WriteBufferManager stalling interface.
std::unique_ptr<StallInterface> wbm_stall_;

// seqno_time_mapping_ stores the sequence number to time mapping. It is not
// thread safe: both reads and writes require holding the db mutex.
SeqnoToTimeMapping seqno_time_mapping_;
};

Expand Down
12 changes: 0 additions & 12 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@ namespace ROCKSDB_NAMESPACE {
class DBTest2 : public DBTestBase {
public:
DBTest2() : DBTestBase("db_test2", /*env_do_fsync=*/true) {}

protected:
#ifndef ROCKSDB_LITE
uint64_t GetSstSizeHelper(Temperature temperature) {
std::string prop;
EXPECT_TRUE(dbfull()->GetProperty(
DB::Properties::kLiveSstFilesSizeAtTemperature +
std::to_string(static_cast<uint8_t>(temperature)),
&prop));
return static_cast<uint64_t>(std::atoi(prop.c_str()));
}
#endif // ROCKSDB_LITE
};

#ifndef ROCKSDB_LITE
Expand Down
9 changes: 9 additions & 0 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,15 @@ uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily(
}
return result;
}

// Returns the value of the `kLiveSstFilesSizeAtTemperature` DB property for
// `temperature`, parsed as an unsigned 64-bit integer (presumably the total
// size of live SST files at that temperature -- confirm against the property's
// documentation).
//
// The property name is built by appending the numeric value of `temperature`
// to the property prefix. A failed lookup is reported through EXPECT_TRUE; if
// `prop` is left empty in that case, it parses as 0.
uint64_t DBTestBase::GetSstSizeHelper(Temperature temperature) {
  std::string prop;
  EXPECT_TRUE(dbfull()->GetProperty(
      DB::Properties::kLiveSstFilesSizeAtTemperature +
          std::to_string(static_cast<uint8_t>(temperature)),
      &prop));
  // Parse with strtoull rather than atoi: atoi yields an int, which cannot
  // represent values above INT_MAX, while the helper returns uint64_t.
  return static_cast<uint64_t>(std::strtoull(prop.c_str(), nullptr, 10));
}
#endif // ROCKSDB_LITE

void VerifySstUniqueIds(const TablePropertiesCollection& props) {
Expand Down
2 changes: 2 additions & 0 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,8 @@ class DBTestBase : public testing::Test {
#ifndef ROCKSDB_LITE
uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
std::string column_family_name);

uint64_t GetSstSizeHelper(Temperature temperature);
#endif // ROCKSDB_LITE

uint64_t TestGetTickerCount(const Options& options, Tickers ticker_type) {
Expand Down
15 changes: 13 additions & 2 deletions db/event_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,19 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
<< table_properties.fast_compression_estimated_data_size
<< "db_id" << table_properties.db_id << "db_session_id"
<< table_properties.db_session_id << "orig_file_number"
<< table_properties.orig_file_number << "seqno_to_time_mapping"
<< table_properties.seqno_to_time_mapping;
<< table_properties.orig_file_number << "seqno_to_time_mapping";

if (table_properties.seqno_to_time_mapping.empty()) {
jwriter << "N/A";
} else {
SeqnoToTimeMapping tmp;
Status status = tmp.Add(table_properties.seqno_to_time_mapping);
if (status.ok()) {
jwriter << tmp.ToHumanString();
} else {
jwriter << "Invalid";
}
}

// user collected properties
for (const auto& prop : table_properties.readable_properties) {
Expand Down
10 changes: 0 additions & 10 deletions db/external_sst_file_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,6 @@ class ExternalSSTFileBasicTest
std::string sst_files_dir_;
std::unique_ptr<FaultInjectionTestEnv> fault_injection_test_env_;
bool random_rwfile_supported_;
#ifndef ROCKSDB_LITE
uint64_t GetSstSizeHelper(Temperature temperature) {
std::string prop;
EXPECT_TRUE(dbfull()->GetProperty(
DB::Properties::kLiveSstFilesSizeAtTemperature +
std::to_string(static_cast<uint8_t>(temperature)),
&prop));
return static_cast<uint64_t>(std::atoi(prop.c_str()));
}
#endif // ROCKSDB_LITE
};

TEST_F(ExternalSSTFileBasicTest, Basic) {
Expand Down
2 changes: 2 additions & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,8 @@ Status FlushJob::WriteLevel0Table() {

SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber();
if (!db_impl_seqno_time_mapping_.Empty()) {
// make a local copy, because the seqno_time_mapping from db_impl is not thread
// safe and the copy will be used while not holding the db_mutex.
seqno_to_time_mapping_ = db_impl_seqno_time_mapping_.Copy(smallest_seqno);
}

Expand Down
Loading

0 comments on commit faa0f97

Please sign in to comment.