Fix race condition with WAL tracking and FlushWAL(true /* sync */) (facebook#10185)

Summary:
`FlushWAL(true /* sync */)` is used internally and for manual WAL sync. It had a bug when used together with `track_and_verify_wals_in_manifest` where the synced size tracked in MANIFEST was larger than the number of bytes actually synced.

The bug could be repro'd almost immediately with the following crash test command: `python3 tools/db_crashtest.py blackbox --simple --write_buffer_size=524288 --max_bytes_for_level_base=2097152 --target_file_size_base=524288 --duration=3600 --interval=10 --sync_fault_injection=1 --disable_wal=0 --checkpoint_one_in=1000 --max_key=10000 --value_size_mult=33`.

An example error message produced by the above command is shown below. The error sometimes arose from the checkpoint and other times from the main stress test DB.

```
Corruption: Size mismatch: WAL (log number: 119) in MANIFEST is 27938 bytes , but actually is 27859 bytes on disk.
```
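The race can be illustrated with a toy model (a hedged Python sketch, not RocksDB code; the byte counts are taken from the error message above): recording the WAL's current file size at MANIFEST-update time over-reports durability when an append slips in during the sync, while snapshotting the size before the sync yields a safe lower bound on what is actually on disk.

```python
# Illustrative sketch only -- `ToyWal` is a hypothetical stand-in, not a
# RocksDB type.

class ToyWal:
    def __init__(self):
        self.flushed_size = 0  # bytes handed to the OS (possibly unsynced)
        self.synced_size = 0   # bytes actually durable on disk

    def append(self, n):
        self.flushed_size += n

    def sync(self):
        # Sync makes durable exactly what had been flushed when it ran.
        self.synced_size = self.flushed_size


wal = ToyWal()
wal.append(27859)

# Sync begins: capture the size this sync call guarantees durable.
pre_sync_size = wal.flushed_size
wal.sync()

# Race: a concurrent write lands after the sync but before the MANIFEST
# update that records the synced size.
wal.append(79)

# Buggy behavior: record the *current* file size in the MANIFEST. It exceeds
# the synced size, so losing unsynced data later yields "Size mismatch: WAL
# ... in MANIFEST is 27938 bytes , but actually is 27859 bytes on disk".
buggy_manifest_size = wal.flushed_size
assert buggy_manifest_size == 27938
assert buggy_manifest_size > wal.synced_size  # over-reports durability

# Fixed behavior: record the pre-sync snapshot, a lower bound on synced bytes.
assert pre_sync_size == 27859
assert pre_sync_size <= wal.synced_size
```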

Pull Request resolved: facebook#10185

Test Plan:
- repro unit test
- the above crash test command no longer finds this error. After running a while longer, it finds a different error, such as "Corruption: WAL file 481 required by manifest but not in directory list"

Reviewed By: riversand963

Differential Revision: D37200993

Pulled By: ajkr

fbshipit-source-id: 98e0071c1a89f4d009888512ed89f9219779ae5f
ajkr authored and facebook-github-bot committed Jun 17, 2022
1 parent a5d773e commit d5d8920
Showing 10 changed files with 139 additions and 18 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
```diff
@@ -11,6 +11,7 @@
 * Fix a bug that could return wrong results with `index_type=kHashSearch` and using `SetOptions` to change the `prefix_extractor`.
 * Fixed a bug in WAL tracking with wal_compression. WAL compression writes a kSetCompressionType record which is not associated with any sequence number. As result, WalManager::GetSortedWalsOfType() will skip these WALs and not return them to caller, e.g. Checkpoint, Backup, causing the operations to fail.
 * Avoid a crash if the IDENTITY file is accidentally truncated to empty. A new DB ID will be written and generated on Open.
+* Fixed a possible corruption for users of `manual_wal_flush` and/or `FlushWAL(true /* sync */)`, together with `track_and_verify_wals_in_manifest == true`. For those users, losing unsynced data (e.g., due to power loss) could make future DB opens fail with a `Status::Corruption` complaining about missing WAL data.
 
 ### Public API changes
 * Add new API GetUnixTime in Snapshot class which returns the unix time at which Snapshot is taken.
```
20 changes: 9 additions & 11 deletions db/db_impl/db_impl.cc
```diff
@@ -1367,6 +1367,7 @@ Status DBImpl::FlushWAL(bool sync) {
 }
 
 Status DBImpl::SyncWAL() {
+  TEST_SYNC_POINT("DBImpl::SyncWAL:Begin");
   autovector<log::Writer*, 1> logs_to_sync;
   bool need_log_dir_sync;
   uint64_t current_log_number;
@@ -1379,7 +1380,7 @@ Status DBImpl::SyncWAL() {
   current_log_number = logfile_number_;
 
   while (logs_.front().number <= current_log_number &&
-         logs_.front().getting_synced) {
+         logs_.front().IsSyncing()) {
     log_sync_cv_.Wait();
   }
   // First check that logs are safe to sync in background.
@@ -1396,8 +1397,7 @@ Status DBImpl::SyncWAL() {
   for (auto it = logs_.begin();
        it != logs_.end() && it->number <= current_log_number; ++it) {
     auto& log = *it;
-    assert(!log.getting_synced);
-    log.getting_synced = true;
+    log.PrepareForSync();
     logs_to_sync.push_back(log.writer);
   }
 
@@ -1470,11 +1470,10 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
   VersionEdit synced_wals;
   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
     auto& wal = *it;
-    assert(wal.getting_synced);
+    assert(wal.IsSyncing());
     if (immutable_db_options_.track_and_verify_wals_in_manifest &&
-        wal.writer->file()->GetFileSize() > 0) {
-      synced_wals.AddWal(wal.number,
-                         WalMetadata(wal.writer->file()->GetFileSize()));
+        wal.GetPreSyncSize() > 0) {
+      synced_wals.AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
     }
 
     if (logs_.size() > 1) {
@@ -1483,12 +1482,12 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
       InstrumentedMutexLock l(&log_write_mutex_);
       it = logs_.erase(it);
     } else {
-      wal.getting_synced = false;
+      wal.FinishSync();
       ++it;
     }
   }
   assert(logs_.empty() || logs_[0].number > up_to ||
-         (logs_.size() == 1 && !logs_[0].getting_synced));
+         (logs_.size() == 1 && !logs_[0].IsSyncing()));
 
   Status s;
   if (synced_wals.IsWalAddition()) {
@@ -1508,8 +1507,7 @@ void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
        ++it) {
     auto& wal = *it;
-    assert(wal.getting_synced);
-    wal.getting_synced = false;
+    wal.FinishSync();
   }
   log_sync_cv_.SignalAll();
 }
```
26 changes: 26 additions & 0 deletions db/db_impl/db_impl.h
```diff
@@ -1593,12 +1593,38 @@ class DBImpl : public DB {
       return s;
     }
 
+    bool IsSyncing() { return getting_synced; }
+
+    uint64_t GetPreSyncSize() {
+      assert(getting_synced);
+      return pre_sync_size;
+    }
+
+    void PrepareForSync() {
+      assert(!getting_synced);
+      // Size is expected to be monotonically increasing.
+      assert(writer->file()->GetFlushedSize() >= pre_sync_size);
+      getting_synced = true;
+      pre_sync_size = writer->file()->GetFlushedSize();
+    }
+
+    void FinishSync() {
+      assert(getting_synced);
+      getting_synced = false;
+    }
+
     uint64_t number;
     // Visual Studio doesn't support deque's member to be noncopyable because
     // of a std::unique_ptr as a member.
     log::Writer* writer;  // own
+
+   private:
+    // true for some prefix of logs_
+    bool getting_synced = false;
+    // The size of the file before the sync happens. This amount is guaranteed
+    // to be persisted even if appends happen during sync so it can be used for
+    // tracking the synced size in MANIFEST.
+    uint64_t pre_sync_size = 0;
   };
 
   // PurgeFileInfo is a structure to hold information of files to be deleted in
```
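The methods added in `db/db_impl/db_impl.h` form a small per-WAL protocol: `PrepareForSync()` snapshots the flushed size before syncing begins, `MarkLogsSynced()` records that snapshot (`GetPreSyncSize()`) in the MANIFEST, and `FinishSync()` releases the log for future syncs. A minimal Python model of that protocol (the method names and assertions mirror the diff above; the surrounding harness is illustrative):

```python
class LogState:
    """Toy model of the sync-tracking state added to the per-WAL struct."""

    def __init__(self):
        self.flushed_size = 0  # stand-in for writer->file()->GetFlushedSize()
        self.getting_synced = False
        self.pre_sync_size = 0

    def is_syncing(self):
        return self.getting_synced

    def prepare_for_sync(self):
        assert not self.getting_synced
        # Flushed size is monotonically increasing, so the snapshot below
        # only grows from one sync to the next.
        assert self.flushed_size >= self.pre_sync_size
        self.getting_synced = True
        self.pre_sync_size = self.flushed_size  # snapshot before syncing

    def get_pre_sync_size(self):
        assert self.getting_synced
        return self.pre_sync_size

    def finish_sync(self):
        assert self.getting_synced
        self.getting_synced = False


log = LogState()
log.flushed_size = 100
log.prepare_for_sync()   # snapshot: 100 bytes
log.flushed_size += 25   # concurrent append slips in during the sync
# The MANIFEST records the snapshot, not the racing append:
assert log.get_pre_sync_size() == 100
log.finish_sync()
assert not log.is_syncing()
```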
5 changes: 2 additions & 3 deletions db/db_impl/db_impl_compaction_flush.cc
```diff
@@ -88,14 +88,13 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
   autovector<log::Writer*, 1> logs_to_sync;
   uint64_t current_log_number = logfile_number_;
   while (logs_.front().number < current_log_number &&
-         logs_.front().getting_synced) {
+         logs_.front().IsSyncing()) {
     log_sync_cv_.Wait();
   }
   for (auto it = logs_.begin();
        it != logs_.end() && it->number < current_log_number; ++it) {
     auto& log = *it;
-    assert(!log.getting_synced);
-    log.getting_synced = true;
+    log.PrepareForSync();
     logs_to_sync.push_back(log.writer);
   }
 
```
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_files.cc
```diff
@@ -288,7 +288,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   }
   while (!logs_.empty() && logs_.front().number < min_log_number) {
     auto& log = logs_.front();
-    if (log.getting_synced) {
+    if (log.IsSyncing()) {
       log_sync_cv_.Wait();
       // logs_ could have changed while we were waiting.
       continue;
```
5 changes: 2 additions & 3 deletions db/db_impl/db_impl_write.cc
```diff
@@ -1181,17 +1181,16 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
     // Note: there does not seem to be a reason to wait for parallel sync at
     // this early step but it is not important since parallel sync (SyncWAL) and
     // need_log_sync are usually not used together.
-    while (logs_.front().getting_synced) {
+    while (logs_.front().IsSyncing()) {
       log_sync_cv_.Wait();
     }
     for (auto& log : logs_) {
-      assert(!log.getting_synced);
       // This is just to prevent the logs to be synced by a parallel SyncWAL
       // call. We will do the actual syncing later after we will write to the
       // WAL.
       // Note: there does not seem to be a reason to set this early before we
       // actually write to the WAL
-      log.getting_synced = true;
+      log.PrepareForSync();
     }
   } else {
     *need_log_sync = false;
```
35 changes: 35 additions & 0 deletions db/db_write_test.cc
```diff
@@ -334,6 +334,41 @@ TEST_P(DBWriteTest, ManualWalFlushInEffect) {
   ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
 }
 
+TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
+  // Repro race condition bug where unflushed WAL data extended the synced size
+  // recorded to MANIFEST despite being unrecoverable.
+  Options options = GetOptions();
+  std::unique_ptr<FaultInjectionTestEnv> fault_env(
+      new FaultInjectionTestEnv(env_));
+  options.env = fault_env.get();
+  options.manual_wal_flush = true;
+  options.track_and_verify_wals_in_manifest = true;
+  Reopen(options);
+
+  ASSERT_OK(Put("key1", "val1"));
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncWAL:Begin",
+      [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->FlushWAL(true /* sync */));
+
+  // Ensure callback ran.
+  ASSERT_EQ("val2", Get("key2"));
+
+  Close();
+
+  // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+  // DB WAL.
+  fault_env->DropUnsyncedFileData();
+
+  Reopen(options);
+
+  // Need to close before `fault_env` goes out of scope.
+  Close();
+}
+
 TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
       new FaultInjectionTestEnv(env_));
```
8 changes: 8 additions & 0 deletions file/writable_file_writer.cc
```diff
@@ -585,6 +585,8 @@ IOStatus WritableFileWriter::WriteBuffered(
 
       left -= allowed;
       src += allowed;
+      uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+      flushed_size_.store(cur_size + allowed, std::memory_order_release);
     }
     buf_.Size(0);
     buffered_data_crc32c_checksum_ = 0;
@@ -675,6 +677,8 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
   // the corresponding checksum value
   buf_.Size(0);
   buffered_data_crc32c_checksum_ = 0;
+  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+  flushed_size_.store(cur_size + left, std::memory_order_release);
   return s;
 }
 
@@ -782,6 +786,8 @@ IOStatus WritableFileWriter::WriteDirect(
       left -= size;
       src += size;
       write_offset += size;
+      uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+      flushed_size_.store(cur_size + size, std::memory_order_release);
       assert((next_write_offset_ % alignment) == 0);
     }
 
@@ -884,6 +890,8 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
 
   IOSTATS_ADD(bytes_written, left);
   assert((next_write_offset_ % alignment) == 0);
+  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
+  flushed_size_.store(cur_size + left, std::memory_order_release);
 
   if (s.ok()) {
     // Move the tail to the beginning of the buffer
```
10 changes: 10 additions & 0 deletions file/writable_file_writer.h
```diff
@@ -143,6 +143,7 @@ class WritableFileWriter {
   // Actually written data size can be used for truncate
   // not counting padding data
   std::atomic<uint64_t> filesize_;
+  std::atomic<uint64_t> flushed_size_;
 #ifndef ROCKSDB_LITE
   // This is necessary when we use unbuffered access
   // and writes must happen on aligned offsets
@@ -180,6 +181,7 @@ class WritableFileWriter {
         buf_(),
         max_buffer_size_(options.writable_file_max_buffer_size),
         filesize_(0),
+        flushed_size_(0),
 #ifndef ROCKSDB_LITE
         next_write_offset_(0),
 #endif  // ROCKSDB_LITE
@@ -259,6 +261,14 @@ class WritableFileWriter {
     return filesize_.load(std::memory_order_acquire);
   }
 
+  // Returns the size of data flushed to the underlying `FSWritableFile`.
+  // Expected to match `writable_file()->GetFileSize()`.
+  // The return value can serve as a lower-bound for the amount of data synced
+  // by a future call to `SyncWithoutFlush()`.
+  uint64_t GetFlushedSize() const {
+    return flushed_size_.load(std::memory_order_acquire);
+  }
+
   IOStatus InvalidateCache(size_t offset, size_t length) {
     return writable_file_->InvalidateCache(offset, length);
   }
```
45 changes: 45 additions & 0 deletions utilities/checkpoint/checkpoint_test.cc
```diff
@@ -915,6 +915,51 @@ TEST_F(CheckpointTest, CheckpointWithDbPath) {
   delete checkpoint;
 }
 
+TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
+  // Repro for a race condition where a user write comes in after the checkpoint
+  // syncs WAL for `track_and_verify_wals_in_manifest` but before the
+  // corresponding MANIFEST update. With the bug, that scenario resulted in an
+  // unopenable DB with error "Corruption: Size mismatch: WAL ...".
+  Options options = CurrentOptions();
+  std::unique_ptr<FaultInjectionTestEnv> fault_env(
+      new FaultInjectionTestEnv(env_));
+  options.env = fault_env.get();
+  options.track_and_verify_wals_in_manifest = true;
+  Reopen(options);
+
+  ASSERT_OK(Put("key1", "val1"));
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
+      [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  std::unique_ptr<Checkpoint> checkpoint;
+  {
+    Checkpoint* checkpoint_ptr;
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
+    checkpoint.reset(checkpoint_ptr);
+  }
+
+  ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+
+  // Ensure callback ran.
+  ASSERT_EQ("val2", Get("key2"));
+
+  Close();
+
+  // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+  // DB WAL.
+  fault_env->DropUnsyncedFileData();
+
+  // Before the bug fix, reopening the DB would fail because the MANIFEST's
+  // AddWal entry indicated the WAL should be synced through "key2" -> "val2".
+  Reopen(options);
+
+  // Need to close before `fault_env` goes out of scope.
+  Close();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
```
