Skip to content

Commit

Permalink
Quarantine files in a limbo state after a manifest error (#12030)
Browse files Browse the repository at this point in the history
Summary:
Part of the procedures to handle manifest IO error is to disable file deletion in case some files in limbo state get deleted prematurely. This is not ideal because: 1) not all the VersionEdits whose commit encounter such an error contain updates for files, disabling file deletion sometimes are not necessary. 2) `EnableFileDeletion` has a force mode that could make other threads accidentally disrupt this procedure in recovery.  3) Disabling file deletion as a whole is also not as efficient as more precisely tracking impacted files from being prematurely deleted.  This PR replaces this mechanism with tracking such files and quarantine them from being deleted in `ErrorHandler`.

These are the types of files being actively tracked in quarantine in this PR:
1) new table files and blob files from a background job
2) old manifest file whose immediately following new manifest file's CURRENT file creation gets into unclear state. Current handling is not sufficient to make sure the old manifest file is kept in case it's needed.

Note that WAL logs are not part of the quarantine because `min_log_number_to_keep` is a safe mechanism and it's only updated after successful manifest commits so it can prevent this premature deletion issue from happening.

We track these files' file numbers because they share the same file number space.

Pull Request resolved: #12030

Test Plan: Modified existing unit tests

Reviewed By: ajkr

Differential Revision: D51036774

Pulled By: jowlyzhang

fbshipit-source-id: 84ef26271fbbc888ef70da5c40fe843bd7038716
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Nov 11, 2023
1 parent 0ffc0c7 commit 509947c
Show file tree
Hide file tree
Showing 22 changed files with 201 additions and 79 deletions.
8 changes: 5 additions & 3 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ class CompactionJobTestBase : public testing::Test {
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "",
/*daily_offpeak_time_utc*/ "")),
/*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"",
/*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr)),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_),
Expand Down Expand Up @@ -545,7 +546,8 @@ class CompactionJobTestBase : public testing::Test {
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ ""));
/*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"",
/*error_handler=*/nullptr));
compaction_job_stats_.Reset();
ASSERT_OK(SetIdentityFile(env_, dbname_));

Expand Down
7 changes: 7 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4415,6 +4415,8 @@ TEST_F(DBBasicTest, ManifestWriteFailure) {
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.env = env_;
options.enable_blob_files = true;
options.blob_file_size = 0;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
Expand All @@ -4435,6 +4437,11 @@ TEST_F(DBBasicTest, ManifestWriteFailure) {
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
// The IO error was a mocked one from the `AfterSyncManifest` callback. The
// Flush's VersionEdit actually made it into the Manifest. So these keys can
// be read back. Read them to check all live sst files and blob files.
ASSERT_EQ("bar", Get("foo"));
ASSERT_EQ("value", Get("key"));
}

TEST_F(DBBasicTest, DestroyDefaultCfHandle) {
Expand Down
7 changes: 3 additions & 4 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
versions_.reset(new VersionSet(
dbname_, &immutable_db_options_, file_options_, table_cache_.get(),
write_buffer_manager_, &write_controller_, &block_cache_tracer_,
io_tracer_, db_id_, db_session_id_, options.daily_offpeak_time_utc));
io_tracer_, db_id_, db_session_id_, options.daily_offpeak_time_utc,
&error_handler_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

Expand Down Expand Up @@ -359,10 +360,8 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
if (io_s.IsIOError()) {
// If resuming from IOError resulted from MANIFEST write, then assert
// that we must have already set the MANIFEST writer to nullptr during
// clean-up phase MANIFEST writing. We must have also disabled file
// deletions.
// clean-up phase MANIFEST writing.
assert(!versions_->descriptor_log_);
assert(!IsFileDeletionsEnabled());
// Since we are trying to recover from MANIFEST write error, we need to
// switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
// Therefore, force writing a dummy version edit because we do not know
Expand Down
5 changes: 1 addition & 4 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,7 @@ class DBImpl : public DB {
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
void TEST_WaitForPeriodicTaskRun(std::function<void()> callback) const;
SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const;
const autovector<uint64_t>& TEST_GetFilesToQuarantine() const;
size_t TEST_EstimateInMemoryStatsHistorySize() const;

uint64_t TEST_GetCurrentLogNumber() const {
Expand Down Expand Up @@ -2380,10 +2381,6 @@ class DBImpl : public DB {

Status DisableFileDeletionsWithLock();

// Safely decrease `disable_delete_obsolete_files_` by one while holding lock
// and return its remaning value.
int EnableFileDeletionsWithLock();

Status IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
std::string ts_low);

Expand Down
4 changes: 4 additions & 0 deletions db/db_impl/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ SeqnoToTimeMapping DBImpl::TEST_GetSeqnoToTimeMapping() const {
return seqno_to_time_mapping_;
}

const autovector<uint64_t>& DBImpl::TEST_GetFilesToQuarantine() const {
InstrumentedMutexLock l(&mutex_);
return error_handler_.GetFilesToQuarantine();
}

size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
InstrumentedMutexLock l(&const_cast<DBImpl*>(this)->stats_history_mutex_);
Expand Down
15 changes: 7 additions & 8 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,6 @@ Status DBImpl::EnableFileDeletions(bool force) {
return Status::OK();
}

int DBImpl::EnableFileDeletionsWithLock() {
mutex_.AssertHeld();
// In case others have called EnableFileDeletions(true /* force */) in between
disable_delete_obsolete_files_ =
std::max(0, disable_delete_obsolete_files_ - 1);
return disable_delete_obsolete_files_;
}

bool DBImpl::IsFileDeletionsEnabled() const {
return 0 == disable_delete_obsolete_files_;
}
Expand Down Expand Up @@ -154,6 +146,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// mutex_ cannot be released. Otherwise, we might see no min_pending_output
// here but later find newer generated unfinalized files while scanning.
job_context->min_pending_output = MinObsoleteSstNumberToKeep();
job_context->files_to_quarantine = error_handler_.GetFilesToQuarantine();

// Get obsolete files. This function will also update the list of
// pending files in VersionSet().
Expand Down Expand Up @@ -427,6 +420,8 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
state.blob_live.end());
std::unordered_set<uint64_t> log_recycle_files_set(
state.log_recycle_files.begin(), state.log_recycle_files.end());
std::unordered_set<uint64_t> quarantine_files_set(
state.files_to_quarantine.begin(), state.files_to_quarantine.end());

auto candidate_files = state.full_scan_candidate_files;
candidate_files.reserve(
Expand Down Expand Up @@ -530,6 +525,10 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
continue;
}

if (quarantine_files_set.find(number) != quarantine_files_set.end()) {
continue;
}

bool keep = true;
switch (type) {
case kWalFile:
Expand Down
5 changes: 3 additions & 2 deletions db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1541,8 +1541,9 @@ class RecoveryTestHelper {
test->dbname_, &db_options, file_options, table_cache.get(),
&write_buffer_manager, &write_controller,
/*block_cache_tracer=*/nullptr,
/*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "",
options.daily_offpeak_time_utc));
/*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"",
options.daily_offpeak_time_utc,
/*error_handler=*/nullptr));

wal_manager.reset(
new WalManager(db_options, file_options, /*io_tracer=*/nullptr));
Expand Down
47 changes: 26 additions & 21 deletions db/error_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,6 @@ const Status& ErrorHandler::SetBGError(const Status& bg_status,
ROCKS_LOG_WARN(db_options_.info_log, "Background IO error %s",
bg_io_err.ToString().c_str());

if (!recovery_disabled_file_deletion_ &&
(BackgroundErrorReason::kManifestWrite == reason ||
BackgroundErrorReason::kManifestWriteNoWAL == reason)) {
// Always returns ok
ROCKS_LOG_INFO(db_options_.info_log, "Disabling File Deletions");
db_->DisableFileDeletionsWithLock().PermitUncheckedError();
recovery_disabled_file_deletion_ = true;
}

Status new_bg_io_err = bg_io_err;
DBRecoverContext context;
if (bg_io_err.GetScope() != IOStatus::IOErrorScope::kIOErrorScopeFile &&
Expand Down Expand Up @@ -505,6 +496,31 @@ const Status& ErrorHandler::SetBGError(const Status& bg_status,
}
}

void ErrorHandler::AddFilesToQuarantine(
autovector<const autovector<uint64_t>*> files_to_quarantine) {
db_mutex_->AssertHeld();
std::ostringstream quarantine_files_oss;
bool is_first_one = true;
for (const auto* files : files_to_quarantine) {
assert(files);
for (uint64_t file_number : *files) {
files_to_quarantine_.push_back(file_number);
quarantine_files_oss << (is_first_one ? "" : ", ") << file_number;
is_first_one = false;
}
}
ROCKS_LOG_INFO(db_options_.info_log,
"ErrorHandler: added file numbers %s to quarantine.\n",
quarantine_files_oss.str().c_str());
}

void ErrorHandler::ClearFilesToQuarantine() {
db_mutex_->AssertHeld();
files_to_quarantine_.clear();
ROCKS_LOG_INFO(db_options_.info_log,
"ErrorHandler: cleared files in quarantine.\n");
}

Status ErrorHandler::OverrideNoSpaceError(const Status& bg_error,
bool* auto_recovery) {
if (bg_error.severity() >= Status::Severity::kFatalError) {
Expand Down Expand Up @@ -552,6 +568,7 @@ Status ErrorHandler::ClearBGError() {

// Signal that recovery succeeded
if (recovery_error_.ok()) {
assert(files_to_quarantine_.empty());
Status old_bg_error = bg_error_;
// old_bg_error is only for notifying listeners, so may not be checked
old_bg_error.PermitUncheckedError();
Expand All @@ -563,18 +580,6 @@ Status ErrorHandler::ClearBGError() {
recovery_error_.PermitUncheckedError();
recovery_in_prog_ = false;
soft_error_no_bg_work_ = false;
if (recovery_disabled_file_deletion_) {
recovery_disabled_file_deletion_ = false;
int remain_counter = db_->EnableFileDeletionsWithLock();
if (remain_counter == 0) {
ROCKS_LOG_INFO(db_options_.info_log, "File Deletions Enabled");
} else {
ROCKS_LOG_WARN(
db_options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
remain_counter);
}
}
EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, old_bg_error,
bg_error_, db_mutex_);
}
Expand Down
25 changes: 20 additions & 5 deletions db/error_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
// (found in the LICENSE.Apache file in the root directory).
#pragma once

#include <sstream>

#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
#include "rocksdb/io_status.h"
#include "rocksdb/listener.h"
#include "rocksdb/status.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -42,8 +45,7 @@ class ErrorHandler {
recovery_in_prog_(false),
soft_error_no_bg_work_(false),
is_db_stopped_(false),
bg_error_stats_(db_options.statistics),
recovery_disabled_file_deletion_(false) {
bg_error_stats_(db_options.statistics) {
// Clear the checked flag for uninitialized errors
bg_error_.PermitUncheckedError();
recovery_error_.PermitUncheckedError();
Expand Down Expand Up @@ -81,6 +83,16 @@ class ErrorHandler {

void EndAutoRecovery();

void AddFilesToQuarantine(
autovector<const autovector<uint64_t>*> files_to_quarantine);

const autovector<uint64_t>& GetFilesToQuarantine() const {
db_mutex_->AssertHeld();
return files_to_quarantine_;
}

void ClearFilesToQuarantine();

private:
DBImpl* db_;
const ImmutableDBOptions& db_options_;
Expand Down Expand Up @@ -109,9 +121,12 @@ class ErrorHandler {
// The pointer of DB statistics.
std::shared_ptr<Statistics> bg_error_stats_;

// Tracks whether the recovery has disabled file deletion. This boolean flag
// is updated while holding db mutex.
bool recovery_disabled_file_deletion_;
// During recovery from manifest IO errors, files whose VersionEdits entries
// could be in an ambiguous state are quarantined and file deletion refrain
// from deleting them. Successful recovery will clear this vector. Files are
// added to this vector while DB mutex was locked, this data structure is
// unsorted.
autovector<uint64_t> files_to_quarantine_;

const Status& HandleKnownErrors(const Status& bg_err,
BackgroundErrorReason reason);
Expand Down
Loading

0 comments on commit 509947c

Please sign in to comment.