diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 885da2138b..f35b2b5ca8 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -440,6 +440,11 @@ bool Compaction::IsTrivialMove() const { } } + // PerKeyPlacement compaction should never be trivial move. + if (SupportsPerKeyPlacement()) { + return false; + } + return true; } @@ -741,10 +746,10 @@ int Compaction::EvaluatePenultimateLevel( return kInvalidLevel; } - // TODO: will add public like `options.preclude_last_level_data_seconds` for - // per_key_placement feature, will check that option here. Currently, only - // set by unittest - bool supports_per_key_placement = false; + bool supports_per_key_placement = + immutable_options.preclude_last_level_data_seconds > 0; + + // it could be overridden by unittest TEST_SYNC_POINT_CALLBACK("Compaction::SupportsPerKeyPlacement:Enabled", &supports_per_key_placement); if (!supports_per_key_placement) { diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 620ae53f2b..98558d834b 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -34,7 +34,7 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, - const SequenceNumber max_seqno_allow_zero_out) + const SequenceNumber penultimate_level_cutoff_seqno) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, @@ -44,7 +44,7 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), compaction_filter, shutting_down, info_log, full_history_ts_low, - max_seqno_allow_zero_out) {} + penultimate_level_cutoff_seqno) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -61,7 +61,7 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, - const SequenceNumber max_seqno_allow_zero_out) + const SequenceNumber penultimate_level_cutoff_seqno) : input_(input, cmp, !compaction || compaction->DoesInputReferenceBlobFiles()), cmp_(cmp), @@ -96,7 +96,7 @@ CompactionIterator::CompactionIterator( current_key_committed_(false), cmp_with_history_ts_low_(0), level_(compaction_ == nullptr ? 0 : compaction_->level()), - max_seqno_allow_zero_out_(max_seqno_allow_zero_out) { + penultimate_level_cutoff_seqno_(penultimate_level_cutoff_seqno) { assert(snapshots_ != nullptr); bottommost_level_ = compaction_ == nullptr ? false @@ -1081,8 +1081,7 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { void CompactionIterator::DecideOutputLevel() { #ifndef NDEBUG - // TODO: will be set by sequence number or key range, for now, it will only be - // set by unittest + // Could be overridden by unittest PerKeyPlacementContext context(level_, ikey_.user_key, value_, ikey_.sequence); TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context", @@ -1090,9 +1089,10 @@ void CompactionIterator::DecideOutputLevel() { output_to_penultimate_level_ = context.output_to_penultimate_level; #endif /* !NDEBUG */ - // if the key is within the earliest snapshot, it has to output to the - // penultimate level. - if (ikey_.sequence > earliest_snapshot_) { + // if the key is newer than the cutoff sequence or within the earliest + // snapshot, it should output to the penultimate level. + if (ikey_.sequence > penultimate_level_cutoff_seqno_ || + ikey_.sequence > earliest_snapshot_) { output_to_penultimate_level_ = true; } @@ -1153,7 +1153,7 @@ void CompactionIterator::PrepareOutput() { DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && ikey_.type != kTypeMerge && current_key_committed_ && !output_to_penultimate_level_ && - ikey_.sequence < max_seqno_allow_zero_out_) { + ikey_.sequence < penultimate_level_cutoff_seqno_) { if (ikey_.type == kTypeDeletion || (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { ROCKS_LOG_FATAL( diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 0f2d9882d6..a94b0a8f3a 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -196,7 +196,7 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, const std::string* full_history_ts_low = nullptr, - const SequenceNumber max_seqno_allow_zero_out = kMaxSequenceNumber); + const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber); // Constructor with custom CompactionProxy, used for tests. CompactionIterator( @@ -214,7 +214,7 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, const std::string* full_history_ts_low = nullptr, - const SequenceNumber max_seqno_allow_zero_out = kMaxSequenceNumber); + const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber); ~CompactionIterator(); @@ -444,10 +444,9 @@ class CompactionIterator { // output to. bool output_to_penultimate_level_{false}; - // any key later than this sequence number, need to keep the sequence number - // and not zeroed out. The sequence number is kept to track it's approximate - // time. - const SequenceNumber max_seqno_allow_zero_out_ = kMaxSequenceNumber; + // any key later than this sequence number should have + // output_to_penultimate_level_ set to true + const SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber; void AdvanceInputIter() { input_.Next(); } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index ad9413b858..e5334cb99d 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -282,9 +282,9 @@ void CompactionJob::Prepare() { ROCKS_LOG_WARN(db_options_.info_log, "Failed to get current time in compaction: Status: %s", status.ToString().c_str()); - max_seqno_allow_zero_out_ = 0; + penultimate_level_cutoff_seqno_ = 0; } else { - max_seqno_allow_zero_out_ = + penultimate_level_cutoff_seqno_ = seqno_time_mapping_.TruncateOldEntries(_current_time); } } @@ -1026,7 +1026,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { blob_file_builder.get(), db_options_.allow_data_in_errors, db_options_.enforce_single_del_contracts, manual_compaction_canceled_, sub_compact->compaction, compaction_filter, shutting_down_, - db_options_.info_log, full_history_ts_low, max_seqno_allow_zero_out_); + db_options_.info_log, full_history_ts_low, + penultimate_level_cutoff_seqno_); c_iter->SeekToFirst(); // Assign range delete aggregator to the target output level, which makes sure diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 2dc2101985..2a342bddf3 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -304,9 +304,12 @@ class CompactionJob { // it also collects the smallest_seqno -> oldest_ancester_time from the SST. SeqnoToTimeMapping seqno_time_mapping_; - // If a sequence number larger than max_seqno_allow_zero_out_, it won't be - // zeroed out. The sequence number is kept to get approximate time of the key. - SequenceNumber max_seqno_allow_zero_out_ = kMaxSequenceNumber; + // cutoff sequence number for penultimate level, only set when + // per_key_placement feature is enabled. + // If a key with sequence number larger than penultimate_level_cutoff_seqno_, + // it will be placed on the penultimate_level and seqnuence number won't be + // zeroed out. + SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber; // Get table file name in where it's outputting to, which should also be in // `output_directory_`. diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc index 1dc0fd2322..f0158e9eb0 100644 --- a/db/compaction/tiered_compaction_test.cc +++ b/db/compaction/tiered_compaction_test.cc @@ -53,26 +53,17 @@ class TieredCompactionTest : public DBTestBase { InternalStats::CompactionOutputsStats kBasicPerLevelStats; InternalStats::CompactionStats kBasicFlushStats; + std::atomic_bool enable_per_key_placement = true; + void SetUp() override { SyncPoint::GetInstance()->SetCallBack( "Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) { auto supports_per_key_placement = static_cast(arg); - *supports_per_key_placement = true; + *supports_per_key_placement = enable_per_key_placement; }); SyncPoint::GetInstance()->EnableProcessing(); } -#ifndef ROCKSDB_LITE - uint64_t GetSstSizeHelper(Temperature temperature) { - std::string prop; - EXPECT_TRUE(dbfull()->GetProperty( - DB::Properties::kLiveSstFilesSizeAtTemperature + - std::to_string(static_cast(temperature)), - &prop)); - return static_cast(std::atoi(prop.c_str())); - } -#endif // ROCKSDB_LITE - const std::vector& GetCompactionStats() { VersionSet* const versions = dbfull()->GetVersionSet(); assert(versions); @@ -1054,12 +1045,14 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) { ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + latest_cold_seq = seq_history[2]; + MoveFilesToLevel(kLastLevel); // move forward the cold_seq again with range delete, take a snapshot to keep // the range dels in bottommost auto snap = db_->GetSnapshot(); - latest_cold_seq = seq_history[2]; + std::string start = Key(25), end = Key(35); ASSERT_OK( db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), start, end)); @@ -1104,9 +1097,12 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) { db_->ReleaseSnapshot(snap); + // TODO: it should push the data to last level, but penultimate level file is + // already bottommost, it's a conflict between bottommost_temperature and + // tiered compaction which only applies to last level compaction. ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); - ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); - ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); // 3 range dels dropped, the first one is double counted as expected, which is @@ -1123,8 +1119,8 @@ TEST_F(TieredCompactionTest, SequenceBasedTieredStorageLevel) { // input range latest_cold_seq = seq_history[1]; ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); - ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 1d615a4252..93362b3231 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -78,17 +78,6 @@ class DBCompactionTest : public DBTestBase { : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {} protected: -#ifndef ROCKSDB_LITE - uint64_t GetSstSizeHelper(Temperature temperature) { - std::string prop; - EXPECT_TRUE(dbfull()->GetProperty( - DB::Properties::kLiveSstFilesSizeAtTemperature + - std::to_string(static_cast(temperature)), - &prop)); - return static_cast(std::atoi(prop.c_str())); - } -#endif // ROCKSDB_LITE - /* * Verifies compaction stats of cfd are valid. * diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 711ebfe040..9315b3737f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2594,6 +2594,8 @@ class DBImpl : public DB { // Pointer to WriteBufferManager stalling interface. std::unique_ptr wbm_stall_; + // seqno_time_mapping_ stores the sequence number to time mapping, it's not + // thread safe, both read and write need db mutex hold. SeqnoToTimeMapping seqno_time_mapping_; }; diff --git a/db/db_test2.cc b/db/db_test2.cc index 6e121f3621..345e40db16 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -33,18 +33,6 @@ namespace ROCKSDB_NAMESPACE { class DBTest2 : public DBTestBase { public: DBTest2() : DBTestBase("db_test2", /*env_do_fsync=*/true) {} - - protected: -#ifndef ROCKSDB_LITE - uint64_t GetSstSizeHelper(Temperature temperature) { - std::string prop; - EXPECT_TRUE(dbfull()->GetProperty( - DB::Properties::kLiveSstFilesSizeAtTemperature + - std::to_string(static_cast(temperature)), - &prop)); - return static_cast(std::atoi(prop.c_str())); - } -#endif // ROCKSDB_LITE }; #ifndef ROCKSDB_LITE diff --git a/db/db_test_util.cc b/db/db_test_util.cc index b2ae432574..2c44e1e097 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1676,6 +1676,15 @@ uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily( } return result; } + +uint64_t DBTestBase::GetSstSizeHelper(Temperature temperature) { + std::string prop; + EXPECT_TRUE(dbfull()->GetProperty( + DB::Properties::kLiveSstFilesSizeAtTemperature + + std::to_string(static_cast(temperature)), + &prop)); + return static_cast(std::atoi(prop.c_str())); +} #endif // ROCKSDB_LITE void VerifySstUniqueIds(const TablePropertiesCollection& props) { diff --git a/db/db_test_util.h b/db/db_test_util.h index 3a9592b54b..0a35d9ffcd 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1345,6 +1345,8 @@ class DBTestBase : public testing::Test { #ifndef ROCKSDB_LITE uint64_t GetNumberOfSstFilesForColumnFamily(DB* db, std::string column_family_name); + + uint64_t GetSstSizeHelper(Temperature temperature); #endif // ROCKSDB_LITE uint64_t TestGetTickerCount(const Options& options, Tickers ticker_type) { diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 8883183c33..3f290cc056 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -148,8 +148,19 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( << table_properties.fast_compression_estimated_data_size << "db_id" << table_properties.db_id << "db_session_id" << table_properties.db_session_id << "orig_file_number" - << table_properties.orig_file_number << "seqno_to_time_mapping" - << table_properties.seqno_to_time_mapping; + << table_properties.orig_file_number << "seqno_to_time_mapping"; + + if (table_properties.seqno_to_time_mapping.empty()) { + jwriter << "N/A"; + } else { + SeqnoToTimeMapping tmp; + Status status = tmp.Add(table_properties.seqno_to_time_mapping); + if (status.ok()) { + jwriter << tmp.ToHumanString(); + } else { + jwriter << "Invalid"; + } + } // user collected properties for (const auto& prop : table_properties.readable_properties) { diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 2b698814f6..30f1bf1339 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -187,16 +187,6 @@ class ExternalSSTFileBasicTest std::string sst_files_dir_; std::unique_ptr fault_injection_test_env_; bool random_rwfile_supported_; -#ifndef ROCKSDB_LITE - uint64_t GetSstSizeHelper(Temperature temperature) { - std::string prop; - EXPECT_TRUE(dbfull()->GetProperty( - DB::Properties::kLiveSstFilesSizeAtTemperature + - std::to_string(static_cast(temperature)), - &prop)); - return static_cast(std::atoi(prop.c_str())); - } -#endif // ROCKSDB_LITE }; TEST_F(ExternalSSTFileBasicTest, Basic) { diff --git a/db/flush_job.cc b/db/flush_job.cc index f63c090083..19da5369dc 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -818,6 +818,8 @@ Status FlushJob::WriteLevel0Table() { SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber(); if (!db_impl_seqno_time_mapping_.Empty()) { + // make a local copy, as the seqno_time_mapping from db_impl is not thread + // safe, which will be used while not holding the db_mutex. seqno_to_time_mapping_ = db_impl_seqno_time_mapping_.Copy(smallest_seqno); } diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index fa24c14ab4..48fa4ae83d 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -8,6 +8,7 @@ #include "db/periodic_work_scheduler.h" #include "db/seqno_to_time_mapping.h" #include "port/stack_trace.h" +#include "rocksdb/iostats_context.h" #include "test_util/mock_time_env.h" #ifndef ROCKSDB_LITE @@ -35,8 +36,253 @@ class SeqnoTimeTest : public DBTestBase { PeriodicWorkTestScheduler::Default(mock_clock_); }); } + + // make sure the file is not in cache, otherwise it won't have IO info + void AssertKetTemperature(int key_id, Temperature expected_temperature) { + get_iostats_context()->Reset(); + IOStatsContext* iostats = get_iostats_context(); + std::string result = Get(Key(key_id)); + ASSERT_FALSE(result.empty()); + ASSERT_GT(iostats->bytes_read, 0); + switch (expected_temperature) { + case Temperature::kUnknown: + ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_read_count, + 0); + ASSERT_EQ(iostats->file_io_stats_by_temperature.cold_file_bytes_read, + 0); + break; + case Temperature::kCold: + ASSERT_GT(iostats->file_io_stats_by_temperature.cold_file_read_count, + 0); + ASSERT_GT(iostats->file_io_stats_by_temperature.cold_file_bytes_read, + 0); + break; + default: + // the test only support kCold now for the bottommost temperature + FAIL(); + } + } }; +TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kKeyPerSec = 10; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preclude_last_level_data_seconds = 10000; + options.env = mock_env_.get(); + options.bottommost_temperature = Temperature::kCold; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); }); + + int sst_num = 0; + // Write files that are overlap and enough to trigger compaction + for (; sst_num < kNumTrigger; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun([&] { + mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->WaitForCompact(true)); + + // All data is hot, only output to penultimate level + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // read a random key, which should be hot (kUnknown) + AssertKetTemperature(20, Temperature::kUnknown); + + // Write more data, but still all hot until the 10th SST, as: + // write a key every 10 seconds, 100 keys per SST, each SST takes 1000 seconds + // The preclude_last_level_data_seconds is 10k + for (; sst_num < kNumTrigger * 2; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun([&] { + mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->WaitForCompact(true)); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + } + + // Now we have both hot data and cold data + for (; sst_num < kNumTrigger * 3; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun([&] { + mock_clock_->MockSleepForSeconds(static_cast(kKeyPerSec)); + }); + } + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->WaitForCompact(true)); + } + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + uint64_t hot_data_size = GetSstSizeHelper(Temperature::kUnknown); + uint64_t cold_data_size = GetSstSizeHelper(Temperature::kCold); + ASSERT_GT(hot_data_size, 0); + ASSERT_GT(cold_data_size, 0); + // the first a few key should be cold + AssertKetTemperature(20, Temperature::kCold); + + // Wait some time, each time after compaction, the cold data size is + // increasing and hot data size is decreasing + for (int i = 0; i < 30; i++) { + dbfull()->TEST_WaitForPeridicWorkerRun([&] { + mock_clock_->MockSleepForSeconds(static_cast(20 * kKeyPerSec)); + }); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + uint64_t pre_hot = hot_data_size; + uint64_t pre_cold = cold_data_size; + hot_data_size = GetSstSizeHelper(Temperature::kUnknown); + cold_data_size = GetSstSizeHelper(Temperature::kCold); + ASSERT_LT(hot_data_size, pre_hot); + ASSERT_GT(cold_data_size, pre_cold); + + // the hot/cold data cut off range should be between i * 20 + 200 -> 250 + AssertKetTemperature(i * 20 + 250, Temperature::kUnknown); + AssertKetTemperature(i * 20 + 200, Temperature::kCold); + } + + // Wait again, all data should be cold after that + for (int i = 0; i < 5; i++) { + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(1000)); }); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } + + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // any random data should be cold + AssertKetTemperature(1000, Temperature::kCold); + + // close explicitly, because the env is local variable which will be released + // first. + Close(); +} + +TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { + const int kNumLevels = 7; + const int kNumKeys = 100; + + Options options = CurrentOptions(); + options.preclude_last_level_data_seconds = 10000; + options.env = mock_env_.get(); + options.bottommost_temperature = Temperature::kCold; + options.num_levels = kNumLevels; + options.level_compaction_dynamic_level_bytes = true; + // TODO(zjay): for level compaction, auto-compaction may stuck in deadloop, if + // the penultimate level score > 1, but the hot is not cold enough to compact + // to last level, which will keep triggering compaction. + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + // pass some time first, otherwise the first a few keys write time are going + // to be zero, and internally zero has special meaning: kUnknownSeqnoTime + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); + + int sst_num = 0; + // Write files that are overlap + for (; sst_num < 4; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); + } + ASSERT_OK(Flush()); + } + + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + // All data is hot, only output to penultimate level + ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel()); + ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); + + // read a random key, which should be hot (kUnknown) + AssertKetTemperature(20, Temperature::kUnknown); + + // Adding more data to have mixed hot and cold data + for (; sst_num < 14; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); + } + ASSERT_OK(Flush()); + } + // TODO(zjay): all data become cold because of level 5 (penultimate level) is + // the bottommost level, which converts the data to cold. PerKeyPlacement is + // for the last level (level 6). Will be fixed by change the + // bottommost_temperature to the last_level_temperature + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // Compact the files to the last level which should split the hot/cold data + MoveFilesToLevel(6); + uint64_t hot_data_size = GetSstSizeHelper(Temperature::kUnknown); + uint64_t cold_data_size = GetSstSizeHelper(Temperature::kCold); + ASSERT_GT(hot_data_size, 0); + ASSERT_GT(cold_data_size, 0); + // the first a few key should be cold + AssertKetTemperature(20, Temperature::kCold); + + // Wait some time, each it wait, the cold data is increasing and hot data is + // decreasing + for (int i = 0; i < 30; i++) { + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(200)); }); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + uint64_t pre_hot = hot_data_size; + uint64_t pre_cold = cold_data_size; + hot_data_size = GetSstSizeHelper(Temperature::kUnknown); + cold_data_size = GetSstSizeHelper(Temperature::kCold); + ASSERT_LT(hot_data_size, pre_hot); + ASSERT_GT(cold_data_size, pre_cold); + + // the hot/cold cut_off key should be around i * 20 + 400 -> 450 + AssertKetTemperature(i * 20 + 450, Temperature::kUnknown); + AssertKetTemperature(i * 20 + 400, Temperature::kCold); + } + + // Wait again, all data should be cold after that + for (int i = 0; i < 5; i++) { + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(1000)); }); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + } + + ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0); + ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0); + + // any random data should be cold + AssertKetTemperature(1000, Temperature::kCold); + + Close(); +} + TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { Options options = CurrentOptions(); options.preclude_last_level_data_seconds = 10000; diff --git a/db/seqno_to_time_mapping.cc b/db/seqno_to_time_mapping.cc index 6b53fe161d..a9d3dc36c0 100644 --- a/db/seqno_to_time_mapping.cc +++ b/db/seqno_to_time_mapping.cc @@ -40,7 +40,7 @@ SequenceNumber SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { const uint64_t cut_off_time = now > max_time_duration_ ? now - max_time_duration_ : 0; - assert(cut_off_time < now); // no overflow + assert(cut_off_time <= now); // no overflow auto it = std::upper_bound( seqno_time_mapping_.begin(), seqno_time_mapping_.end(), cut_off_time, @@ -238,7 +238,11 @@ bool SeqnoToTimeMapping::Resize(uint64_t min_time_duration, } Status SeqnoToTimeMapping::Sort() { - if (is_sorted_ || seqno_time_mapping_.empty()) { + if (is_sorted_) { + return Status::OK(); + } + if (seqno_time_mapping_.empty()) { + is_sorted_ = true; return Status::OK(); } diff --git a/db/seqno_to_time_mapping.h b/db/seqno_to_time_mapping.h index 6f5476b340..ad32a65170 100644 --- a/db/seqno_to_time_mapping.h +++ b/db/seqno_to_time_mapping.h @@ -29,6 +29,9 @@ constexpr uint64_t kUnknownSeqnoTime = 0; // would be 300. // As it's a sorted list, the new entry is inserted from the back. The old data // will be popped from the front if they're no longer used. +// +// Note: the data struct is not thread safe, both read and write need to be +// synchronized by caller. class SeqnoToTimeMapping { public: // Maximum number of entries can be encoded into SST. The data is delta encode