Snapshots with user-specified timestamps (facebook#9879)
Summary:
In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written
to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable.

It would be useful to associate sequence numbers with their corresponding actual timestamps. One option is
user-defined timestamp, which allows applications to specify the format of custom timestamps
and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29.

This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in
https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps.
Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps.

Until now, snapshots have usually been taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot`
object is created with the last published sequence number of the super-version. The reader has no good way to
choose a timestamp for this snapshot, because by the time `GetSnapshot()` is called,
an arbitrarily long period of time may have already elapsed since the last write, which is when the last published
sequence number was written.

This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is
exposed only at the `TransactionDB` layer. An application can tell RocksDB to create a snapshot when a transaction
commits, effectively associating the last sequence number with a timestamp. The application is also assumed to
ensure that any two timestamped snapshots satisfy the following:
```
snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts
```
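The ordering requirement can be checked against the most recent timestamped snapshot. Below is a minimal, self-contained sketch that mirrors the checks performed in `DBImpl::CreateTimestampedSnapshotImpl` in this change. The names `Decision`, `LatestSnapshot`, and `CheckNewSnapshot` are hypothetical and exist only for illustration; they are not RocksDB APIs.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical names for illustration only; not part of RocksDB.
enum class Decision { kCreate, kReuseLatest, kReject };

struct LatestSnapshot {
  uint64_t seq;  // sequence number of the latest timestamped snapshot
  uint64_t ts;   // its user-specified timestamp
};

// Decide how to handle a request for a snapshot at (seq, ts), given the
// latest existing timestamped snapshot (nullptr if there is none).
inline Decision CheckNewSnapshot(const LatestSnapshot* latest, uint64_t seq,
                                 uint64_t ts) {
  if (latest == nullptr) {
    return Decision::kCreate;
  }
  // Sequence numbers never move backwards; the real code asserts this too.
  assert(latest->seq <= seq);
  if (latest->ts > ts) {
    // A later snapshot cannot carry a smaller timestamp.
    return Decision::kReject;
  }
  if (latest->ts == ts) {
    // Same timestamp: share the existing snapshot only if nothing was
    // written in between; otherwise the request is invalid.
    return latest->seq == seq ? Decision::kReuseLatest : Decision::kReject;
  }
  return Decision::kCreate;
}
```

In the actual change, the two rejected cases return `Status::InvalidArgument`, and the reuse case returns the existing shared snapshot.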

If the application can guarantee that there are no active writes in the database when a reader takes a
timestamped snapshot, then the user may also use a new API, `TransactionDB::CreateTimestampedSnapshot()`, to create
a snapshot with an associated timestamp.

Code example
```cpp
// Create a timestamped snapshot when committing transaction.
txn->SetCommitTimestamp(100);
txn->SetSnapshotOnNextOperation();
txn->Commit();

// A wrapper API for convenience
Status Transaction::CommitAndTryCreateSnapshot(
    std::shared_ptr<TransactionNotifier> notifier,
    TxnTimestamp ts,
    std::shared_ptr<const Snapshot>* ret);

// Create a timestamped snapshot if caller guarantees no concurrent writes
std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100);
```

The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with
other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp.
```cpp
// Return the timestamped snapshot corresponding to the given timestamp. If ts is
// kMaxTxnTimestamp, then we return the latest timestamped snapshot if present.
// Otherwise, we return the snapshot whose timestamp is equal to `ts`. If no
// such snapshot exists, then we return null.
std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const;
// Return the latest timestamped snapshot if present.
std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const;
```
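The lookup semantics described in the comments above can be modeled with an ordered map keyed by timestamp. This is only a toy sketch under that assumption: `ToySnapshot`, `ToySnapshotIndex`, and `kMaxTs` are hypothetical stand-ins, not the RocksDB types, and the sketch does no locking.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <map>
#include <memory>

// Hypothetical stand-ins for illustration; not the RocksDB types.
struct ToySnapshot {
  uint64_t seq;
  uint64_t ts;
};

// Plays the role of kMaxTxnTimestamp in this sketch.
constexpr uint64_t kMaxTs = std::numeric_limits<uint64_t>::max();

class ToySnapshotIndex {
 public:
  void Add(uint64_t seq, uint64_t ts) {
    // The maximum value is reserved to mean "latest" in lookups.
    assert(ts != kMaxTs);
    by_ts_[ts] = std::make_shared<const ToySnapshot>(ToySnapshot{seq, ts});
  }

  // ts == kMaxTs: return the latest snapshot, if any.
  // Otherwise: return the snapshot with exactly this timestamp, or null.
  std::shared_ptr<const ToySnapshot> Get(uint64_t ts) const {
    if (by_ts_.empty()) {
      return nullptr;
    }
    if (ts == kMaxTs) {
      return by_ts_.rbegin()->second;
    }
    auto it = by_ts_.find(ts);
    return it == by_ts_.end() ? nullptr : it->second;
  }

 private:
  std::map<uint64_t, std::shared_ptr<const ToySnapshot>> by_ts_;
};
```

Note that a lookup with a timestamp between two existing snapshots returns null rather than the nearest older snapshot.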

We also provide two additional APIs for stats collection and reporting purposes.

```cpp
Status TransactionDB::GetAllTimestampedSnapshots(
    std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
// Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`.
Status TransactionDB::GetTimestampedSnapshots(
    TxnTimestamp ts_lb,
    TxnTimestamp ts_ub,
    std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
```
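The half-open range `[ts_lb, ts_ub)` maps naturally onto two `lower_bound` calls on an ordered map. The sketch below assumes a plain `std::map` from timestamp to sequence number; `SnapshotsInRange` is a hypothetical helper, not a RocksDB function. As in `DBImpl::GetTimestampedSnapshots` from this change, an empty or inverted range is rejected before the output is touched.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical helper for illustration; not part of RocksDB.
// Collects the sequence numbers of snapshots whose timestamps fall in
// [ts_lb, ts_ub). Returns false and leaves `out` untouched if ts_lb >= ts_ub.
inline bool SnapshotsInRange(const std::map<uint64_t, uint64_t>& ts_to_seq,
                             uint64_t ts_lb, uint64_t ts_ub,
                             std::vector<uint64_t>& out) {
  if (ts_lb >= ts_ub) {
    return false;
  }
  out.clear();
  auto first = ts_to_seq.lower_bound(ts_lb);  // first ts >= ts_lb
  auto last = ts_to_seq.lower_bound(ts_ub);   // first ts >= ts_ub (excluded)
  for (auto it = first; it != last; ++it) {
    assert(it->first >= ts_lb && it->first < ts_ub);
    out.push_back(it->second);
  }
  return true;
}
```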

To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release
timestamped snapshots whose timestamps are older than or equal to a given threshold.
```cpp
void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts);
```
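Because the snapshots are handed out as `std::shared_ptr`, releasing them from the database's index does not invalidate a snapshot that a reader still holds. The toy model below illustrates this with a custom deleter that counts destructions. `CountedSnapshot` and `CountedSnapshotIndex` are hypothetical names, and the inclusive boundary follows the wording above; whether the real implementation treats the threshold as inclusive is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <vector>

// Hypothetical stand-ins for illustration; not the RocksDB types.
struct CountedSnapshot {
  uint64_t seq;
  uint64_t ts;
};

class CountedSnapshotIndex {
 public:
  explicit CountedSnapshotIndex(int* destroyed) : destroyed_(destroyed) {
    assert(destroyed != nullptr);
  }

  std::shared_ptr<const CountedSnapshot> Add(uint64_t seq, uint64_t ts) {
    int* counter = destroyed_;
    // The deleter runs only when the last owner drops its reference.
    std::shared_ptr<const CountedSnapshot> snap(
        new CountedSnapshot{seq, ts}, [counter](const CountedSnapshot* s) {
          ++*counter;
          delete s;
        });
    by_ts_[ts] = snap;
    return snap;
  }

  // Drop the index's references to snapshots with timestamp <= ts
  // (inclusive boundary is an assumption, see the text above).
  void ReleaseOlderThanOrEqual(uint64_t ts) {
    std::vector<std::shared_ptr<const CountedSnapshot>> to_release;
    auto end = by_ts_.upper_bound(ts);
    for (auto it = by_ts_.begin(); it != end; ++it) {
      to_release.push_back(it->second);
    }
    by_ts_.erase(by_ts_.begin(), end);
    // `to_release` is destroyed on return; snapshots with no other owners
    // are destroyed at that point.
  }

  std::size_t Size() const { return by_ts_.size(); }

 private:
  int* destroyed_;
  std::map<uint64_t, std::shared_ptr<const CountedSnapshot>> by_ts_;
};
```

Collecting the released pointers into a separate container first resembles what `DBImpl::ReleaseTimestampedSnapshotsOlderThan` does in this change, where the snapshots are gathered while holding the mutex and dropped after it is released.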

Before shutdown, RocksDB will release all timestamped snapshots.

Comparison with user-defined timestamp and how they can be combined:
User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile
mapping between snapshots (sequence numbers) and timestamps.
Different internal keys with the same user key but different timestamps will be treated as different by compaction,
thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection.
In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in
this snapshot from being dropped by compaction. Here, visible means (seq < snapshot and most recent).
The timestamped snapshot supports the semantics of reading at an exact point in time.

Timestamped snapshots can also be used with user-defined timestamp.

Pull Request resolved: facebook#9879

Test Plan:
```
make check
TEST_TMPDIR=/dev/shm make crash_test_with_txn
```

Reviewed By: siying

Differential Revision: D35783919

Pulled By: riversand963

fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
riversand963 authored and facebook-github-bot committed Jun 10, 2022
1 parent f4052d1 commit 1777e5f
Showing 35 changed files with 1,204 additions and 90 deletions.
3 changes: 0 additions & 3 deletions .gitignore
@@ -36,8 +36,6 @@ manifest_dump
sst_dump
blob_dump
block_cache_trace_analyzer
db_readonly_with_timestamp_test
db_with_timestamp_basic_test
tools/block_cache_analyzer/*.pyc
column_aware_encoding_exp
util/build_version.cc
@@ -53,7 +51,6 @@ rocksdb_dump
rocksdb_undump
db_test2
trace_analyzer
trace_analyzer_test
block_cache_trace_analyzer
io_tracer_parser
.DS_Store
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -1390,6 +1390,7 @@ if(WITH_TESTS)
utilities/transactions/write_prepared_transaction_test.cc
utilities/transactions/write_unprepared_transaction_test.cc
utilities/transactions/lock/range/range_locking_test.cc
utilities/transactions/timestamped_snapshot_test.cc
utilities/ttl/ttl_test.cc
utilities/util_merge_operators_test.cc
utilities/write_batch_with_index/write_batch_with_index_test.cc
1 change: 1 addition & 0 deletions HISTORY.md
@@ -28,6 +28,7 @@
* Add blob garbage collection parameters `blob_garbage_collection_policy` and `blob_garbage_collection_age_cutoff` to both force-enable and force-disable GC, as well as selectively override age cutoff when using CompactRange.
* Add an extra sanity check in `GetSortedWalFiles()` (also used by `GetLiveFilesStorageInfo()`, `BackupEngine`, and `Checkpoint`) to reduce risk of successfully created backup or checkpoint failing to open because of missing WAL file.
* Add a new column family option `blob_file_starting_level` to enable writing blob files during flushes and compactions starting from the specified LSM tree level.
* Add support for timestamped snapshots (#9879)

### Behavior changes
* DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
3 changes: 3 additions & 0 deletions Makefile
@@ -1777,6 +1777,9 @@ write_prepared_transaction_test: $(OBJ_DIR)/utilities/transactions/write_prepare
write_unprepared_transaction_test: $(OBJ_DIR)/utilities/transactions/write_unprepared_transaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

timestamped_snapshot_test: $(OBJ_DIR)/utilities/transactions/timestamped_snapshot_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

sst_dump: $(OBJ_DIR)/tools/sst_dump.o $(TOOLS_LIBRARY) $(LIBRARY)
$(AM_LINK)

6 changes: 6 additions & 0 deletions TARGETS
@@ -5768,6 +5768,12 @@ cpp_unittest_wrapper(name="timer_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="timestamped_snapshot_test",
srcs=["utilities/transactions/timestamped_snapshot_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="trace_analyzer_test",
srcs=["tools/trace_analyzer_test.cc"],
deps=[":rocksdb_test_lib"],
219 changes: 188 additions & 31 deletions db/db_impl/db_impl.cc
@@ -511,6 +511,19 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
WaitForBackgroundWork();
}

Status DBImpl::MaybeReleaseTimestampedSnapshotsAndCheck() {
size_t num_snapshots = 0;
ReleaseTimestampedSnapshotsOlderThan(std::numeric_limits<uint64_t>::max(),
&num_snapshots);

// If there is unreleased snapshot, fail the close call
if (num_snapshots > 0) {
return Status::Aborted("Cannot close DB with unreleased snapshot.");
}

return Status::OK();
}

Status DBImpl::CloseHelper() {
// Guarantee that there is no background error recovery in progress before
// continuing with the shutdown
@@ -732,11 +745,19 @@ Status DBImpl::CloseImpl() { return CloseHelper(); }

DBImpl::~DBImpl() {
InstrumentedMutexLock closing_lock_guard(&closing_mutex_);
if (!closed_) {
closed_ = true;
closing_status_ = CloseHelper();
closing_status_.PermitUncheckedError();
if (closed_) {
return;
}

closed_ = true;

{
const Status s = MaybeReleaseTimestampedSnapshotsAndCheck();
s.PermitUncheckedError();
}

closing_status_ = CloseImpl();
closing_status_.PermitUncheckedError();
}

void DBImpl::MaybeIgnoreError(Status* s) const {
@@ -1797,11 +1818,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
// data for the snapshot, so the reader would see neither data that was be
// visible to the snapshot before compaction nor the newer data inserted
// afterwards.
if (last_seq_same_as_publish_seq_) {
snapshot = versions_->LastSequence();
} else {
snapshot = versions_->LastPublishedSequence();
}
snapshot = GetLastPublishedSequence();
if (get_impl_options.callback) {
// The unprep_seqs are not published for write unprepared, so it could be
// that max_visible_seq is larger. Seek to the std::max of the two.
@@ -2194,11 +2211,7 @@ bool DBImpl::MultiCFSnapshot(
// version because a flush happening in between may compact away data for
// the snapshot, but the snapshot is earlier than the data overwriting it,
// so users may see wrong results.
if (last_seq_same_as_publish_seq_) {
*snapshot = versions_->LastSequence();
} else {
*snapshot = versions_->LastPublishedSequence();
}
*snapshot = GetLastPublishedSequence();
}
} else {
// If we end up with the same issue of memtable geting sealed during 2
@@ -2229,11 +2242,7 @@ bool DBImpl::MultiCFSnapshot(
// acquire the lock so we're sure to succeed
mutex_.Lock();
}
if (last_seq_same_as_publish_seq_) {
*snapshot = versions_->LastSequence();
} else {
*snapshot = versions_->LastPublishedSequence();
}
*snapshot = GetLastPublishedSequence();
} else {
*snapshot =
static_cast_with_check<const SnapshotImpl>(read_options.snapshot)
@@ -3170,6 +3179,48 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
}
#endif // ROCKSDB_LITE

std::pair<Status, std::shared_ptr<const Snapshot>>
DBImpl::CreateTimestampedSnapshot(SequenceNumber snapshot_seq, uint64_t ts) {
assert(ts != std::numeric_limits<uint64_t>::max());

auto ret = CreateTimestampedSnapshotImpl(snapshot_seq, ts, /*lock=*/true);
return ret;
}

std::shared_ptr<const SnapshotImpl> DBImpl::GetTimestampedSnapshot(
uint64_t ts) const {
InstrumentedMutexLock lock_guard(&mutex_);
return timestamped_snapshots_.GetSnapshot(ts);
}

void DBImpl::ReleaseTimestampedSnapshotsOlderThan(uint64_t ts,
size_t* remaining_total_ss) {
autovector<std::shared_ptr<const SnapshotImpl>> snapshots_to_release;
{
InstrumentedMutexLock lock_guard(&mutex_);
timestamped_snapshots_.ReleaseSnapshotsOlderThan(ts, snapshots_to_release);
}
snapshots_to_release.clear();

if (remaining_total_ss) {
InstrumentedMutexLock lock_guard(&mutex_);
*remaining_total_ss = static_cast<size_t>(snapshots_.count());
}
}

Status DBImpl::GetTimestampedSnapshots(
uint64_t ts_lb, uint64_t ts_ub,
std::vector<std::shared_ptr<const Snapshot>>& timestamped_snapshots) const {
if (ts_lb >= ts_ub) {
return Status::InvalidArgument(
"timestamp lower bound must be smaller than upper bound");
}
timestamped_snapshots.clear();
InstrumentedMutexLock lock_guard(&mutex_);
timestamped_snapshots_.GetSnapshots(ts_lb, ts_ub, timestamped_snapshots);
return Status::OK();
}

SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
int64_t unix_time = 0;
@@ -3179,6 +3230,8 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,

if (lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
@@ -3188,9 +3241,7 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
delete s;
return nullptr;
}
auto snapshot_seq = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
auto snapshot_seq = GetLastPublishedSequence();
SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
if (lock) {
@@ -3199,6 +3250,115 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
return snapshot;
}

std::pair<Status, std::shared_ptr<const SnapshotImpl>>
DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
bool lock) {
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;

const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber);

if (lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
if (lock) {
mutex_.Unlock();
}
delete s;
return std::make_pair(
Status::NotSupported("Memtable does not support snapshot"), nullptr);
}

// Caller is not write thread, thus didn't provide a valid snapshot_seq.
// Obtain seq from db.
if (!need_update_seq) {
snapshot_seq = GetLastPublishedSequence();
}

std::shared_ptr<const SnapshotImpl> latest =
timestamped_snapshots_.GetSnapshot(std::numeric_limits<uint64_t>::max());

// If there is already a latest timestamped snapshot, then we need to do some
// checks.
if (latest) {
uint64_t latest_snap_ts = latest->GetTimestamp();
SequenceNumber latest_snap_seq = latest->GetSequenceNumber();
assert(latest_snap_seq <= snapshot_seq);
bool needs_create_snap = true;
Status status;
std::shared_ptr<const SnapshotImpl> ret;
if (latest_snap_ts > ts) {
// A snapshot created later cannot have smaller timestamp than a previous
// timestamped snapshot.
needs_create_snap = false;
std::ostringstream oss;
oss << "snapshot exists with larger timestamp " << latest_snap_ts << " > "
<< ts;
status = Status::InvalidArgument(oss.str());
} else if (latest_snap_ts == ts) {
if (latest_snap_seq == snapshot_seq) {
// We are requesting the same sequence number and timestamp, thus can
// safely reuse (share) the current latest timestamped snapshot.
needs_create_snap = false;
ret = latest;
} else if (latest_snap_seq < snapshot_seq) {
// There may have been writes to the database since the latest
// timestamped snapshot, yet we are still requesting the same
// timestamp. In this case, we cannot create the new timestamped
// snapshot.
needs_create_snap = false;
std::ostringstream oss;
oss << "Allocated seq is " << snapshot_seq
<< ", while snapshot exists with smaller seq " << latest_snap_seq
<< " but same timestamp " << ts;
status = Status::InvalidArgument(oss.str());
}
}
if (!needs_create_snap) {
if (lock) {
mutex_.Unlock();
}
delete s;
return std::make_pair(status, ret);
} else {
status.PermitUncheckedError();
}
}

SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time,
/*is_write_conflict_boundary=*/true, ts);

std::shared_ptr<const SnapshotImpl> ret(
snapshot,
std::bind(&DBImpl::ReleaseSnapshot, this, std::placeholders::_1));
timestamped_snapshots_.AddSnapshot(ret);

// Caller is from write thread, and we need to update database's sequence
// number.
if (need_update_seq) {
assert(versions_);
if (last_seq_same_as_publish_seq_) {
versions_->SetLastSequence(snapshot_seq);
} else {
// TODO: support write-prepared/write-unprepared transactions with two
// write queues.
assert(false);
}
}

if (lock) {
mutex_.Unlock();
}
return std::make_pair(Status::OK(), ret);
}

namespace {
using CfdList = autovector<ColumnFamilyData*, 2>;
bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
@@ -3224,11 +3384,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
snapshots_.Delete(casted_s);
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
if (last_seq_same_as_publish_seq_) {
oldest_snapshot = versions_->LastSequence();
} else {
oldest_snapshot = versions_->LastPublishedSequence();
}
oldest_snapshot = GetLastPublishedSequence();
} else {
oldest_snapshot = snapshots_.oldest()->number_;
}
@@ -4119,13 +4275,14 @@ Status DBImpl::Close() {
if (closed_) {
return closing_status_;
}

{
InstrumentedMutexLock l(&mutex_);
// If there is unreleased snapshot, fail the close call
if (!snapshots_.empty()) {
return Status::Aborted("Cannot close DB with unreleased snapshot.");
const Status s = MaybeReleaseTimestampedSnapshotsAndCheck();
if (!s.ok()) {
return s;
}
}

closing_status_ = CloseImpl();
closed_ = true;
return closing_status_;