Skip to content

Commit

Permalink
Add seqno to time mapping (#10338)
Browse files Browse the repository at this point in the history
Summary:
Which will be used for tiered storage to preclude hot data from
compacting to the cold tier (the last level).
Internally, adding seqno to time mapping. A periodic_task is scheduled
to record the current_seqno -> current_time in certain cadence. When
memtable flush, the mapping informaiton is stored in sstable property.
During compaction, the mapping information are merged and get the
approximate time of sequence number, which is used to determine if a key
is recently inserted or not and preclude it from the last level if it's
recently inserted (within the `preclude_last_level_data_seconds`).

Pull Request resolved: facebook/rocksdb#10338

Test Plan: CI

Reviewed By: siying

Differential Revision: D37810187

Pulled By: jay-zhuang

fbshipit-source-id: 6953be7a18a99de8b1cb3b162d712f79c2b4899f
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Jul 15, 2022
1 parent 66685d6 commit a3acf2e
Show file tree
Hide file tree
Showing 50 changed files with 1,639 additions and 157 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ set(SOURCES
db/range_del_aggregator.cc
db/range_tombstone_fragmenter.cc
db/repair.cc
db/seqno_to_time_mapping.cc
db/snapshot_impl.cc
db/table_cache.cc
db/table_properties_collector.cc
Expand Down Expand Up @@ -1296,6 +1297,7 @@ if(WITH_TESTS)
db/perf_context_test.cc
db/periodic_work_scheduler_test.cc
db/plain_table_db_test.cc
db/seqno_time_test.cc
db/prefix_test.cc
db/range_del_aggregator_test.cc
db/range_tombstone_fragmenter_test.cc
Expand Down
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* User can configure the new ColumnFamilyOptions `blob_cache` to enable/disable blob caching.
* Either sharing the backend cache with the block cache or using a completely separate cache is supported.
* A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`).
* Add experimental tiered compaction feature `AdvancedColumnFamilyOptions::preclude_last_level_data_seconds`, which makes sure the new data inserted within preclude_last_level_data_seconds won't be placed on cold tier (the feature is not complete).

### Public API changes
* Add metadata related structs and functions in C API, including
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,9 @@ db_table_properties_test: $(OBJ_DIR)/db/db_table_properties_test.o $(TEST_LIBRAR
log_write_bench: $(OBJ_DIR)/util/log_write_bench.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(PROFILING_FLAGS)

seqno_time_test: $(OBJ_DIR)/db/seqno_time_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

plain_table_db_test: $(OBJ_DIR)/db/plain_table_db_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
8 changes: 8 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc",
"db/repair.cc",
"db/seqno_to_time_mapping.cc",
"db/snapshot_impl.cc",
"db/table_cache.cc",
"db/table_properties_collector.cc",
Expand Down Expand Up @@ -419,6 +420,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc",
"db/repair.cc",
"db/seqno_to_time_mapping.cc",
"db/snapshot_impl.cc",
"db/table_cache.cc",
"db/table_properties_collector.cc",
Expand Down Expand Up @@ -5688,6 +5690,12 @@ cpp_unittest_wrapper(name="ribbon_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="seqno_time_test",
srcs=["db/seqno_time_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="sim_cache_test",
srcs=["utilities/simulator_cache/sim_cache_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
12 changes: 11 additions & 1 deletion db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ Status BuildTable(
SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker,
bool paranoid_file_checks, InternalStats* internal_stats,
IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCreationReason blob_creation_reason, EventLogger* event_logger,
BlobFileCreationReason blob_creation_reason,
const SeqnoToTimeMapping& seqno_to_time_mapping, EventLogger* event_logger,
int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, Env::WriteLifeTimeHint write_hint,
const std::string* full_history_ts_low,
Expand Down Expand Up @@ -260,6 +261,15 @@ Status BuildTable(
if (!s.ok() || empty) {
builder->Abandon();
} else {
std::string seqno_time_mapping_str;
seqno_to_time_mapping.Encode(
seqno_time_mapping_str, meta->fd.smallest_seqno,
meta->fd.largest_seqno, meta->file_creation_time);
builder->SetSeqnoTimeTableProperties(
seqno_time_mapping_str,
ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO
? meta->file_creation_time
: meta->oldest_ancester_time);
s = builder->Finish();
}
if (io_status->ok()) {
Expand Down
3 changes: 3 additions & 0 deletions db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
#include <string>
#include <utility>
#include <vector>

#include "db/range_tombstone_fragmenter.h"
#include "db/seqno_to_time_mapping.h"
#include "db/table_properties_collector.h"
#include "logging/event_logger.h"
#include "options/cf_options.h"
Expand Down Expand Up @@ -61,6 +63,7 @@ extern Status BuildTable(
bool paranoid_file_checks, InternalStats* internal_stats,
IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCreationReason blob_creation_reason,
const SeqnoToTimeMapping& seqno_to_time_mapping,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr,
Expand Down
2 changes: 2 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ class ColumnFamilyData {
return file_metadata_cache_res_mgr_;
}

SequenceNumber GetFirstMemtableSequenceNumber() const;

static const uint32_t kDummyColumnFamilyDataId;

// Keep track of whether the mempurge feature was ever used.
Expand Down
15 changes: 10 additions & 5 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ CompactionIterator::CompactionIterator(
const Compaction* compaction, const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
const std::string* full_history_ts_low,
const SequenceNumber max_seqno_allow_zero_out)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
Expand All @@ -42,7 +43,8 @@ CompactionIterator::CompactionIterator(
manual_compaction_canceled,
std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr),
compaction_filter, shutting_down, info_log, full_history_ts_low) {}
compaction_filter, shutting_down, info_log, full_history_ts_low,
max_seqno_allow_zero_out) {}

CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
Expand All @@ -58,7 +60,8 @@ CompactionIterator::CompactionIterator(
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low)
const std::string* full_history_ts_low,
const SequenceNumber max_seqno_allow_zero_out)
: input_(input, cmp,
!compaction || compaction->DoesInputReferenceBlobFiles()),
cmp_(cmp),
Expand Down Expand Up @@ -92,7 +95,8 @@ CompactionIterator::CompactionIterator(
CreatePrefetchBufferCollectionIfNeeded(compaction_.get())),
current_key_committed_(false),
cmp_with_history_ts_low_(0),
level_(compaction_ == nullptr ? 0 : compaction_->level()) {
level_(compaction_ == nullptr ? 0 : compaction_->level()),
max_seqno_allow_zero_out_(max_seqno_allow_zero_out) {
assert(snapshots_ != nullptr);
bottommost_level_ = compaction_ == nullptr
? false
Expand Down Expand Up @@ -1148,7 +1152,8 @@ void CompactionIterator::PrepareOutput() {
!compaction_->allow_ingest_behind() && bottommost_level_ &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_ &&
!output_to_penultimate_level_) {
!output_to_penultimate_level_ &&
ikey_.sequence < max_seqno_allow_zero_out_) {
if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
ROCKS_LOG_FATAL(
Expand Down
71 changes: 37 additions & 34 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,42 +181,40 @@ class CompactionIterator {
const Compaction* compaction_;
};

CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr,
const SequenceNumber max_seqno_allow_zero_out = kMaxSequenceNumber);

// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder,
bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr);
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
Env* env, bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
bool enforce_single_del_contracts,
const std::atomic<bool>& manual_compaction_canceled,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr,
const SequenceNumber max_seqno_allow_zero_out = kMaxSequenceNumber);

~CompactionIterator();

Expand Down Expand Up @@ -446,6 +444,11 @@ class CompactionIterator {
// output to.
bool output_to_penultimate_level_{false};

// any key later than this sequence number, need to keep the sequence number
// and not zeroed out. The sequence number is kept to track it's approximate
// time.
const SequenceNumber max_seqno_allow_zero_out_ = kMaxSequenceNumber;

void AdvanceInputIter() { input_.Next(); }

void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }
Expand Down
54 changes: 45 additions & 9 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,12 @@ void CompactionJob::Prepare() {

// Generate file_levels_ for compaction before making Iterator
auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
ColumnFamilyData* cfd = c->column_family_data();
assert(cfd != nullptr);
assert(cfd->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);

write_hint_ =
c->column_family_data()->CalculateSSTWriteHint(c->output_level());
write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();

if (c->ShouldFormSubcompactions()) {
Expand All @@ -251,6 +251,43 @@ void CompactionJob::Prepare() {

compact_->sub_compact_states.emplace_back(c, start, end, /*sub_job_id*/ 0);
}

if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
// TODO(zjay): move to a function
seqno_time_mapping_.SetMaxTimeDuration(
c->immutable_options()->preclude_last_level_data_seconds);
// setup seqno_time_mapping_
for (const auto& each_level : *c->inputs()) {
for (const auto& fmd : each_level.files) {
std::shared_ptr<const TableProperties> tp;
Status s = cfd->current()->GetTableProperties(&tp, fmd, nullptr);
if (s.ok()) {
seqno_time_mapping_.Add(tp->seqno_to_time_mapping)
.PermitUncheckedError();
seqno_time_mapping_.Add(fmd->fd.smallest_seqno,
fmd->oldest_ancester_time);
}
}
}

auto status = seqno_time_mapping_.Sort();
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Invalid sequence number to time mapping: Status: %s",
status.ToString().c_str());
}
int64_t _current_time = 0;
status = db_options_.clock->GetCurrentTime(&_current_time);
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to get current time in compaction: Status: %s",
status.ToString().c_str());
max_seqno_allow_zero_out_ = 0;
} else {
max_seqno_allow_zero_out_ =
seqno_time_mapping_.TruncateOldEntries(_current_time);
}
}
}

struct RangeWithSize {
Expand Down Expand Up @@ -989,7 +1026,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
blob_file_builder.get(), db_options_.allow_data_in_errors,
db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
sub_compact->compaction, compaction_filter, shutting_down_,
db_options_.info_log, full_history_ts_low);
db_options_.info_log, full_history_ts_low, max_seqno_allow_zero_out_);
c_iter->SeekToFirst();

// Assign range delete aggregator to the target output level, which makes sure
Expand Down Expand Up @@ -1253,7 +1290,7 @@ Status CompactionJob::FinishCompactionOutputFile(

const uint64_t current_entries = outputs.NumEntries();

s = outputs.Finish(s);
s = outputs.Finish(s, seqno_time_mapping_);

if (s.ok()) {
// With accurate smallest and largest key, we can get a slightly more
Expand Down Expand Up @@ -1617,9 +1654,8 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
sub_compact->compaction->output_compression_opts(), cfd->GetID(),
cfd->GetName(), sub_compact->compaction->output_level(),
bottommost_level_, TableFileCreationReason::kCompaction,
oldest_ancester_time, 0 /* oldest_key_time */, current_time, db_id_,
db_session_id_, sub_compact->compaction->max_output_file_size(),
file_number);
0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
sub_compact->compaction->max_output_file_size(), file_number);

outputs.NewBuilder(tboptions);

Expand Down
9 changes: 9 additions & 0 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "db/log_writer.h"
#include "db/memtable_list.h"
#include "db/range_del_aggregator.h"
#include "db/seqno_to_time_mapping.h"
#include "db/version_edit.h"
#include "db/write_controller.h"
#include "db/write_thread.h"
Expand Down Expand Up @@ -299,6 +300,14 @@ class CompactionJob {

uint64_t GetCompactionId(SubcompactionState* sub_compact) const;

// Stores the sequence number to time mapping gathered from all input files
// it also collects the smallest_seqno -> oldest_ancester_time from the SST.
SeqnoToTimeMapping seqno_time_mapping_;

// If a sequence number larger than max_seqno_allow_zero_out_, it won't be
// zeroed out. The sequence number is kept to get approximate time of the key.
SequenceNumber max_seqno_allow_zero_out_ = kMaxSequenceNumber;

// Get table file name in where it's outputting to, which should also be in
// `output_directory_`.
virtual std::string GetTableFileName(uint64_t file_number);
Expand Down
9 changes: 8 additions & 1 deletion db/compaction/compaction_outputs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ void CompactionOutputs::NewBuilder(const TableBuilderOptions& tboptions) {
builder_.reset(NewTableBuilder(tboptions, file_writer_.get()));
}

Status CompactionOutputs::Finish(const Status& intput_status) {
Status CompactionOutputs::Finish(const Status& intput_status,
const SeqnoToTimeMapping& seqno_time_mapping) {
FileMetaData* meta = GetMetaData();
assert(meta != nullptr);
Status s = intput_status;
if (s.ok()) {
std::string seqno_time_mapping_str;
seqno_time_mapping.Encode(seqno_time_mapping_str, meta->fd.smallest_seqno,
meta->fd.largest_seqno, meta->file_creation_time);
builder_->SetSeqnoTimeTableProperties(seqno_time_mapping_str,
meta->oldest_ancester_time);
s = builder_->Finish();

} else {
builder_->Abandon();
}
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ class CompactionOutputs {
}

// Finish the current output file
Status Finish(const Status& intput_status);
Status Finish(const Status& intput_status,
const SeqnoToTimeMapping& seqno_time_mapping);

// Update output table properties from table builder
void UpdateTableProperties() {
Expand Down
Loading

0 comments on commit a3acf2e

Please sign in to comment.