Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenAndTrimHistory API to support trimming data with specified timestamp #9410

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ Compaction::Compaction(
uint32_t _output_path_id, CompressionType _compression,
CompressionOptions _compression_opts, Temperature _output_temperature,
uint32_t _max_subcompactions, std::vector<FileMetaData*> _grandparents,
bool _manual_compaction, double _score, bool _deletion_compaction,
CompactionReason _compaction_reason)
bool _manual_compaction, const std::string& _trim_ts, double _score,
bool _deletion_compaction, CompactionReason _compaction_reason)
: input_vstorage_(vstorage),
start_level_(_inputs[0].level),
output_level_(_output_level),
Expand All @@ -237,6 +237,7 @@ Compaction::Compaction(
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
trim_ts_(_trim_ts),
is_trivial_move_(false),
compaction_reason_(_compaction_reason),
notify_on_compaction_completion_(false) {
Expand Down
8 changes: 6 additions & 2 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class Compaction {
CompressionOptions compression_opts,
Temperature output_temperature, uint32_t max_subcompactions,
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, double score = -1,
bool deletion_compaction = false,
bool manual_compaction = false, const std::string& trim_ts = "",
double score = -1, bool deletion_compaction = false,
CompactionReason compaction_reason = CompactionReason::kUnknown);

// No copying allowed
Expand Down Expand Up @@ -208,6 +208,8 @@ class Compaction {
// Was this compaction triggered manually by the client?
bool is_manual_compaction() const { return is_manual_compaction_; }

std::string trim_ts() const { return trim_ts_; }

// Used when allow_trivial_move option is set in
// Universal compaction. If all the input files are
// non overlapping, then is_trivial_move_ variable
Expand Down Expand Up @@ -385,6 +387,8 @@ class Compaction {
// Is this compaction requested by the client?
const bool is_manual_compaction_;

const std::string trim_ts_;
sunlike-Lipo marked this conversation as resolved.
Show resolved Hide resolved

// True if we can do trivial move in Universal multi level
// compaction
bool is_trivial_move_;
Expand Down
12 changes: 10 additions & 2 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/hash.h"
#include "util/history_trimming_iterator.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stop_watch.h"
Expand Down Expand Up @@ -425,7 +426,8 @@ CompactionJob::CompactionJob(
const std::atomic<int>* manual_compaction_paused,
const std::atomic<bool>* manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
std::string full_history_ts_low, std::string trim_ts,
BlobFileCompletionCallback* blob_callback)
: compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1),
db_options_(db_options),
Expand Down Expand Up @@ -464,6 +466,7 @@ CompactionJob::CompactionJob(
measure_io_stats_(measure_io_stats),
thread_pri_(thread_pri),
full_history_ts_low_(std::move(full_history_ts_low)),
trim_ts_(std::move(trim_ts)),
blob_callback_(blob_callback) {
assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
Expand Down Expand Up @@ -1275,6 +1278,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<InternalIterator> raw_input(
versions_->MakeInputIterator(read_options, sub_compact->compaction,
&range_del_agg, file_options_for_read_));
if (cfd->user_comparator()->timestamp_size() > 0 && !trim_ts_.empty()) {
Slice trim_ts = trim_ts_;
auto iter = raw_input.release();
raw_input.reset(
new HistoryTrimmingIterator(iter, cfd->user_comparator(), trim_ts));
}
InternalIterator* input = raw_input.get();

IterKey start_ikey;
Expand Down Expand Up @@ -1309,7 +1318,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}

input->SeekToFirst();

AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);

Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class CompactionJob {
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::atomic<bool>* manual_compaction_canceled = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "",
std::string full_history_ts_low = "", std::string trim_ts = "",
BlobFileCompletionCallback* blob_callback = nullptr);

virtual ~CompactionJob();
Expand Down Expand Up @@ -216,6 +216,7 @@ class CompactionJob {
std::vector<uint64_t> sizes_;
Env::Priority thread_pri_;
std::string full_history_ts_low_;
std::string trim_ts_;
BlobFileCompletionCallback* blob_callback_;

uint64_t GetCompactionId(SubcompactionState* sub_compact);
Expand Down
10 changes: 7 additions & 3 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,10 @@ Compaction* CompactionPicker::CompactRange(
output_level, 1),
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
/* grandparents */ {},
/* is manual */ true);
/* grandparents */ {}, /* is manual */ true,
compact_range_options.trim_ts
? compact_range_options.trim_ts->ToString()
: "");
RegisterCompaction(c);
vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options);
return c;
Expand Down Expand Up @@ -821,7 +823,9 @@ Compaction* CompactionPicker::CompactRange(
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
Temperature::kUnknown, compact_range_options.max_subcompactions,
std::move(grandparents),
/* is manual compaction */ true);
/* is manual compaction */ true,
compact_range_options.trim_ts ? compact_range_options.trim_ts->ToString()
: "");

TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
RegisterCompaction(compaction);
Expand Down
10 changes: 5 additions & 5 deletions db/compaction/compaction_picker_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
std::move(inputs), 0, 0, 0, 0, kNoCompression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
return c;
}
Expand Down Expand Up @@ -157,8 +157,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
0 /* max_subcompactions */, {},
/* is manual */ false, vstorage->CompactionScore(0),
0 /* max_subcompactions */, {}, /* is manual */ false,
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ false,
CompactionReason::kFIFOReduceNumFiles);
return c;
Expand Down Expand Up @@ -208,7 +208,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
std::move(inputs), 0, 0, 0, 0, kNoCompression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* trim_ts */ "", vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
return c;
}
Expand Down Expand Up @@ -313,7 +313,7 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
mutable_cf_options.compression, mutable_cf_options.compression_opts,
Temperature::kWarm,
/* max_subcompactions */ 0, {}, /* is manual */ false,
/* max_subcompactions */ 0, {}, /* is manual */ false, /* trim_ts */ "",
vstorage->CompactionScore(0),
/* is deletion compaction */ false, CompactionReason::kChangeTemperature);
return c;
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_level.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_),
Temperature::kUnknown,
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
start_level_score_, false /* deletion_compaction */, compaction_reason_);
/* trim_ts */ "", start_level_score_, false /* deletion_compaction */,
compaction_reason_);

// If it's level 0 compaction, make sure we don't execute any other level 0
// compactions in parallel
Expand Down
14 changes: 8 additions & 6 deletions db/compaction/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -757,8 +757,9 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
enable_compression),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
false /* deletion_compaction */, compaction_reason);
/* max_subcompactions */ 0, grandparents, /* is manual */ false,
/* trim_ts */ "", score_, false /* deletion_compaction */,
compaction_reason);
}

// Look at overall size amplification. If size amplification
Expand Down Expand Up @@ -1082,7 +1083,7 @@ Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
true /* enable_compression */),
Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */,
/* trim_ts */ "", score_, false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
}

Expand Down Expand Up @@ -1224,8 +1225,8 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
output_level, 1),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
false /* deletion_compaction */,
/* max_subcompactions */ 0, grandparents, /* is manual */ false,
/* trim_ts */ "", score_, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
}

Expand Down Expand Up @@ -1300,7 +1301,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
true /* enable_compression */),
Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);
/* trim_ts */ "", score_, false /* deletion_compaction */,
compaction_reason);
}

Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
Expand Down
9 changes: 7 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,10 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
disallow_trivial_move = true;
}
}
// trim_ts need real compaction to remove latest record
if (options.trim_ts != nullptr) {
disallow_trivial_move = true;
}
s = RunManualCompaction(cfd, level, output_level, options, begin, end,
exclusive, disallow_trivial_move,
max_file_num_to_ignore);
Expand Down Expand Up @@ -1358,7 +1362,8 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_,
&manual_compaction_paused_, nullptr, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow(), &blob_callback_);
c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
&blob_callback_);

// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
Expand Down Expand Up @@ -3321,7 +3326,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
is_manual ? &manual_compaction_paused_ : nullptr,
is_manual ? manual_compaction->canceled : nullptr, db_id_,
db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
&blob_callback_);
c->trim_ts(), &blob_callback_);
compaction_job.Prepare();

NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
Expand Down
51 changes: 51 additions & 0 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,58 @@ TEST_F(DBBasicTestWithTimestamp, MixedCfs) {
ASSERT_OK(s);

verify_db(handles_[1]);
Close();
}

TEST_F(DBBasicTestWithTimestamp, TrimHistoryTest) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
DestroyAndReopen(options);

std::string ts_str = Timestamp(2, 0);
WriteOptions wopts;
Slice ts = ts_str;
wopts.timestamp = &ts;
ASSERT_OK(db_->Put(wopts, "k1", "v1"));
ts_str = Timestamp(4, 0);
ts = ts_str;
wopts.timestamp = &ts;
ASSERT_OK(db_->Put(wopts, "k1", "v2"));
ts_str = Timestamp(5, 0);
ts = ts_str;
wopts.timestamp = &ts;
ASSERT_OK(db_->Delete(wopts, "k1"));
ts_str = Timestamp(6, 0);
ts = ts_str;
wopts.timestamp = &ts;
ASSERT_OK(db_->Put(wopts, "k1", "v3"));

ts_str = Timestamp(3, 0);
ts = ts_str;
CompactRangeOptions cro;
cro.trim_ts = &ts;
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

ReadOptions ropts;
ts_str = Timestamp(2, 0);
ts = ts_str;
ropts.timestamp = &ts;
std::string value;
Status s = db_->Get(ropts, "k1", &value);
ASSERT_OK(s);
ASSERT_EQ("v1", value);

ts_str = Timestamp(7, 0);
ts = ts_str;
ropts.timestamp = &ts;
s = db_->Get(ropts, "k1", &value);
ASSERT_OK(s); // here asserts, read with ts=7 won't see (k1, v1)
ASSERT_EQ("v1", value);
Close();
}

Expand Down
6 changes: 6 additions & 0 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ inline Slice ExtractTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
return Slice(user_key.data() + user_key.size() - ts_sz, ts_sz);
}

inline Slice ExtractTimestampFromKey(const Slice& internal_key, size_t ts_sz) {
assert(internal_key.size() >= kNumInternalBytes + ts_sz);
const size_t n = internal_key.size();
sunlike-Lipo marked this conversation as resolved.
Show resolved Hide resolved
return Slice(internal_key.data() + n - ts_sz - kNumInternalBytes, ts_sz);
}

inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) {
assert(internal_key.size() >= kNumInternalBytes);
const size_t n = internal_key.size();
Expand Down
4 changes: 3 additions & 1 deletion include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1778,12 +1778,14 @@ struct CompactRangeOptions {
// Set user-defined timestamp low bound, the data with older timestamp than
// low bound maybe GCed by compaction. Default: nullptr
Slice* full_history_ts_low = nullptr;

// Allows cancellation of an in-progress manual compaction.
//
// Cancellation can be delayed waiting on automatic compactions when used
// together with `exclusive_manual_compaction == true`.
std::atomic<bool>* canceled = nullptr;
// Set user-defined timestamp trim bound, the data with newer timestamp than
// trim bound is removed by compaction
Slice* trim_ts = nullptr;
};

// IngestExternalFileOptions is used by IngestExternalFile()
Expand Down
Loading