Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenAndTrimHistory API to support trimming data with specified timestamp #9410

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Rename `SizeApproximationOptions.include_memtabtles` to `SizeApproximationOptions.include_memtables`.
* Remove deprecated option DBOptions::max_mem_compaction_level.
* Remove deprecated overloads of API DB::GetApproximateSizes.

* Add DB::OpenAndTrimHistory API. This API will open DB and trim data to the timestamp specified by trim_ts (The data with timestamp larger than specified trim bound will be removed). This API should only be used at a timestamp-enabled column families recovery. If the column family doesn't have timestamp enabled, this API won't trim any data on that column family. This API is not compatible with avoid_flush_during_recovery option.
### Behavior Changes
* Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO.
* `ReadOptions::total_order_seek` no longer affects `DB::Get()`. The original motivation for this interaction has been obsolete since RocksDB has been able to detect whether the current prefix extractor is compatible with that used to generate table files, probably RocksDB 5.14.0.
Expand Down
4 changes: 2 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1164,12 +1164,12 @@ Compaction* ColumnFamilyData::CompactRange(
int output_level, const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* conflict,
uint64_t max_file_num_to_ignore) {
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, mutable_db_options,
current_->storage_info(), input_level, output_level,
compact_range_options, begin, end, compaction_end, conflict,
max_file_num_to_ignore);
max_file_num_to_ignore, trim_ts);
if (result != nullptr) {
result->SetInputVersion(current_);
}
Expand Down
3 changes: 2 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ class ColumnFamilyData {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore);
uint64_t max_file_num_to_ignore,
const std::string& trim_ts);

CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe
Expand Down
5 changes: 3 additions & 2 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ Compaction::Compaction(
uint32_t _output_path_id, CompressionType _compression,
CompressionOptions _compression_opts, Temperature _output_temperature,
uint32_t _max_subcompactions, std::vector<FileMetaData*> _grandparents,
bool _manual_compaction, double _score, bool _deletion_compaction,
CompactionReason _compaction_reason)
bool _manual_compaction, const std::string& _trim_ts, double _score,
bool _deletion_compaction, CompactionReason _compaction_reason)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand All @@ -237,6 +237,7 @@ Compaction::Compaction(
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
trim_ts_(_trim_ts),
is_trivial_move_(false),
compaction_reason_(_compaction_reason),
notify_on_compaction_completion_(false) {
Expand Down
9 changes: 7 additions & 2 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class Compaction {
CompressionOptions compression_opts,
Temperature output_temperature, uint32_t max_subcompactions,
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, double score = -1,
bool deletion_compaction = false,
bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false,
CompactionReason compaction_reason = CompactionReason::kUnknown);

// No copying allowed
Expand Down Expand Up @@ -208,6 +208,8 @@ class Compaction {
// Was this compaction triggered manually by the client?
bool is_manual_compaction() const { return is_manual_compaction_; }

std::string trim_ts() const { return trim_ts_; }

// Used when allow_trivial_move option is set in
// Universal compaction. If all the input files are
// non overlapping, then is_trivial_move_ variable
Expand Down Expand Up @@ -385,6 +387,9 @@ class Compaction {
// Is this compaction requested by the client?
const bool is_manual_compaction_;

// The data with timestamp > trim_ts_ will be removed
const std::string trim_ts_;
sunlike-Lipo marked this conversation as resolved.
Show resolved Hide resolved

// True if we can do trivial move in Universal multi level
// compaction
bool is_trivial_move_;
Expand Down
9 changes: 8 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/history_trimming_iterator.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
Expand Down Expand Up @@ -425,7 +426,8 @@ CompactionJob::CompactionJob(
const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
std::string full_history_ts_low, std::string trim_ts,
BlobFileCompletionCallback* blob_callback)
: compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1),
db_options_(db_options),
Expand Down Expand Up @@ -464,6 +466,7 @@ CompactionJob::CompactionJob(
measure_io_stats_(measure_io_stats),
thread_pri_(thread_pri),
full_history_ts_low_(std::move(full_history_ts_low)),
trim_ts_(std::move(trim_ts)),
blob_callback_(blob_callback) {
assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
Expand Down Expand Up @@ -1275,6 +1278,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<InternalIterator> raw_input(
versions_->MakeInputIterator(read_options, sub_compact->compaction,
&range_del_agg, file_options_for_read_));
if (cfd->user_comparator()->timestamp_size() > 0 && !trim_ts_.empty()) {
raw_input.reset(new HistoryTrimmingIterator(
std::move(raw_input), cfd->user_comparator(), trim_ts_));
}
InternalIterator* input = raw_input.get();

IterKey start_ikey;
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class CompactionJob {
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "",
std::string full_history_ts_low = "", std::string trim_ts = "",
BlobFileCompletionCallback* blob_callback = nullptr);

virtual ~CompactionJob();
Expand Down Expand Up @@ -216,6 +216,7 @@ class CompactionJob {
std::vector<uint64_t> sizes_;
Env::Priority thread_pri_;
std::string full_history_ts_low_;
std::string trim_ts_;
BlobFileCompletionCallback* blob_callback_;

uint64_t GetCompactionId(SubcompactionState* sub_compact);
Expand Down
7 changes: 3 additions & 4 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ Compaction* CompactionPicker::CompactRange(
int input_level, int output_level,
const CompactRangeOptions& compact_range_options, const InternalKey* begin,
const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore) {
uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
// CompactionPickerFIFO has its own implementation of compact range
assert(ioptions_.compaction_style != kCompactionStyleFIFO);

Expand Down Expand Up @@ -641,8 +641,7 @@ Compaction* CompactionPicker::CompactRange(
output_level, 1),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
/* grandparents */ {},
/* is manual */ true);
/* grandparents */ {}, /* is manual */ true, trim_ts);
RegisterCompaction(c);
vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
return c;
Expand Down Expand Up @@ -821,7 +820,7 @@ Compaction* CompactionPicker::CompactRange(
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
std::move(grandparents),
/* is manual compaction */ true);
/* is manual compaction */ true, trim_ts);

TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
RegisterCompaction(compaction);
Expand Down
5 changes: 3 additions & 2 deletions db/compaction/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class CompactionPicker {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore);
uint64_t max_file_num_to_ignore, const std::string& trim_ts);

// The maximum allowed output level. Default value is NumberLevels() - 1.
virtual int MaxOutputLevel() const { return NumberLevels() - 1; }
Expand Down Expand Up @@ -270,7 +270,8 @@ class NullCompactionPicker : public CompactionPicker {
const InternalKey* /*end*/,
InternalKey** /*compaction_end*/,
bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/) override {
uint64_t /*max_file_num_to_ignore*/,
const std::string& /*trim_ts*/) override {
return nullptr;
}

Expand Down
12 changes: 6 additions & 6 deletions db/compaction/compaction_picker_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
std::move(inputs), 0, 0, 0, 0, kNoCompression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
return c;
}
Expand Down Expand Up @@ -157,8 +157,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
0 /* max_subcompactions */, {},
/* is manual */ false, vstorage->CompactionScore(0),
0 /* max_subcompactions */, {}, /* is manual */ false,
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ false,
CompactionReason::kFIFOReduceNumFiles);
return c;
Expand Down Expand Up @@ -208,7 +208,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
std::move(inputs), 0, 0, 0, 0, kNoCompression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
return c;
}
Expand Down Expand Up @@ -313,7 +313,7 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
mutable_cf_options.compression, mutable_cf_options.compression_opts,
Temperature::kWarm,
/* max_subcompactions */ 0, {}, /* is manual */ false,
/* max_subcompactions */ 0, {}, /* is manual */ false, /* trim_ts */ "",
vstorage->CompactionScore(0),
/* is deletion compaction */ false, CompactionReason::kChangeTemperature);
return c;
Expand Down Expand Up @@ -349,7 +349,7 @@ Compaction* FIFOCompactionPicker::CompactRange(
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/, const InternalKey* /*end*/,
InternalKey** compaction_end, bool* /*manual_conflict*/,
uint64_t /*max_file_num_to_ignore*/) {
uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/) {
#ifdef NDEBUG
(void)input_level;
(void)output_level;
Expand Down
2 changes: 1 addition & 1 deletion db/compaction/compaction_picker_fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class FIFOCompactionPicker : public CompactionPicker {
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore) override;
uint64_t max_file_num_to_ignore, const std::string& trim_ts) override;

// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override { return 0; }
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_),
Temperature::kUnknown,
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
start_level_score_, false /* deletion_compaction */, compaction_reason_);
/* trim_ts */ "", start_level_score_, false /* deletion_compaction */,
compaction_reason_);

// If it's level 0 compaction, make sure we don't execute any other level 0
// compactions in parallel
Expand Down
2 changes: 1 addition & 1 deletion db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2653,7 +2653,7 @@ TEST_F(CompactionPickerTest, UniversalMarkedManualCompaction) {
universal_compaction_picker.CompactRange(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
ColumnFamilyData::kCompactAllLevels, 6, CompactRangeOptions(), NULL,
NULL, &manual_end, &manual_conflict, port::kMaxUint64));
NULL, &manual_end, &manual_conflict, port::kMaxUint64, ""));

ASSERT_TRUE(compaction);

Expand Down
14 changes: 8 additions & 6 deletions db/compaction/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,9 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
enable_compression),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
false /* deletion_compaction */, compaction_reason);
/* max_subcompactions */ 0, grandparents, /* is manual */ false,
/* trim_ts */ "", score_, false /* deletion_compaction */,
compaction_reason);
}

// Look at overall size amplification. If size amplification
Expand Down Expand Up @@ -1082,7 +1083,7 @@ Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
true /* enable_compression */),
Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */,
/* trim_ts */ "", score_, false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
}

Expand Down Expand Up @@ -1224,8 +1225,8 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
output_level, 1),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
false /* deletion_compaction */,
/* max_subcompactions */ 0, grandparents, /* is manual */ false,
/* trim_ts */ "", score_, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
}

Expand Down Expand Up @@ -1300,7 +1301,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
true /* enable_compression */),
Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);
/* trim_ts */ "", score_, false /* deletion_compaction */,
compaction_reason);
}

Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
Expand Down
6 changes: 4 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,8 @@ class DBImpl : public DB {
const CompactRangeOptions& compact_range_options,
const Slice* begin, const Slice* end,
bool exclusive, bool disallow_trivial_move,
uint64_t max_file_num_to_ignore);
uint64_t max_file_num_to_ignore,
const std::string& trim_ts);

// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
Expand Down Expand Up @@ -1244,7 +1245,8 @@ class DBImpl : public DB {

Status CompactRangeInternal(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end);
const Slice* begin, const Slice* end,
const std::string& trim_ts);

// The following two functions can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used.
Expand Down
Loading