Enable backward iterator for keys with user-defined timestamp (#8035)
Summary:
This PR does the following:

- Enable backward iteration for keys with user-defined timestamp. Note that Merge, SingleDelete, and DeleteRange are not yet supported.
- Introduce a new helper API `Comparator::EqualWithoutTimestamp()`.
- Fix a typo in `SetTimestamp()`.
- Add/update unit tests.
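The new helper can be sketched as follows. This is a minimal illustration of the intended semantics (fixed-width trailing timestamp; the constant and helper names are assumptions), not RocksDB's actual implementation:

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Sketch of EqualWithoutTimestamp() semantics (assumed key layout): the user
// key carries a fixed-width timestamp suffix, and equality is decided on the
// key bytes that precede that suffix. A dedicated equality check is cheaper
// than a full three-way CompareWithoutTimestamp() when callers only need
// equality.
constexpr size_t kTsSize = 8;  // e.g. -user_timestamp_size=8 in db_bench

inline std::string_view StripTimestamp(std::string_view key_with_ts) {
  assert(key_with_ts.size() >= kTsSize);
  return key_with_ts.substr(0, key_with_ts.size() - kTsSize);
}

inline bool EqualWithoutTimestamp(std::string_view a, std::string_view b) {
  return StripTimestamp(a) == StripTimestamp(b);
}
```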

Ran db_bench (built with DEBUG_LEVEL=0) to demonstrate that no overhead is introduced for CPU-intensive workloads with many `Prev()` calls. Results of iterating over keys with timestamps are also provided.

1. Disable timestamp, run:
```
./db_bench -db=/dev/shm/rocksdb -disable_wal=1 -benchmarks=fillseq,seekrandom[-W1-X6] -reverse_iterator=1 -seek_nexts=5
```
Results:
> Baseline
> - seekrandom [AVG    6 runs] : 96115 ops/sec;   53.2 MB/sec
> - seekrandom [MEDIAN 6 runs] : 98075 ops/sec;   54.2 MB/sec
>
> This PR
> - seekrandom [AVG    6 runs] : 95521 ops/sec;   52.8 MB/sec
> - seekrandom [MEDIAN 6 runs] : 96338 ops/sec;   53.3 MB/sec

2. Enable timestamp, run:
```
./db_bench -user_timestamp_size=8  -db=/dev/shm/rocksdb -disable_wal=1 -benchmarks=fillseq,seekrandom[-W1-X6] -reverse_iterator=1 -seek_nexts=5
```
Results:
> Baseline: not supported
>
> This PR
> - seekrandom [AVG    6 runs] : 90514 ops/sec;   50.1 MB/sec
> - seekrandom [MEDIAN 6 runs] : 90834 ops/sec;   50.2 MB/sec

Pull Request resolved: #8035

Reviewed By: ltamasi

Differential Revision: D26926668

Pulled By: riversand963

fbshipit-source-id: 95330cc2242397c03e09d29e5417dfb0adc98ef5
riversand963 authored and facebook-github-bot committed Mar 10, 2021
1 parent 64517d1 commit 82b3888
Showing 12 changed files with 360 additions and 79 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -12,10 +12,12 @@
* Add new SetBufferSize API to WriteBufferManager to allow dynamic management of memory allotted to all write buffers. This allows user code to adjust memory monitoring provided by WriteBufferManager as process memory needs change datasets grow and shrink.
* Clarified the required semantics of Read() functions in FileSystem and Env APIs. Please ensure any custom implementations are compliant.
* For the new integrated BlobDB implementation, compaction statistics now include the amount of data read from blob files during compaction (due to garbage collection or compaction filters). Write amplification metrics have also been extended to account for data read from blob files.
* Add EqualWithoutTimestamp() to Comparator.

### New Features
* Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicate that further action is necessary for compaction filter to make a decision.
* Add support to extend retrieval of checksums for blob files from the MANIFEST when checkpointing. During backup, rocksdb can detect corruption in blob files during file copies.
* Enable backward iteration on keys with user-defined timestamps.

## 6.18.0 (02/19/2021)
### Behavior Changes
18 changes: 7 additions & 11 deletions db/compaction/compaction_iterator.cc
@@ -390,13 +390,11 @@ void CompactionIterator::NextFromInput() {
// merge_helper_->compaction_filter_skip_until_.
Slice skip_until;

int cmp_user_key_without_ts = 0;
bool user_key_equal_without_ts = false;
int cmp_ts = 0;
if (has_current_user_key_) {
cmp_user_key_without_ts =
timestamp_size_
? cmp_->CompareWithoutTimestamp(ikey_.user_key, current_user_key_)
: cmp_->Compare(ikey_.user_key, current_user_key_);
user_key_equal_without_ts =
cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_);
// if timestamp_size_ > 0, then curr_ts_ has been initialized by a
// previous key.
cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
@@ -409,7 +407,7 @@
// Check whether the user key changed. After this if statement current_key_
// is a copy of the current input key (maybe converted to a delete by the
// compaction filter). ikey_.user_key is pointing to the copy.
if (!has_current_user_key_ || cmp_user_key_without_ts != 0 || cmp_ts != 0) {
if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) {
// First occurrence of this user key
// Copy key for output
key_ = current_key_.SetInternalKey(key_, &ikey_);
@@ -430,7 +428,7 @@
// consider this key for GC, e.g. it may be dropped if certain conditions
// match.
if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
0 != cmp_user_key_without_ts || cmp_with_history_ts_low_ >= 0) {
!user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0) {
// Initialize for future comparison for rule (A) and etc.
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
@@ -724,8 +722,7 @@ void CompactionIterator::NextFromInput() {
input_->Valid() &&
(ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_)
.ok()) &&
0 == cmp_->CompareWithoutTimestamp(ikey_.user_key,
next_ikey.user_key) &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) &&
(prev_snapshot == 0 ||
DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
input_->Next();
@@ -735,8 +732,7 @@
if (input_->Valid() &&
(ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_)
.ok()) &&
0 == cmp_->CompareWithoutTimestamp(ikey_.user_key,
next_ikey.user_key)) {
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
valid_ = true;
at_next_ = true;
}
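Conceptually, the compaction-iterator changes above swap a three-way comparison for a boolean equality check when deciding whether consecutive internal keys belong to the same user key. A self-contained sketch under the assumed layout (user key plus fixed-width timestamp suffix; names are illustrative, not RocksDB's):

```cpp
#include <cassert>
#include <string>
#include <vector>

constexpr size_t kTsWidth = 8;  // assumed timestamp width

// Equality on the user-key prefix, ignoring the trailing timestamp.
bool UserKeysEqual(const std::string& a, const std::string& b) {
  return a.size() >= kTsWidth && b.size() >= kTsWidth &&
         a.compare(0, a.size() - kTsWidth, b, 0, b.size() - kTsWidth) == 0;
}

// Count runs of identical user keys in a sorted stream of keys-with-timestamp,
// mimicking how compaction groups all versions of one user key together.
size_t CountUserKeyRuns(const std::vector<std::string>& sorted_keys) {
  size_t runs = sorted_keys.empty() ? 0 : 1;
  for (size_t i = 1; i < sorted_keys.size(); ++i) {
    if (!UserKeysEqual(sorted_keys[i - 1], sorted_keys[i])) {
      ++runs;
    }
  }
  return runs;
}
```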
120 changes: 75 additions & 45 deletions db/db_iter.cc
@@ -114,8 +114,7 @@ Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
}

bool DBIter::ParseKey(ParsedInternalKey* ikey) {
Status s =
ParseInternalKey(iter_.key(), ikey, false /* log_err_key */); // TODO
Status s = ParseInternalKey(iter_.key(), ikey, false /* log_err_key */);
if (!s.ok()) {
status_ = Status::Corruption("In DBIter: ", s.getState());
valid_ = false;
@@ -489,7 +488,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
} else {
const std::string kTsMin(timestamp_size_, static_cast<char>(0));
const std::string kTsMin(timestamp_size_, '\0');
AppendInternalKeyWithDifferentTimestamp(
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
@@ -635,13 +634,6 @@ bool DBIter::MergeValuesNewToOld() {
}

void DBIter::Prev() {
if (timestamp_size_ > 0) {
valid_ = false;
status_ = Status::NotSupported(
"SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
return;
}

assert(valid_);
assert(status_.ok());

@@ -680,8 +672,14 @@ bool DBIter::ReverseToForward() {
// If that's the case, seek iter_ to current key.
if (!expect_total_order_inner_iter() || !iter_.Valid()) {
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber,
kValueTypeForSeek);
if (timestamp_size_ > 0) {
// TODO: pre-create kTsMax.
const std::string kTsMax(timestamp_size_, '\xff');
pikey.SetTimestamp(kTsMax);
}
last_key.SetInternalKey(pikey);
iter_.Seek(last_key.GetInternalKey());
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
}
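The `kTsMax` seek above relies on how keys with timestamps are ordered: within one user key, larger (newer) timestamps sort first, so `user_key + kTsMax` is the smallest full key sharing that user key. A self-contained sketch of that ordering under the assumed layout (this comparator is illustrative, not RocksDB's code):

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

constexpr size_t kTsBytes = 8;  // assumed timestamp width

// Assumed ordering: user-key bytes ascending, then timestamp descending,
// so the newest version of each user key is encountered first.
bool KeyWithTsLess(const std::string& a, const std::string& b) {
  const std::string ua = a.substr(0, a.size() - kTsBytes);
  const std::string ub = b.substr(0, b.size() - kTsBytes);
  if (ua != ub) return ua < ub;
  // Larger timestamp sorts earlier (std::string compares bytes unsigned).
  return a.substr(a.size() - kTsBytes) > b.substr(b.size() - kTsBytes);
}
```

Seeking to `user_key + kTsMax` therefore positions an iterator at the first (newest) entry for that user key; `kTsMin` plays the symmetric role when constructing SeekForPrev targets.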
@@ -760,11 +758,13 @@ void DBIter::PrevInternal(const Slice* prefix) {
}

assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
user_comparator_.Compare(saved_key_.GetUserKey(),
*iterate_lower_bound_) >= 0);
user_comparator_.CompareWithoutTimestamp(
saved_key_.GetUserKey(), /*a_has_ts=*/true,
*iterate_lower_bound_, /*b_has_ts=*/false) >= 0);
if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
user_comparator_.Compare(saved_key_.GetUserKey(),
*iterate_lower_bound_) < 0) {
user_comparator_.CompareWithoutTimestamp(
saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
/*b_has_ts=*/false) < 0) {
// We've iterated earlier than the user-specified lower bound.
valid_ = false;
return;
@@ -809,8 +809,8 @@ bool DBIter::FindValueForCurrentKey() {
assert(iter_.Valid());
merge_context_.Clear();
current_entry_is_merged_ = false;
// last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
// kTypeValue)
// last entry before merge (could be kTypeDeletion,
// kTypeDeletionWithTimestamp, kTypeSingleDeletion or kTypeValue)
ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion;

@@ -831,9 +831,13 @@
timestamp_size_);
}
if (!IsVisible(ikey.sequence, ts) ||
!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
saved_key_.GetUserKey())) {
break;
}
if (!ts.empty()) {
saved_timestamp_.assign(ts.data(), ts.size());
}
if (TooManyInternalKeysSkipped()) {
return false;
}
@@ -873,6 +877,7 @@
}
break;
case kTypeDeletion:
case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
merge_context_.Clear();
last_not_merge_type = last_key_entry_type;
@@ -916,6 +921,7 @@
is_blob_ = false;
switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
case kTypeRangeDeletion:
valid_ = false;
@@ -976,8 +982,17 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
// FindValueForCurrentKeyUsingSeek()
assert(pinned_iters_mgr_.PinningEnabled());
std::string last_key;
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
sequence_, kValueTypeForSeek));
if (0 == timestamp_size_) {
AppendInternalKey(&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
kValueTypeForSeek));
} else {
AppendInternalKeyWithDifferentTimestamp(
&last_key,
ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
kValueTypeForSeek),
*timestamp_ub_);
}
iter_.Seek(last_key);
RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

@@ -1001,7 +1016,8 @@
timestamp_size_);
}

if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
if (!user_comparator_.EqualWithoutTimestamp(ikey.user_key,
saved_key_.GetUserKey())) {
// No visible values for this key, even though FindValueForCurrentKey()
// has seen some. This is possible if we're using a tailing iterator, and
// the entries were discarded in a compaction.
@@ -1018,14 +1034,19 @@

if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kBackwardTraversal)) {
ikey, RangeDelPositioningMode::kBackwardTraversal) ||
kTypeDeletionWithTimestamp == ikey.type) {
valid_ = false;
return true;
}
if (!iter_.PrepareValue()) {
valid_ = false;
return false;
}
if (timestamp_size_ > 0) {
Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_);
saved_timestamp_.assign(ts.data(), ts.size());
}
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value();
@@ -1142,7 +1163,8 @@ bool DBIter::FindUserKeyBeforeSavedKey() {
return false;
}

if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
if (user_comparator_.CompareWithoutTimestamp(ikey.user_key,
saved_key_.GetUserKey()) < 0) {
return true;
}

@@ -1166,8 +1188,14 @@
if (num_skipped >= max_skip_) {
num_skipped = 0;
IterKey last_key;
last_key.SetInternalKey(ParsedInternalKey(
saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
ParsedInternalKey pikey(saved_key_.GetUserKey(), kMaxSequenceNumber,
kValueTypeForSeek);
if (timestamp_size_ > 0) {
// TODO: pre-create kTsMax.
const std::string kTsMax(timestamp_size_, '\xff');
pikey.SetTimestamp(kTsMax);
}
last_key.SetInternalKey(pikey);
// It would be more efficient to use SeekForPrev() here, but some
// iterators may not support it.
iter_.Seek(last_key.GetInternalKey());
@@ -1244,13 +1272,27 @@ void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
saved_key_.Clear();
// now saved_key is used to store internal key.
saved_key_.SetInternalKey(target, 0 /* sequence_number */,
kValueTypeForSeekForPrev);
kValueTypeForSeekForPrev, timestamp_ub_);

if (timestamp_size_ > 0) {
const std::string kTsMin(timestamp_size_, '\0');
Slice ts = kTsMin;
saved_key_.UpdateInternalKey(/*seq=*/0, kValueTypeForSeekForPrev, &ts);
}

if (iterate_upper_bound_ != nullptr &&
user_comparator_.Compare(saved_key_.GetUserKey(),
*iterate_upper_bound_) >= 0) {
user_comparator_.CompareWithoutTimestamp(
saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_upper_bound_,
/*b_has_ts=*/false) >= 0) {
saved_key_.Clear();
saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber,
kValueTypeForSeekForPrev, timestamp_ub_);
if (timestamp_size_ > 0) {
const std::string kTsMax(timestamp_size_, '\xff');
Slice ts = kTsMax;
saved_key_.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeekForPrev,
&ts);
}
}
}

@@ -1353,13 +1395,6 @@ void DBIter::SeekForPrev(const Slice& target) {
}
#endif // ROCKSDB_LITE

if (timestamp_size_ > 0) {
valid_ = false;
status_ = Status::NotSupported(
"SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
return;
}

status_ = Status::OK();
ReleaseTempPinnedData();
ResetInternalKeysSkippedCounter();
@@ -1454,17 +1489,12 @@ void DBIter::SeekToFirst() {
}

void DBIter::SeekToLast() {
if (timestamp_size_ > 0) {
valid_ = false;
status_ = Status::NotSupported(
"SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
return;
}

if (iterate_upper_bound_ != nullptr) {
// Seek to last key strictly less than ReadOptions.iterate_upper_bound.
SeekForPrev(*iterate_upper_bound_);
if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
if (Valid() && 0 == user_comparator_.CompareWithoutTimestamp(
*iterate_upper_bound_, /*a_has_ts=*/false, key(),
/*b_has_ts=*/false)) {
ReleaseTempPinnedData();
PrevInternal(nullptr);
}
6 changes: 5 additions & 1 deletion db/db_iter.h
@@ -67,7 +67,7 @@ class DBIter final : public Iterator {
// this->key().
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
enum Direction { kForward, kReverse };
enum Direction : uint8_t { kForward, kReverse };

// LocalStatistics contain Statistics counters that will be aggregated per
// each iterator instance and then will be sent to the global statistics when
@@ -184,6 +184,9 @@
Slice timestamp() const override {
assert(valid_);
assert(timestamp_size_ > 0);
if (direction_ == kReverse) {
return saved_timestamp_;
}
const Slice ukey_and_ts = saved_key_.GetUserKey();
assert(timestamp_size_ < ukey_and_ts.size());
return ExtractTimestampFromUserKey(ukey_and_ts, timestamp_size_);
@@ -372,6 +375,7 @@
const Slice* const timestamp_ub_;
const Slice* const timestamp_lb_;
const size_t timestamp_size_;
std::string saved_timestamp_;
};

// Return a new iterator that converts internal keys (yielded by
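The `timestamp()` accessor in the `db_iter.h` hunk above returns the cached `saved_timestamp_` when iterating backward, and otherwise extracts the timestamp from the saved user key. The extraction can be sketched like this, assuming the timestamp occupies the trailing `ts_size` bytes of the combined user-key-plus-timestamp slice (a sketch of the idea, not RocksDB's `ExtractTimestampFromUserKey`):

```cpp
#include <cassert>
#include <string_view>

// Sketch: the timestamp is the fixed-width suffix of the combined
// user-key-plus-timestamp slice, so extraction is a tail substring.
std::string_view ExtractTimestampSuffix(std::string_view ukey_and_ts,
                                        size_t ts_size) {
  assert(ukey_and_ts.size() >= ts_size);
  return ukey_and_ts.substr(ukey_and_ts.size() - ts_size);
}
```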
