Skip to content

Commit

Permalink
add support for range delete for the udt in memtable only feature
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Jan 19, 2024
1 parent d69628e commit 092ff65
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 60 deletions.
13 changes: 8 additions & 5 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ Status BuildTable(
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);

const size_t ts_sz = ucmp->timestamp_size();
const bool strip_timestamp =
const bool logical_strip_timestamp =
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;

std::string key_after_flush_buf;
Expand All @@ -223,7 +223,7 @@ Status BuildTable(
// the in memory version of the key act logically the same as one with a
// minimum timestamp. We update the timestamp here so file boundary and
// output validator, block builder all see the effect of the stripping.
if (strip_timestamp) {
if (logical_strip_timestamp) {
key_after_flush_buf.clear();
ReplaceInternalKeyWithMinTimestamp(&key_after_flush_buf, key, ts_sz);
key_after_flush = key_after_flush_buf;
Expand Down Expand Up @@ -266,9 +266,12 @@ Status BuildTable(
Slice last_tombstone_start_user_key{};
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
auto kv = tombstone.Serialize();
// TODO(yuzhangyu): handle range deletion for UDT in memtables only.
// When user timestamp should not be persisted, we logically strip a
// range tombstone's start and end key's timestamp (replace it with min
// timestamp) before passing them along to table builder and to update
// file boundaries.
auto tombstone = range_del_it->Tombstone(logical_strip_timestamp);
std::pair<InternalKey, Slice> kv = tombstone.Serialize();
builder->Add(kv.first.Encode(), kv.second);
InternalKey tombstone_end = tombstone.SerializeEndKey();
meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_,
Expand Down
4 changes: 1 addition & 3 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <vector>

#include "db/column_family.h"
#include "db/dbformat.h"
#include "logging/logging.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/sst_partitioner.h"
Expand All @@ -21,9 +22,6 @@

namespace ROCKSDB_NAMESPACE {

// Packed (sequence number, value type) word built from kMaxSequenceNumber and
// kTypeRangeDeletion.
// NOTE(review): presumably the internal key footer used for a range
// tombstone's exclusive end key in file boundaries -- confirm against callers.
const uint64_t kRangeTombstoneSentinel =
PackSequenceAndType(kMaxSequenceNumber, kTypeRangeDeletion);

int sstableKeyCompare(const Comparator* uc, const Slice& a, const Slice& b) {
auto c = uc->CompareWithoutTimestamp(ExtractUserKey(a), ExtractUserKey(b));
if (c != 0) {
Expand Down
147 changes: 117 additions & 30 deletions db/db_with_timestamp_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3522,9 +3522,12 @@ TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
// of flag `persist_user_defined_timestamps`.
std::string write_ts;
std::string min_ts;
std::string max_ts;
PutFixed64(&write_ts, 1);
PutFixed64(&min_ts, 0);
PutFixed64(&max_ts, std::numeric_limits<uint64_t>::max());
std::string smallest_ukey_without_ts = "bar";
std::string middle_ukey_without_ts = "baz";
std::string largest_ukey_without_ts = "foo";
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
bool persist_udt = test::ShouldPersistUDT(GetParam());
Expand All @@ -3536,10 +3539,12 @@ TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {

ASSERT_OK(
db_->Put(WriteOptions(), smallest_ukey_without_ts, write_ts, "val1"));
ASSERT_OK(
db_->Put(WriteOptions(), largest_ukey_without_ts, write_ts, "val2"));
ASSERT_OK(db_->Put(WriteOptions(), middle_ukey_without_ts, write_ts, "val2"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
smallest_ukey_without_ts, largest_ukey_without_ts,
write_ts));

// Create a L0 SST file and its record is added to the Manfiest.
// Create a L0 SST file and its record is added to the Manifest.
ASSERT_OK(Flush());
Close();

Expand All @@ -3557,7 +3562,6 @@ TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
if (persist_udt) {
ASSERT_EQ(smallest_ukey_without_ts + write_ts,
file_meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + write_ts, file_meta.largest.user_key());
} else {
// If `persist_user_defined_timestamps` is false, the file boundaries should
// have the min timestamp. Behind the scenes, when file boundaries in
Expand All @@ -3566,8 +3570,13 @@ TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
// during DB open, a min timestamp is padded to the file boundaries. This
// test's writes contain non min timestamp to verify this logic end-to-end.
ASSERT_EQ(smallest_ukey_without_ts + min_ts, file_meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + min_ts, file_meta.largest.user_key());
}
// The right file boundary comes from the range deletion: it uses the max
// timestamp and a range deletion sentinel that uses the max sequence number
// to mark the end key as exclusive. This holds regardless of whether
// timestamps are persisted.
ASSERT_EQ(largest_ukey_without_ts + max_ts, file_meta.largest.user_key());
auto largest_footer = ExtractInternalKeyFooter(file_meta.largest.Encode());
ASSERT_EQ(largest_footer, kRangeTombstoneSentinel);
Close();
}

Expand Down Expand Up @@ -4006,42 +4015,80 @@ TEST_F(DBBasicTestWithTimestamp,
Close();
}

TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
// Parameterized fixture for DeleteRange tests with user-defined timestamps.
// The test parameter is a tuple of:
//   0. the BlockBasedTableOptions::IndexType to run the test with, and
//   1. the test::UserDefinedTimestampTestMode to run the test under.
class DeleteRangeWithTimestampTableOptions
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<
std::tuple<BlockBasedTableOptions::IndexType,
test::UserDefinedTimestampTestMode>> {
public:
// NOTE(review): the string is forwarded to DBBasicTestWithTimestampBase;
// presumably it names the test's DB directory -- confirm against the base.
explicit DeleteRangeWithTimestampTableOptions()
: DBBasicTestWithTimestampBase(
"delete_range_with_timestamp_table_options") {}
};

// Instantiates DeleteRangeWithTimestampTableOptions over the cross product of
// four block-based table index types and two user-defined timestamp test
// modes (8 parameter combinations in total).
INSTANTIATE_TEST_CASE_P(
Timestamp, DeleteRangeWithTimestampTableOptions,
testing::Combine(
testing::Values(
BlockBasedTableOptions::IndexType::kBinarySearch,
BlockBasedTableOptions::IndexType::kHashSearch,
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey),
testing::Values(
test::UserDefinedTimestampTestMode::kNormal,
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp)));

TEST_P(DeleteRangeWithTimestampTableOptions, BasicReadAndIterate) {
const int kNum = 200, kRangeBegin = 50, kRangeEnd = 150, kNumPerFile = 25;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.prefix_extractor.reset(NewFixedPrefixTransform(3));
options.compression = kNoCompression;
BlockBasedTableOptions bbto;
bbto.index_type = GetParam();
bbto.index_type = std::get<0>(GetParam());
bbto.block_size = 100;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
bool persist_udt = test::ShouldPersistUDT(std::get<1>(GetParam()));
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = persist_udt;
// The UDT in memtables only feature is not compatible with concurrent
// memtable writes.
options.allow_concurrent_memtable_write = persist_udt ? true : false;
options.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
DestroyAndReopen(options);

// Write half of the keys before the tombstone and half after the tombstone.
// Only covered keys (i.e., within the range and older than the tombstone)
// should be deleted.
std::string full_history_ts_low;
int cutoff_ts = 0;
for (int i = 0; i < kNum; ++i) {
std::string write_ts;
PutFixed64(&write_ts, i);
if (i == kNum / 2) {
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key1(kRangeBegin), Key1(kRangeEnd),
Timestamp(i, 0)));
Key1(kRangeBegin), Key1(kRangeEnd), write_ts));
}
ASSERT_OK(db_->Put(WriteOptions(), Key1(i), Timestamp(i, 0),
"val" + std::to_string(i)));
ASSERT_OK(
db_->Put(WriteOptions(), Key1(i), write_ts, "val" + std::to_string(i)));
if (i == kNum - kNumPerFile) {
if (!persist_udt) {
// When UDTs are not persisted, mark the timestamps in the Memtables as
// all expired so the subsequent flush can go through.
cutoff_ts = i + 1;
PutFixed64(&full_history_ts_low, cutoff_ts);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
full_history_ts_low));
}
ASSERT_OK(Flush());
}
}

ReadOptions read_opts;
read_opts.total_order_seek = true;
std::string read_ts = Timestamp(kNum, 0);
std::string read_ts;
PutFixed64(&read_ts, kNum);
Slice read_ts_slice = read_ts;
read_opts.timestamp = &read_ts_slice;
{
Expand Down Expand Up @@ -4076,33 +4123,43 @@ TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
ASSERT_OK(iter->status());
ASSERT_EQ(-1, expected);

read_ts = Timestamp(0, 0);
read_ts_slice = read_ts;
read_opts.timestamp = &read_ts_slice;
iter.reset(db_->NewIterator(read_opts));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key1(0));
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
// Cannot read below the cutoff timestamp when timestamps are not persisted.
if (persist_udt) {
read_ts.clear();
PutFixed64(&read_ts, 0);
read_ts_slice = read_ts;
read_opts.timestamp = &read_ts_slice;
iter.reset(db_->NewIterator(read_opts));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), Key1(0));
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
}

read_ts = Timestamp(kNum, 0);
read_ts.clear();
PutFixed64(&read_ts, kNum);
read_ts_slice = read_ts;
read_opts.timestamp = &read_ts_slice;
std::string value, timestamp;
Status s;
std::string expected_ts;
int int_expected_ts;
for (int i = 0; i < kNum; ++i) {
s = db_->Get(read_opts, Key1(i), &value, &timestamp);
if (i >= kRangeBegin && i < kNum / 2) {
ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ(timestamp, Timestamp(kNum / 2, 0));
int_expected_ts = (persist_udt || kNum / 2 >= cutoff_ts) ? kNum / 2 : 0;
} else {
ASSERT_OK(s);
ASSERT_EQ(value, "val" + std::to_string(i));
ASSERT_EQ(timestamp, Timestamp(i, 0));
int_expected_ts = (persist_udt || i >= cutoff_ts) ? i : 0;
}
expected_ts.clear();
PutFixed64(&expected_ts, int_expected_ts);
ASSERT_EQ(timestamp, expected_ts);
}

size_t batch_size = kNum;
Expand All @@ -4121,11 +4178,41 @@ TEST_P(DBBasicTestWithTimestampTableOptions, DeleteRangeBaiscReadAndIterate) {
for (int i = 0; i < kNum; ++i) {
if (i >= kRangeBegin && i < kNum / 2) {
ASSERT_TRUE(statuses[i].IsNotFound());
ASSERT_EQ(timestamps[i], Timestamp(kNum / 2, 0));
int_expected_ts = (persist_udt || kNum / 2 >= cutoff_ts) ? kNum / 2 : 0;
} else {
ASSERT_OK(statuses[i]);
ASSERT_EQ(values[i], "val" + std::to_string(i));
ASSERT_EQ(timestamps[i], Timestamp(i, 0));
int_expected_ts = (persist_udt || i >= cutoff_ts) ? i : 0;
}
expected_ts.clear();
PutFixed64(&expected_ts, int_expected_ts);
ASSERT_EQ(timestamps[i], expected_ts);
}

CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
if (!persist_udt) {
// Mark everything expired so manual compaction can go through
full_history_ts_low.clear();
PutFixed64(&full_history_ts_low, kNum);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(db_->DefaultColumnFamily(),
full_history_ts_low));
}
Slice compaction_ts = full_history_ts_low;
cro.full_history_ts_low = &compaction_ts;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
for (int i = kRangeBegin; i < kNum / 2; ++i) {
s = db_->Get(read_opts, Key1(i), &value, &timestamp);
ASSERT_TRUE(s.IsNotFound());
if (persist_udt) {
expected_ts.clear();
PutFixed64(&expected_ts, kNum / 2);
ASSERT_EQ(timestamp, expected_ts);
} else {
// When timestamps are not persisted, data in SST files all logically have
// min timestamp. A compaction to the last level will drop the range
// tombstone.
ASSERT_TRUE(timestamp.empty());
}
}
Close();
Expand Down
19 changes: 19 additions & 0 deletions db/dbformat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ void AppendInternalKeyWithDifferentTimestamp(std::string* result,
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}

// Appends to `result` the bytes of `key` minus its last `ts.size()` bytes
// (the slot its own timestamp occupies), followed by all of `ts`.
// `key` must be at least as long as `ts`; `result`'s existing contents are
// kept and extended.
void AppendUserKeyWithDifferentTimestamp(std::string* result, const Slice& key,
                                         const Slice& ts) {
  const size_t ts_len = ts.size();
  assert(key.size() >= ts_len);
  // Everything in front of the trailing timestamp slot is copied verbatim.
  const size_t prefix_len = key.size() - ts_len;
  result->append(key.data(), prefix_len);
  result->append(ts.data(), ts_len);
}

void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
ValueType t) {
PutFixed64(result, PackSequenceAndType(s, t));
Expand Down Expand Up @@ -110,13 +117,25 @@ void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key,
// Appends to `result` a copy of the internal key `key` with `ts_sz` zero
// bytes (the minimum timestamp) inserted between its leading bytes and its
// trailing `kNumInternalBytes`-byte footer.
// Requires `ts_sz > 0` and `key.size() >= kNumInternalBytes`.
void PadInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
                                    size_t ts_sz) {
  assert(ts_sz > 0);
  assert(key.size() >= kNumInternalBytes);
  const char* const src = key.data();
  // Offset at which the footer begins, i.e. the length of the part in front.
  const size_t footer_offset = key.size() - kNumInternalBytes;
  result->reserve(key.size() + ts_sz);
  result->append(src, footer_offset);
  result->append(ts_sz, static_cast<unsigned char>(0));
  result->append(src + footer_offset, kNumInternalBytes);
}

// Appends to `result` a copy of the internal key `key` with `ts_sz` bytes of
// 0xff (the maximum timestamp) inserted between its leading bytes and its
// trailing `kNumInternalBytes`-byte footer.
// Requires `ts_sz > 0` and `key.size() >= kNumInternalBytes`.
void PadInternalKeyWithMaxTimestamp(std::string* result, const Slice& key,
                                    size_t ts_sz) {
  assert(ts_sz > 0);
  assert(key.size() >= kNumInternalBytes);
  size_t user_key_size = key.size() - kNumInternalBytes;
  result->reserve(key.size() + ts_sz);
  result->append(key.data(), user_key_size);
  // Fill-append the 0xff bytes directly rather than materializing a temporary
  // std::string first; this mirrors PadInternalKeyWithMinTimestamp and avoids
  // an extra allocation and copy.
  result->append(ts_sz, '\xff');
  result->append(key.data() + user_key_size, kNumInternalBytes);
}

void StripTimestampFromInternalKey(std::string* result, const Slice& key,
size_t ts_sz) {
assert(key.size() >= ts_sz + kNumInternalBytes);
Expand Down
Loading

0 comments on commit 092ff65

Please sign in to comment.