Revise LockWAL/UnlockWAL implementation (#11020)
Summary:
RocksDB has two public APIs: `DB::LockWAL()`/`DB::UnlockWAL()`. The current implementation acquires and
releases the internal `DBImpl::log_write_mutex_`.

According to the comment on `DBImpl::log_write_mutex_`: https://github.com/facebook/rocksdb/blob/7.8.fb/db/db_impl/db_impl.h#L2287:L2288
> Note: to avoid deadlock, if needed to acquire both log_write_mutex_ and mutex_, the order should be first mutex_ and then log_write_mutex_.

This limits how applications can use the `LockWAL()` API: after `LockWAL()` returns OK, the application
should not perform any operation that acquires `mutex_`. Currently, the use case of `LockWAL()` is MyRocks implementing
the MySQL storage engine handlerton `lock_hton_log` interface. The operation that MyRocks performs after `LockWAL()`
is `GetSortedWalFiles()`, which acquires not only `mutex_` but also `log_write_mutex_`.

There are two issues:
1. An application using these two APIs may hang if a thread calls `GetSortedWalFiles()` after
calling `LockWAL()`, because `log_write_mutex_` is not recursive.
2. Two threads may deadlock due to lock order inversion.
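
Issue 1 can be modeled in a few lines. This is only a sketch under stated assumptions: `OwnerTrackingMutex` and `OldStyleDb` are hypothetical names, not RocksDB classes, and the toy mutex records its owner so that the re-entrant acquisition is reported rather than actually hanging.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical helper: a non-recursive mutex that remembers its owner so the
// demo can report a self-deadlock instead of blocking forever.
class OwnerTrackingMutex {
 public:
  // Returns false if the calling thread already owns the mutex, which is the
  // case where a plain non-recursive lock would hang.
  bool LockOrDetectReentry() {
    const std::thread::id self = std::this_thread::get_id();
    if (owner_.load() == self) {
      return false;
    }
    mu_.lock();
    owner_.store(self);
    return true;
  }

  void Unlock() {
    owner_.store(std::thread::id());
    mu_.unlock();
  }

 private:
  std::mutex mu_;
  std::atomic<std::thread::id> owner_{std::thread::id()};
};

// Hypothetical model of the pre-fix behavior: LockWAL() keeps the WAL mutex
// held, and GetSortedWalFiles() needs that same mutex.
struct OldStyleDb {
  OwnerTrackingMutex log_write_mutex;

  bool LockWAL() { return log_write_mutex.LockOrDetectReentry(); }
  void UnlockWAL() { log_write_mutex.Unlock(); }

  // Returns false where the real call would have hung.
  bool GetSortedWalFiles() {
    if (!log_write_mutex.LockOrDetectReentry()) {
      return false;
    }
    log_write_mutex.Unlock();
    return true;
  }
};
```

In this model, calling `GetSortedWalFiles()` between `LockWAL()` and `UnlockWAL()` on the same thread reports the re-entry, and it succeeds again once the lock is released.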

To fix these issues, we can modify the implementation of LockWAL so that it does not keep
`log_write_mutex_` held until UnlockWAL. To achieve the goal of locking the WAL, we can
instead manually inject a write stall so that all future writes will be stopped.
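
The idea of stopping writes with a token, rather than by holding a mutex across API calls, can be sketched as follows. This is a simplified stand-in and not the RocksDB `WriteController`: `WriteGate`, `StopToken`, and `TryWrite` are hypothetical names. A write is rejected while any token is alive, which loosely mirrors a `no_slowdown` write failing fast during a stall.

```cpp
#include <cassert>
#include <memory>
#include <mutex>

// Hypothetical stand-in for a write controller that supports "stop" tokens.
class WriteGate {
 public:
  // While at least one StopToken is alive, writes are considered stopped.
  class StopToken {
   public:
    explicit StopToken(WriteGate* gate) : gate_(gate) {}
    ~StopToken() {
      std::lock_guard<std::mutex> guard(gate_->mu_);
      --gate_->stop_count_;
    }
    StopToken(const StopToken&) = delete;
    StopToken& operator=(const StopToken&) = delete;

   private:
    WriteGate* gate_;
  };

  // The internal mutex is held only for the duration of this call; the caller
  // holds just the token afterwards, so no lock is kept across API calls.
  std::unique_ptr<StopToken> GetStopToken() {
    {
      std::lock_guard<std::mutex> guard(mu_);
      ++stop_count_;
    }
    return std::unique_ptr<StopToken>(new StopToken(this));
  }

  bool IsStopped() const {
    std::lock_guard<std::mutex> guard(mu_);
    return stop_count_ > 0;
  }

  // Models a fail-fast write: returns false instead of waiting when stopped.
  bool TryWrite() {
    std::lock_guard<std::mutex> guard(mu_);
    if (stop_count_ > 0) {
      return false;
    }
    ++accepted_writes_;
    return true;
  }

 private:
  mutable std::mutex mu_;
  int stop_count_ = 0;
  int accepted_writes_ = 0;
};
```

Because the caller holds only the token, other operations that need the internal mutex can still run while writes are stopped. Destroying the token resumes writes.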

Pull Request resolved: facebook/rocksdb#11020

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D41785203

Pulled By: riversand963

fbshipit-source-id: 5ccb7a9c6eb9a2c3fa80fd2c399cc2568b8f89ce
riversand963 authored and facebook-github-bot committed Dec 14, 2022
1 parent 98d5db5 commit c93ba7d
Showing 8 changed files with 152 additions and 13 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
@@ -11,6 +11,7 @@
 * Fixed a bug caused by `DB::SyncWAL()` affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#10892).
 * Fixed a BackupEngine bug in which RestoreDBFromLatestBackup would fail if the latest backup was deleted and there is another valid backup available.
 * Fix L0 file misorder corruption caused by ingesting files of overlapping seqnos with memtable entries' through introducing `epoch_number`. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected. Also replace the previous incomplete fix (#5958) to the same corruption with this new and more complete fix.
+* Fixed a bug in LockWAL() leading to re-locking mutex (#11020).

 ## 7.9.0 (11/21/2022)
 ### Performance Improvements
32 changes: 21 additions & 11 deletions db/db_impl/db_impl.cc
@@ -1570,21 +1570,31 @@ Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
 }

 Status DBImpl::LockWAL() {
-  log_write_mutex_.Lock();
-  auto cur_log_writer = logs_.back().writer;
-  IOStatus status = cur_log_writer->WriteBuffer();
-  if (!status.ok()) {
-    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
-                    status.ToString().c_str());
-    // In case there is a fs error we should set it globally to prevent the
-    // future writes
-    WriteStatusCheck(status);
+  {
+    InstrumentedMutexLock lock(&mutex_);
+    WriteThread::Writer w;
+    write_thread_.EnterUnbatched(&w, &mutex_);
+    WriteThread::Writer nonmem_w;
+    if (two_write_queues_) {
+      nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
+    }
+
+    lock_wal_write_token_ = write_controller_.GetStopToken();
+
+    if (two_write_queues_) {
+      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
+    }
+    write_thread_.ExitUnbatched(&w);
   }
-  return static_cast<Status>(status);
+  return FlushWAL(/*sync=*/false);
 }

 Status DBImpl::UnlockWAL() {
-  log_write_mutex_.Unlock();
+  {
+    InstrumentedMutexLock lock(&mutex_);
+    lock_wal_write_token_.reset();
+  }
+  bg_cv_.SignalAll();
   return Status::OK();
 }

4 changes: 4 additions & 0 deletions db/db_impl/db_impl.h
@@ -2680,6 +2680,10 @@ class DBImpl : public DB {
   // seqno_time_mapping_ stores the sequence number to time mapping, it's not
   // thread safe, both read and write need db mutex hold.
   SeqnoToTimeMapping seqno_time_mapping_;
+
+  // stop write token that is acquired when LockWal() is called. Destructed
+  // when UnlockWal() is called.
+  std::unique_ptr<WriteControllerToken> lock_wal_write_token_;
 };

 class GetWithTimestampReadCallback : public ReadCallback {
10 changes: 10 additions & 0 deletions db/db_impl/db_impl_write.cc
@@ -924,6 +924,15 @@ Status DBImpl::WriteImplWALOnly(
       write_thread->ExitAsBatchGroupLeader(write_group, status);
       return status;
     }
+  } else {
+    InstrumentedMutexLock lock(&mutex_);
+    Status status = DelayWrite(/*num_bytes=*/0ull, write_options);
+    if (!status.ok()) {
+      WriteThread::WriteGroup write_group;
+      write_thread->EnterAsBatchGroupLeader(&w, &write_group);
+      write_thread->ExitAsBatchGroupLeader(write_group, status);
+      return status;
+    }
   }

   WriteThread::WriteGroup write_group;
@@ -1762,6 +1771,7 @@ uint64_t DBImpl::GetMaxTotalWalSize() const {
 // REQUIRES: this thread is currently at the front of the writer queue
 Status DBImpl::DelayWrite(uint64_t num_bytes,
                           const WriteOptions& write_options) {
+  mutex_.AssertHeld();
   uint64_t time_delayed = 0;
   bool delayed = false;
   {
46 changes: 46 additions & 0 deletions db/db_wal_test.cc
@@ -610,6 +610,52 @@ TEST_F(DBWALTest, WALWithChecksumHandoff) {
 #endif // ROCKSDB_ASSERT_STATUS_CHECKED
 }

+#ifndef ROCKSDB_LITE
+TEST_F(DBWALTest, LockWal) {
+  do {
+    Options options = CurrentOptions();
+    options.create_if_missing = true;
+    DestroyAndReopen(options);
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->LoadDependency(
+        {{"DBWALTest::LockWal:AfterGetSortedWal",
+          "DBWALTest::LockWal:BeforeFlush:1"}});
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    ASSERT_OK(Put("foo", "v"));
+    ASSERT_OK(Put("bar", "v"));
+    port::Thread worker([&]() {
+      TEST_SYNC_POINT("DBWALTest::LockWal:BeforeFlush:1");
+      Status tmp_s = db_->Flush(FlushOptions());
+      ASSERT_OK(tmp_s);
+    });
+
+    ASSERT_OK(db_->LockWAL());
+    // Verify writes are stopped
+    WriteOptions wopts;
+    wopts.no_slowdown = true;
+    Status s = db_->Put(wopts, "foo", "dontcare");
+    ASSERT_TRUE(s.IsIncomplete());
+    {
+      VectorLogPtr wals;
+      ASSERT_OK(db_->GetSortedWalFiles(wals));
+      ASSERT_FALSE(wals.empty());
+    }
+    TEST_SYNC_POINT("DBWALTest::LockWal:AfterGetSortedWal");
+    FlushOptions flush_opts;
+    flush_opts.wait = false;
+    s = db_->Flush(flush_opts);
+    ASSERT_TRUE(s.IsTryAgain());
+    ASSERT_OK(db_->UnlockWAL());
+    ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare"));
+
+    worker.join();
+
+    SyncPoint::GetInstance()->DisableProcessing();
+  } while (ChangeWalOptions());
+}
+#endif //! ROCKSDB_LITE
+
 class DBRecoveryTestBlobError
     : public DBWALTest,
       public testing::WithParamInterface<std::string> {
7 changes: 5 additions & 2 deletions db/write_thread.cc
@@ -360,8 +360,11 @@ void WriteThread::EndWriteStall() {
   // Unlink write_stall_dummy_ from the write queue. This will unblock
   // pending write threads to enqueue themselves
   assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
-  assert(write_stall_dummy_.link_older != nullptr);
-  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
+  // write_stall_dummy_.link_older can be nullptr only if LockWAL() has been
+  // called.
+  if (write_stall_dummy_.link_older) {
+    write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
+  }
   newest_writer_.exchange(write_stall_dummy_.link_older);

   // Wake up writers
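
The null check in this hunk covers the case where the stall dummy is the only node in the queue. A minimal sketch of the same unlink step on a plain doubly linked list, with hypothetical names (`Node`, `UnlinkNewest`) and none of the atomics used by the real `WriteThread`:

```cpp
#include <cassert>

// Hypothetical queue node; the real writer struct carries much more state.
struct Node {
  Node* link_older = nullptr;
  Node* link_newer = nullptr;
};

// Unlinks `dummy`, assumed to be the newest node in the queue, and returns the
// node that becomes the newest one. The result is nullptr when the dummy was
// the only node, which is the case the added null check guards against.
inline Node* UnlinkNewest(Node* dummy) {
  if (dummy->link_older != nullptr) {
    dummy->link_older->link_newer = dummy->link_newer;
  }
  return dummy->link_older;
}
```

With an empty queue behind the dummy the function returns `nullptr`. Otherwise the older node takes over the dummy's `link_newer` pointer and becomes the newest node.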
6 changes: 6 additions & 0 deletions include/rocksdb/db.h
@@ -1445,11 +1445,17 @@ class DB {
   virtual Status SyncWAL() = 0;

   // Lock the WAL. Also flushes the WAL after locking.
+  // After this method returns ok, writes to the database will be stopped until
+  // UnlockWAL() is called.
+  // This method may internally acquire and release DB mutex and the WAL write
+  // mutex, but after it returns, neither mutex is held by caller.
   virtual Status LockWAL() {
     return Status::NotSupported("LockWAL not implemented");
   }

   // Unlock the WAL.
+  // The write stop on the database will be cleared.
+  // This method may internally acquire and release DB mutex.
   virtual Status UnlockWAL() {
     return Status::NotSupported("UnlockWAL not implemented");
   }
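
Since writes stay stopped until `UnlockWAL()` is called, a caller may want to pair the two calls with a scope guard. The sketch below is one possible pattern and not part of RocksDB: `ScopedWalLock`, `FakeDb`, and `FakeStatus` are hypothetical names. The guard is templated so that it compiles here without the RocksDB headers, and it assumes only that `LockWAL()` and `UnlockWAL()` return an object with an `ok()` method.

```cpp
#include <cassert>

// Hypothetical RAII guard: locks the WAL on construction and unlocks it on
// destruction if, and only if, the lock call succeeded.
template <typename DbT>
class ScopedWalLock {
 public:
  explicit ScopedWalLock(DbT* db) : db_(db), locked_(db->LockWAL().ok()) {}
  ~ScopedWalLock() {
    if (locked_) {
      // The unlock status is deliberately ignored in this sketch.
      (void)db_->UnlockWAL();
    }
  }
  ScopedWalLock(const ScopedWalLock&) = delete;
  ScopedWalLock& operator=(const ScopedWalLock&) = delete;

  bool locked() const { return locked_; }

 private:
  DbT* db_;
  bool locked_;
};

// Fakes used only to exercise the guard without a real database.
struct FakeStatus {
  bool ok_value;
  bool ok() const { return ok_value; }
};

struct FakeDb {
  int lock_depth = 0;
  FakeStatus LockWAL() {
    ++lock_depth;
    return {true};
  }
  FakeStatus UnlockWAL() {
    --lock_depth;
    return {true};
  }
};
```

With a real database, the work done while the guard is alive (for example `GetSortedWalFiles()`) would sit inside the guard's scope, and callers should check `locked()` before relying on writes being stopped.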
59 changes: 59 additions & 0 deletions utilities/transactions/transaction_test.cc
@@ -6530,6 +6530,65 @@ TEST_P(TransactionTest, WriteWithBulkCreatedColumnFamilies) {
   cf_handles.clear();
 }

+TEST_P(TransactionTest, LockWal) {
+  const TxnDBWritePolicy write_policy = std::get<2>(GetParam());
+  if (TxnDBWritePolicy::WRITE_COMMITTED != write_policy) {
+    ROCKSDB_GTEST_BYPASS("Test only write-committed for now");
+    return;
+  }
+  ASSERT_OK(ReOpen());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"TransactionTest::LockWal:AfterLockWal",
+        "TransactionTest::LockWal:BeforePrepareTxn2"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  std::unique_ptr<Transaction> txn0;
+  WriteOptions wopts;
+  wopts.no_slowdown = true;
+  txn0.reset(db->BeginTransaction(wopts, TransactionOptions()));
+  ASSERT_OK(txn0->SetName("txn0"));
+  ASSERT_OK(txn0->Put("foo", "v0"));
+
+  std::unique_ptr<Transaction> txn1;
+  txn1.reset(db->BeginTransaction(wopts, TransactionOptions()));
+  ASSERT_OK(txn1->SetName("txn1"));
+  ASSERT_OK(txn1->Put("dummy", "v0"));
+  ASSERT_OK(txn1->Prepare());
+
+  std::unique_ptr<Transaction> txn2;
+  port::Thread worker([&]() {
+    txn2.reset(db->BeginTransaction(WriteOptions(), TransactionOptions()));
+    ASSERT_OK(txn2->SetName("txn2"));
+    ASSERT_OK(txn2->Put("bar", "v0"));
+    TEST_SYNC_POINT("TransactionTest::LockWal:BeforePrepareTxn2");
+    ASSERT_OK(txn2->Prepare());
+    ASSERT_OK(txn2->Commit());
+  });
+  ASSERT_OK(db->LockWAL());
+  // txn0 cannot prepare
+  Status s = txn0->Prepare();
+  ASSERT_TRUE(s.IsIncomplete());
+  // txn1 cannot commit
+  s = txn1->Commit();
+  ASSERT_TRUE(s.IsIncomplete());
+
+  TEST_SYNC_POINT("TransactionTest::LockWal:AfterLockWal");
+
+  ASSERT_OK(db->UnlockWAL());
+  txn0.reset();
+
+  txn0.reset(db->BeginTransaction(wopts, TransactionOptions()));
+  ASSERT_OK(txn0->SetName("txn0_1"));
+  ASSERT_OK(txn0->Put("foo", "v1"));
+  ASSERT_OK(txn0->Prepare());
+  ASSERT_OK(txn0->Commit());
+  worker.join();
+
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 } // namespace ROCKSDB_NAMESPACE

 int main(int argc, char** argv) {
