Skip to content

Commit

Permalink
Revise APIs related to user-defined timestamp (facebook#8946)
Browse files Browse the repository at this point in the history
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".

According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).

For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.

Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.

The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.

Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.

Pull Request resolved: facebook#8946

Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0

Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```

Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom   :       1.831 micros/op 546235 ops/sec;   60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom   :       1.820 micros/op 549404 ops/sec;   60.8 MB/s
```

Reviewed By: ltamasi

Differential Revision: D33721359

Pulled By: riversand963

fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
Signed-off-by: v01dstar <yang.zhang@pingcap.com>
  • Loading branch information
riversand963 authored and v01dstar committed Jun 14, 2022
1 parent 9bfed19 commit 17d810c
Show file tree
Hide file tree
Showing 31 changed files with 1,291 additions and 751 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Public API changes
* Remove ReadOptions::iter_start_seqnum which has been deprecated.
* Remove DBOptions::preserved_deletes and DB::SetPreserveDeletesSequenceNumber().
* Removed timestamp from WriteOptions. Accordingly, added to DB APIs Put, Delete, SingleDelete, etc. accepting an additional argument 'timestamp'. Added Put, Delete, SingleDelete, etc to WriteBatch accepting an additional argument 'timestamp'. Removed WriteBatch::AssignTimestamps(vector<Slice>) API. Renamed WriteBatch::AssignTimestamp() to WriteBatch::UpdateTimestamps() with clarified comments.

## 6.29.5 (03/29/2022)
### Bug Fixes
Expand Down
11 changes: 11 additions & 0 deletions db/db_impl/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ size_t CompactedDBImpl::FindFile(const Slice& key) {

Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
const Slice& key, PinnableSlice* value) {
assert(user_comparator_);
if (options.timestamp || user_comparator_->timestamp_size()) {
// TODO: support timestamp
return Status::NotSupported();
}
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr,
nullptr, true, nullptr, nullptr);
Expand All @@ -58,6 +63,11 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>&,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
assert(user_comparator_);
if (user_comparator_->timestamp_size() || options.timestamp) {
// TODO: support timestamp
return std::vector<Status>(keys.size(), Status::NotSupported());
}
autovector<TableReader*, 16> reader_list;
for (const auto& key : keys) {
const FdWithKeyRange& f = files_.files[FindFile(key)];
Expand All @@ -69,6 +79,7 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
reader_list.push_back(f.fd.table_reader);
}
}

std::vector<Status> statuses(keys.size(), Status::NotFound());
values->resize(keys.size());
int idx = 0;
Expand Down
129 changes: 97 additions & 32 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1742,19 +1742,21 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
get_impl_options.merge_operands != nullptr);

assert(get_impl_options.column_family);
const Comparator* ucmp = get_impl_options.column_family->GetComparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
GetWithTimestampReadCallback read_cb(0); // Will call Refresh

#ifndef NDEBUG
if (ts_sz > 0) {
assert(read_options.timestamp);
assert(read_options.timestamp->size() == ts_sz);
if (read_options.timestamp) {
const Status s = FailIfTsSizesMismatch(get_impl_options.column_family,
*(read_options.timestamp));
if (!s.ok()) {
return s;
}
} else {
assert(!read_options.timestamp);
const Status s = FailIfCfHasTs(get_impl_options.column_family);
if (!s.ok()) {
return s;
}
}
#endif // NDEBUG

GetWithTimestampReadCallback read_cb(0); // Will call Refresh

PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
Expand Down Expand Up @@ -1823,7 +1825,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
// only if t <= read_opts.timestamp and s <= snapshot.
// HACK: temporarily overwrite input struct field but restore
SaveAndRestore<ReadCallback*> restore_callback(&get_impl_options.callback);
if (ts_sz > 0) {
const Comparator* ucmp = get_impl_options.column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
assert(!get_impl_options
.callback); // timestamp with callback is not supported
read_cb.Refresh(snapshot);
Expand All @@ -1846,7 +1850,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
bool skip_memtable = (read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
bool done = false;
std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr;
std::string* timestamp =
ucmp->timestamp_size() > 0 ? get_impl_options.timestamp : nullptr;
if (!skip_memtable) {
// Get value associated with key
if (get_impl_options.get_value) {
Expand Down Expand Up @@ -1954,19 +1959,36 @@ std::vector<Status> DBImpl::MultiGet(
StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
PERF_TIMER_GUARD(get_snapshot_time);

#ifndef NDEBUG
for (const auto* cfh : column_family) {
assert(cfh);
const Comparator* const ucmp = cfh->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
assert(read_options.timestamp);
assert(ucmp->timestamp_size() == read_options.timestamp->size());
size_t num_keys = keys.size();
assert(column_family.size() == num_keys);
std::vector<Status> stat_list(num_keys);

bool should_fail = false;
for (size_t i = 0; i < num_keys; ++i) {
assert(column_family[i]);
if (read_options.timestamp) {
stat_list[i] =
FailIfTsSizesMismatch(column_family[i], *(read_options.timestamp));
if (!stat_list[i].ok()) {
should_fail = true;
}
} else {
assert(!read_options.timestamp);
stat_list[i] = FailIfCfHasTs(column_family[i]);
if (!stat_list[i].ok()) {
should_fail = true;
}
}
}
#endif // NDEBUG

if (should_fail) {
for (auto& s : stat_list) {
if (s.ok()) {
s = Status::Incomplete(
"DB not queried due to invalid argument(s) in the same MultiGet");
}
}
return stat_list;
}

if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
Expand Down Expand Up @@ -2009,8 +2031,6 @@ std::vector<Status> DBImpl::MultiGet(
MergeContext merge_context;

// Note: this always resizes the values array
size_t num_keys = keys.size();
std::vector<Status> stat_list(num_keys);
values->resize(num_keys);
if (timestamps) {
timestamps->resize(num_keys);
Expand Down Expand Up @@ -2277,20 +2297,31 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
return;
}

#ifndef NDEBUG
bool should_fail = false;
for (size_t i = 0; i < num_keys; ++i) {
ColumnFamilyHandle* cfh = column_families[i];
assert(cfh);
const Comparator* const ucmp = cfh->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
assert(read_options.timestamp);
assert(read_options.timestamp->size() == ucmp->timestamp_size());
if (read_options.timestamp) {
statuses[i] = FailIfTsSizesMismatch(cfh, *(read_options.timestamp));
if (!statuses[i].ok()) {
should_fail = true;
}
} else {
assert(!read_options.timestamp);
statuses[i] = FailIfCfHasTs(cfh);
if (!statuses[i].ok()) {
should_fail = true;
}
}
}
#endif // NDEBUG
if (should_fail) {
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = Status::Incomplete(
"DB not queried due to invalid argument(s) in the same MultiGet");
}
}
return;
}

if (tracer_) {
// TODO: This mutex should be removed later, to improve performance when
Expand Down Expand Up @@ -2918,6 +2949,21 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
"ReadTier::kPersistedData is not yet supported in iterators."));
}

assert(column_family);

if (read_options.timestamp) {
const Status s =
FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
if (!s.ok()) {
return NewErrorIterator(s);
}
} else {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return NewErrorIterator(s);
}
}

auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
assert(cfd != nullptr);
Expand Down Expand Up @@ -3044,6 +3090,25 @@ Status DBImpl::NewIterators(
return Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators.");
}

if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfTsSizesMismatch(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
}
} else {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfCfHasTs(cf);
if (!s.ok()) {
return s;
}
}
}

ReadCallback* read_callback = nullptr; // No read callback provided.
iterators->clear();
iterators->reserve(column_families.size());
Expand Down
81 changes: 68 additions & 13 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,36 @@ class DBImpl : public DB {
// ---- Implementations of the DB interface ----

using DB::Resume;
virtual Status Resume() override;
Status Resume() override;

using DB::Put;
virtual Status Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override;
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& value) override;

using DB::Merge;
virtual Status Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override;
using DB::Delete;
virtual Status Delete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) override;
Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key) override;
Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts) override;

using DB::SingleDelete;
virtual Status SingleDelete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) override;
Status SingleDelete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) override;
Status SingleDelete(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) override;

using DB::DeleteRange;
Status DeleteRange(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& begin_key,
const Slice& end_key) override;

using DB::Write;
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;
Expand Down Expand Up @@ -1389,6 +1401,10 @@ class DBImpl : public DB {
// to ensure that db_session_id_ gets updated every time the DB is opened
void SetDbSessionId();

Status FailIfCfHasTs(const ColumnFamilyHandle* column_family) const;
Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family,
const Slice& ts) const;

private:
friend class DB;
friend class ErrorHandler;
Expand Down Expand Up @@ -2434,4 +2450,43 @@ static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}

inline Status DBImpl::FailIfCfHasTs(
const ColumnFamilyHandle* column_family) const {
column_family = column_family ? column_family : DefaultColumnFamily();
assert(column_family);
const Comparator* const ucmp = column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() > 0) {
std::ostringstream oss;
oss << "cannot call this method on column family "
<< column_family->GetName() << " that enables timestamp";
return Status::InvalidArgument(oss.str());
}
return Status::OK();
}

inline Status DBImpl::FailIfTsSizesMismatch(
const ColumnFamilyHandle* column_family, const Slice& ts) const {
if (!column_family) {
return Status::InvalidArgument("column family handle cannot be null");
}
assert(column_family);
const Comparator* const ucmp = column_family->GetComparator();
assert(ucmp);
if (0 == ucmp->timestamp_size()) {
std::stringstream oss;
oss << "cannot call this method on column family "
<< column_family->GetName() << " that does not enable timestamp";
return Status::InvalidArgument(oss.str());
}
const size_t ts_sz = ts.size();
if (ts_sz != ucmp->timestamp_size()) {
std::stringstream oss;
oss << "Timestamp sizes mismatch: expect " << ucmp->timestamp_size() << ", "
<< ts_sz << " given";
return Status::InvalidArgument(oss.str());
}
return Status::OK();
}

} // namespace ROCKSDB_NAMESPACE
31 changes: 31 additions & 0 deletions db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
assert(pinnable_val != nullptr);
// TODO: stopwatch DB_GET needed?, perf timer needed?
PERF_TIMER_GUARD(get_snapshot_time);

assert(column_family);
const Comparator* ucmp = column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() || read_options.timestamp) {
// TODO: support timestamp
return Status::NotSupported();
}

Status s;
SequenceNumber snapshot = versions_->LastSequence();
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
Expand Down Expand Up @@ -74,6 +83,13 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,

Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
assert(column_family);
const Comparator* ucmp = column_family->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size() || read_options.timestamp) {
// TODO: support timestamp
return NewErrorIterator(Status::NotSupported());
}
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
Expand Down Expand Up @@ -101,6 +117,21 @@ Status DBImplReadOnly::NewIterators(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
if (read_options.timestamp) {
// TODO: support timestamp
return Status::NotSupported();
} else {
for (auto* cf : column_families) {
assert(cf);
const Comparator* ucmp = cf->GetComparator();
assert(ucmp);
if (ucmp->timestamp_size()) {
// TODO: support timestamp
return Status::NotSupported();
}
}
}

ReadCallback* read_callback = nullptr; // No read callback provided.
if (iterators == nullptr) {
return Status::InvalidArgument("iterators not allowed to be nullptr");
Expand Down
Loading

0 comments on commit 17d810c

Please sign in to comment.