Skip to content

Commit

Permalink
Pass IOStatus to write path and set retryable IO Error as hard error …
Browse files Browse the repository at this point in the history
…in BG jobs (#6487)

Summary:
In the current code base, we use Status to get and store the returned status from the call. Specifically, for IO related functions, the current Status cannot reflect the IO Error details such as error scope, error retryable attribute, and others. With the implementation of facebook/rocksdb#5761, we have the new Wrapper for IO, which returns IOStatus instead of Status. However, the IOStatus is purged at the lower level of write path and transferred to Status.

The first job of this PR is to pass the IOStatus to the write path (flush, WAL write, and Compaction). The second job is to identify the Retryable IO Error as HardError, and set the bg_error_ as HardError. In this case, the DB Instance becomes read only. User is informed of the Status and need to take actions to deal with it (e.g., call db->Resume()).
Pull Request resolved: facebook/rocksdb#6487

Test Plan: Added the testing case to error_handler_fs_test. Pass make asan_check

Reviewed By: anand1976

Differential Revision: D20685017

Pulled By: zhichao-cao

fbshipit-source-id: ff85f042896243abcd6ef37877834e26f36b6eb0
Signed-off-by: Changlong Chen <levisonchen@live.cn>
  • Loading branch information
zhichao-cao authored and mm304321141 committed Jun 23, 2021
1 parent afbee79 commit 19a3b17
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 45 deletions.
12 changes: 6 additions & 6 deletions file/filename.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ bool ParseFileName(const std::string& fname, uint64_t* number,
return true;
}

Status SetCurrentFile(Env* env, const std::string& dbname,
IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
uint64_t descriptor_number,
FSDirectory* directory_to_fsync) {
// Remove leading "dbname/" and add newline to manifest file name
Expand All @@ -377,18 +377,18 @@ Status SetCurrentFile(Env* env, const std::string& dbname,
assert(contents.starts_with(dbname + "/"));
contents.remove_prefix(dbname.size() + 1);
std::string tmp = TempFileName(dbname, descriptor_number);
Status s = WriteStringToFile(env, contents.ToString() + "\n", tmp, true);
IOStatus s = WriteStringToFile(fs, contents.ToString() + "\n", tmp, true);
if (s.ok()) {
TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
s = env->RenameFile(tmp, CurrentFileName(dbname));
s = fs->RenameFile(tmp, CurrentFileName(dbname), IOOptions(), nullptr);
TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2);
}
if (s.ok()) {
if (directory_to_fsync != nullptr) {
s = directory_to_fsync->Fsync(IOOptions(), nullptr);
}
} else {
env->DeleteFile(tmp);
fs->DeleteFile(tmp, IOOptions(), nullptr);
}
return s;
}
Expand All @@ -414,8 +414,8 @@ Status SetIdentityFile(Env* env, const std::string& dbname,
return s;
}

Status SyncManifest(Env* env, const ImmutableDBOptions* db_options,
WritableFileWriter* file) {
IOStatus SyncManifest(Env* env, const ImmutableDBOptions* db_options,
WritableFileWriter* file) {
TEST_KILL_RANDOM("SyncManifest:0", rocksdb_kill_odds * REDUCE_ODDS2);
StopWatch sw(env, db_options->statistics.get(), MANIFEST_FILE_SYNC_MICROS);
return file->Sync(db_options->use_fsync);
Expand Down
6 changes: 3 additions & 3 deletions file/filename.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ extern bool ParseFileName(const std::string& filename, uint64_t* number,

// Make the CURRENT file point to the descriptor file with the
// specified number.
extern Status SetCurrentFile(Env* env, const std::string& dbname,
extern IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
uint64_t descriptor_number,
FSDirectory* directory_to_fsync);

Expand All @@ -178,8 +178,8 @@ extern Status SetIdentityFile(Env* env, const std::string& dbname,
const std::string& db_id = {});

// Sync manifest file `file`.
extern Status SyncManifest(Env* env, const ImmutableDBOptions* db_options,
WritableFileWriter* file);
extern IOStatus SyncManifest(Env* env, const ImmutableDBOptions* db_options,
WritableFileWriter* file);

// Return list of file names of info logs in `file_names`.
// The list only contains file name. The parent directory name is stored
Expand Down
46 changes: 23 additions & 23 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Status WritableFileWriter::Append(const Slice& data) {
IOStatus WritableFileWriter::Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();
Status s;
IOStatus s;
pending_sync_ = true;

TEST_KILL_RANDOM("WritableFileWriter::Append:0",
Expand Down Expand Up @@ -94,7 +94,7 @@ Status WritableFileWriter::Append(const Slice& data) {
return s;
}

Status WritableFileWriter::Pad(const size_t pad_bytes) {
IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
assert(pad_bytes < kDefaultPageSize);
size_t left = pad_bytes;
size_t cap = buf_.Capacity() - buf_.CurrentSize();
Expand All @@ -107,7 +107,7 @@ Status WritableFileWriter::Pad(const size_t pad_bytes) {
buf_.PadWith(append_bytes, 0);
left -= append_bytes;
if (left > 0) {
Status s = Flush();
IOStatus s = Flush();
if (!s.ok()) {
return s;
}
Expand All @@ -116,12 +116,12 @@ Status WritableFileWriter::Pad(const size_t pad_bytes) {
}
pending_sync_ = true;
filesize_ += pad_bytes;
return Status::OK();
return IOStatus::OK();
}

Status WritableFileWriter::Close() {
IOStatus WritableFileWriter::Close() {
// Do not quit immediately on failure the file MUST be closed
Status s;
IOStatus s;

// Possible to close it twice now as we MUST close
// in __dtor, simply flushing is not enough
Expand All @@ -133,7 +133,7 @@ Status WritableFileWriter::Close() {

s = Flush(); // flush cache to OS

Status interim;
IOStatus interim;
// In direct I/O mode we write whole pages so
// we need to let the file know where data ends.
if (use_direct_io()) {
Expand All @@ -160,8 +160,8 @@ Status WritableFileWriter::Close() {

// write out the cached data to the OS cache or storage if direct I/O
// enabled
Status WritableFileWriter::Flush() {
Status s;
IOStatus WritableFileWriter::Flush() {
IOStatus s;
TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
rocksdb_kill_odds * REDUCE_ODDS2);

Expand Down Expand Up @@ -224,8 +224,8 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const {
}
}

Status WritableFileWriter::Sync(bool use_fsync) {
Status s = Flush();
IOStatus WritableFileWriter::Sync(bool use_fsync) {
IOStatus s = Flush();
if (!s.ok()) {
return s;
}
Expand All @@ -238,23 +238,23 @@ Status WritableFileWriter::Sync(bool use_fsync) {
}
TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
pending_sync_ = false;
return Status::OK();
return IOStatus::OK();
}

Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
if (!writable_file_->IsSyncThreadSafe()) {
return Status::NotSupported(
return IOStatus::NotSupported(
"Can't WritableFileWriter::SyncWithoutFlush() because "
"WritableFile::IsSyncThreadSafe() is false");
}
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
Status s = SyncInternal(use_fsync);
IOStatus s = SyncInternal(use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
return s;
}

Status WritableFileWriter::SyncInternal(bool use_fsync) {
Status s;
IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
IOStatus s;
IOSTATS_TIMER_GUARD(fsync_nanos);
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
auto prev_perf_level = GetPerfLevel();
Expand All @@ -268,16 +268,16 @@ Status WritableFileWriter::SyncInternal(bool use_fsync) {
return s;
}

Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
IOSTATS_TIMER_GUARD(range_sync_nanos);
TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes to disk the specified data and makes use of the rate
// limiter if available
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
Status s;
IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
IOStatus s;
assert(!use_direct_io());
const char* src = data;
size_t left = size;
Expand Down Expand Up @@ -352,9 +352,9 @@ void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
// only write on aligned
// offsets.
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
IOStatus WritableFileWriter::WriteDirect() {
assert(use_direct_io());
Status s;
IOStatus s;
const size_t alignment = buf_.Alignment();
assert((next_write_offset_ % alignment) == 0);

Expand Down
27 changes: 14 additions & 13 deletions file/writable_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "test_util/sync_point.h"
Expand All @@ -36,11 +37,11 @@ class WritableFileWriter {
void NotifyOnFileWriteFinish(uint64_t offset, size_t length,
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const Status& status) {
const IOStatus& io_status) {
FileOperationInfo info(file_name_, start_ts, finish_ts);
info.offset = offset;
info.length = length;
info.status = status;
info.status = io_status;

for (auto& listener : listeners_) {
listener->OnFileWriteFinish(info);
Expand Down Expand Up @@ -122,24 +123,24 @@ class WritableFileWriter {

std::string file_name() const { return file_name_; }

Status Append(const Slice& data);
IOStatus Append(const Slice& data);

Status Pad(const size_t pad_bytes);
IOStatus Pad(const size_t pad_bytes);

Status Flush();
IOStatus Flush();

Status Close();
IOStatus Close();

Status Sync(bool use_fsync);
IOStatus Sync(bool use_fsync);

// Sync only the data that was already Flush()ed. Safe to call concurrently
// with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
// returns NotSupported status.
Status SyncWithoutFlush(bool use_fsync);
IOStatus SyncWithoutFlush(bool use_fsync);

uint64_t GetFileSize() const { return filesize_; }

Status InvalidateCache(size_t offset, size_t length) {
IOStatus InvalidateCache(size_t offset, size_t length) {
return writable_file_->InvalidateCache(offset, length);
}

Expand All @@ -161,11 +162,11 @@ class WritableFileWriter {
// Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE
Status WriteDirect();
IOStatus WriteDirect();
#endif // !ROCKSDB_LITE
// Normal write
Status WriteBuffered(const char* data, size_t size);
Status RangeSync(uint64_t offset, uint64_t nbytes);
Status SyncInternal(bool use_fsync);
IOStatus WriteBuffered(const char* data, size_t size);
IOStatus RangeSync(uint64_t offset, uint64_t nbytes);
IOStatus SyncInternal(bool use_fsync);
};
} // namespace ROCKSDB_NAMESPACE

0 comments on commit 19a3b17

Please sign in to comment.