Rollback other pending memtable flushes when a flush fails #11865

Closed · wants to merge 7 commits
186 changes: 186 additions & 0 deletions db/db_flush_test.cc
@@ -3193,6 +3193,192 @@ INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,

INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());

TEST_F(DBFlushTest, NonAtomicFlushRollbackPendingFlushes) {
  // Tests the fix for a bug that can happen when atomic_flush=false:
  // Start Flush0 for memtable M0 to SST0
  // Start Flush1 for memtable M1 to SST1
  // Flush1 returns OK, but doesn't install its result to the MANIFEST;
  // it leaves that to whoever flushes M0
  // Flush0 finishes with a retryable IOError
  //   - It rolls back M0, but (incorrectly) not M1
  //   - Deletes SST0 and SST1
  //
  // Auto-recovery will start Flush2 for M0; it does not pick up M1 since it
  // thinks that M1 is already flushed
  // Flush2 writes SST2 and finishes OK, then tries to install SST2 and SST1
  // Opening SST1 fails since it has already been deleted
  //
  // The fix is to let Flush0 also roll back M1.
  Options opts = CurrentOptions();
  opts.atomic_flush = false;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 4;
  env_->SetBackgroundThreads(4, Env::HIGH);
  DestroyAndReopen(opts);
  std::atomic_int flush_count = 0;
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        int c = flush_count.fetch_add(1);
        if (c == 0) {
          Status* s = (Status*)(s_ptr);
          IOStatus io_error = IOStatus::IOError("injected foobar");
          io_error.SetRetryable(true);
          *s = io_error;
          TEST_SYNC_POINT("Let mem1 flush start");
          TEST_SYNC_POINT("Wait for mem1 flush to finish");
        }
      });
  // The first flush needs to wait for the second flush to finish.
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let mem1 flush start", "Mem1 flush starts"},
       {"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_OK(Put(Key(1), "val1"));
  // Trigger bg flush for mem0.
  ASSERT_OK(Put(Key(2), "val2"));
  TEST_SYNC_POINT("Mem1 flush starts");
  // Trigger bg flush for mem1.
  ASSERT_OK(Put(Key(3), "val3"));

  TEST_SYNC_POINT("Wait for error recover");
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}
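The tests in this file rely on RocksDB's SyncPoint harness. LoadDependency takes {predecessor, successor} pairs: a thread reaching TEST_SYNC_POINT(successor) blocks until some thread has passed TEST_SYNC_POINT(predecessor), which is how the test above forces Flush0 to wait for Flush1. A minimal sketch of the mechanism, with illustrative point names that are not from this PR:

  // Sketch only; assumes the test-only SyncPoint API from
  // test_util/sync_point.h and a TEST_SYNC_POINT-enabled build.
  #include <thread>

  void SyncPointDemo() {
    auto* sp = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
    sp->LoadDependency({{"Writer::Done", "Reader::Start"}});
    sp->EnableProcessing();
    std::thread writer(
        [] { TEST_SYNC_POINT("Writer::Done"); });  // unblocks "Reader::Start"
    TEST_SYNC_POINT("Reader::Start");  // waits until "Writer::Done" is passed
    writer.join();
    sp->DisableProcessing();
  }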
Review comment (Contributor): Disable SyncPoint processing and callbacks?
TEST_F(DBFlushTest, AbortNonAtomicFlushWhenBGError) {
  // Tests the fix for a bug that can happen when atomic_flush=false:
  // Start Flush0 for memtable M0 to SST0
  // Start Flush1 for memtable M1 to SST1
  // Flush1 returns OK, but doesn't install its output to the MANIFEST;
  // it leaves that to whoever flushes M0
  // Start Flush2 for memtable M2 to SST2
  // Flush0 finishes with a retryable IOError
  //   - It rolls back M0 AND M1
  //   - Deletes SST0 and SST1
  // Flush2 finishes and does not roll back M2,
  //   - releases the pending file number that keeps SST2 alive
  //   - SST2 gets deleted
  //
  // Then auto-recovery starts and hits an error opening SST2 when trying to
  // install the flush result
  //
  // The fix is to let Flush2 roll back M2 if it finds that there is a
  // background error.
  Options opts = CurrentOptions();
  opts.atomic_flush = false;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 4;
  env_->SetBackgroundThreads(4, Env::HIGH);
  DestroyAndReopen(opts);
  std::atomic_int flush_count = 0;
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        int c = flush_count.fetch_add(1);
        if (c == 0) {
          Status* s = (Status*)(s_ptr);
          IOStatus io_error = IOStatus::IOError("injected foobar");
          io_error.SetRetryable(true);
          *s = io_error;
          TEST_SYNC_POINT("Let mem1 flush start");
          TEST_SYNC_POINT("Wait for mem1 flush to finish");

          TEST_SYNC_POINT("Let mem2 flush start");
          TEST_SYNC_POINT("Wait for mem2 to start writing table");
        }
      });

  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table", [&](void* mems) {
        autovector<MemTable*>* mems_ptr = (autovector<MemTable*>*)mems;
        if ((*mems_ptr)[0]->GetID() == 3) {
          TEST_SYNC_POINT("Mem2 flush starts writing table");
          TEST_SYNC_POINT("Mem2 flush waits until rollback");
        }
      });
  SyncPoint::GetInstance()->LoadDependency(
      {{"Let mem1 flush start", "Mem1 flush starts"},
       {"DBImpl::BGWorkFlush:done", "Wait for mem1 flush to finish"},
       {"Let mem2 flush start", "Mem2 flush starts"},
       {"Mem2 flush starts writing table",
        "Wait for mem2 to start writing table"},
       {"RollbackMemtableFlush", "Mem2 flush waits until rollback"},
       {"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(Key(1), "val1"));
  // Trigger bg flush for mem0.
  ASSERT_OK(Put(Key(2), "val2"));
  TEST_SYNC_POINT("Mem1 flush starts");
  // Trigger bg flush for mem1.
  ASSERT_OK(Put(Key(3), "val3"));

  TEST_SYNC_POINT("Mem2 flush starts");
  ASSERT_OK(Put(Key(4), "val4"));

  TEST_SYNC_POINT("Wait for error recover");
  // The recovery flush writes the 3 memtables together into 1 file.
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}
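The "pending file number" mentioned above refers to DBImpl's pending-outputs protection: while a flush holds its output file number in the pending set, obsolete-file purging will not delete that SST even though the MANIFEST does not reference it yet; once the number is released, an uninstalled SST becomes obsolete and is deleted. A simplified toy sketch of that lifecycle (not the actual DBImpl code, which uses CaptureCurrentFileNumberInPendingOutputs and ReleaseFileNumberFromPendingOutputs):

  #include <cstdint>
  #include <set>

  // Toy model of the pending-outputs idea.
  std::set<uint64_t> pending_outputs;

  void ProtectFlushOutput(uint64_t file_number) {
    // While registered here, the SST with this number survives purges
    // even though it is not yet referenced by the MANIFEST.
    pending_outputs.insert(file_number);
  }

  void ReleaseFlushOutput(uint64_t file_number) {
    // After release, an SST that was never installed becomes obsolete and
    // is deleted by the next purge. That is how SST2 disappears in the
    // scenario above while M2's pending flush result still points at it.
    pending_outputs.erase(file_number);
  }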

TEST_F(DBFlushTest, NonAtomicNormalFlushAbortWhenBGError) {
  Options opts = CurrentOptions();
  opts.atomic_flush = false;
  opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
  opts.max_write_buffer_number = 64;
  opts.max_background_flushes = 1;
  env_->SetBackgroundThreads(2, Env::HIGH);
  DestroyAndReopen(opts);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
  std::atomic_int flush_write_table_count = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
        int c = flush_write_table_count.fetch_add(1);
        if (c == 0) {
          Status* s = (Status*)(s_ptr);
          IOStatus io_error = IOStatus::IOError("injected foobar");
          io_error.SetRetryable(true);
          *s = io_error;
        }
      });

  SyncPoint::GetInstance()->EnableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"RecoverFromRetryableBGIOError:RecoverSuccess",
        "Wait for error recover"}});

  ASSERT_OK(Put(Key(1), "val1"));
  // Trigger bg flush0 for mem0.
  ASSERT_OK(Put(Key(2), "val2"));
  dbfull()->TEST_WaitForFlushMemTable().PermitUncheckedError();

  // Trigger bg flush1 for mem1; it should see the bg error and abort
  // before picking a memtable to flush.
  ASSERT_OK(Put(Key(3), "val3"));

  TEST_SYNC_POINT("Wait for error recover");
  // The recovery flush writes 2 memtables together into 1 file.
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
  // 1 for flush0 and 1 for the recovery flush.
  ASSERT_EQ(2, flush_write_table_count);
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
24 changes: 20 additions & 4 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -284,6 +284,20 @@ Status DBImpl::FlushMemTableToOutputFile(
  // If the log sync failed, we do not need to pick memtable. Otherwise,
  // num_flush_not_started_ needs to be rolled back.
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
+  // Exiting a flush because of a bg error should not set the bg error again.
+  bool skip_set_bg_error = false;
+  if (s.ok() && flush_reason != FlushReason::kErrorRecovery &&
+      flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
+      !error_handler_.GetBGError().ok()) {
+    // Error recovery is in progress, so this flush should not pick memtables;
+    // picking them would exclude them from being picked up by the recovery
+    // flush. This ensures that while a bg error is set, no new flush can
+    // pick up memtables.
+    skip_set_bg_error = true;
+    s = error_handler_.GetBGError();
+    assert(!s.ok());
+  }
+
  if (s.ok()) {
    flush_job.PickMemTable();
    need_cancel = true;
@@ -304,7 +318,8 @@ Status DBImpl::FlushMemTableToOutputFile(
  // is unlocked by the current thread.
  if (s.ok()) {
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
-                     &switched_to_mempurge);
+                     &switched_to_mempurge, &skip_set_bg_error,
+                     &error_handler_);
    need_cancel = false;
  }

@@ -345,7 +360,8 @@ Status DBImpl::FlushMemTableToOutputFile(
    }
  }

-  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
+  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
+      !skip_set_bg_error) {
    if (log_io_s.ok()) {
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
@@ -634,8 +650,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].second.ok() && exec_status[i].first) {
        auto& mems = jobs[i]->GetMemTables();
-        cfds[i]->imm()->RollbackMemtableFlush(mems,
-                                              file_meta[i].fd.GetNumber());
+        cfds[i]->imm()->RollbackMemtableFlush(
+            mems, /*rollback_succeeding_memtables=*/false);
      }
    }
  }
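Taken together, the FlushMemTableToOutputFile changes above implement one rule: a non-recovery flush that observes an existing background error gives up before picking memtables, returns that error, and, via skip_set_bg_error, avoids reporting it to the error handler a second time. A condensed paraphrase of the added logic (a restatement of the hunks above, not additional code):

  bool skip_set_bg_error = false;
  const bool is_recovery_flush =
      flush_reason == FlushReason::kErrorRecovery ||
      flush_reason == FlushReason::kErrorRecoveryRetryFlush;
  if (s.ok() && !is_recovery_flush && !error_handler_.GetBGError().ok()) {
    // Picking memtables now would hide them from the recovery flush.
    skip_set_bg_error = true;
    s = error_handler_.GetBGError();  // surface the existing error instead
  }
  // ... later, the bg error is only (re)recorded when !skip_set_bg_error.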
1 change: 1 addition & 0 deletions db/error_handler.cc
@@ -655,6 +655,7 @@ const Status& ErrorHandler::StartRecoverFromRetryableBGIOError(
  }

  recovery_in_prog_ = true;
+  TEST_SYNC_POINT("StartRecoverFromRetryableBGIOError::in_progress");
  recovery_thread_.reset(
      new port::Thread(&ErrorHandler::RecoverFromRetryableBGIOError, this));

2 changes: 2 additions & 0 deletions db/event_helpers.cc
@@ -240,6 +240,8 @@ void EventHelpers::NotifyOnErrorRecoveryEnd(
      info.new_bg_error.PermitUncheckedError();
    }
    db_mutex->Lock();
+  } else {
+    old_bg_error.PermitUncheckedError();
  }
}
}

Expand Down
34 changes: 25 additions & 9 deletions db/flush_job.cc
@@ -215,7 +215,8 @@ void FlushJob::PickMemTable() {
}

Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
-                    bool* switched_to_mempurge) {
+                    bool* switched_to_mempurge, bool* skipped_since_bg_error,
+                    ErrorHandler* error_handler) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
@@ -303,17 +304,31 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
  }

  if (!s.ok()) {
-    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
+    cfd_->imm()->RollbackMemtableFlush(
+        mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
  } else if (write_manifest_) {
-    TEST_SYNC_POINT("FlushJob::InstallResults");
-    // Replace immutable memtable with the generated Table
-    s = cfd_->imm()->TryInstallMemtableFlushResults(
-        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
-        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
-        log_buffer_, &committed_flush_jobs_info_,
-        !(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted),
+    assert(!db_options_.atomic_flush);
+    if (!db_options_.atomic_flush &&
+        flush_reason_ != FlushReason::kErrorRecovery &&
+        flush_reason_ != FlushReason::kErrorRecoveryRetryFlush &&
+        error_handler && !error_handler->GetBGError().ok()) {
+      cfd_->imm()->RollbackMemtableFlush(
+          mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
+      s = error_handler->GetBGError();
+      if (skipped_since_bg_error) {
+        *skipped_since_bg_error = true;
+      }
+    } else {
+      TEST_SYNC_POINT("FlushJob::InstallResults");
+      // Replace immutable memtable with the generated Table
+      s = cfd_->imm()->TryInstallMemtableFlushResults(
+          cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
+          meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
+          log_buffer_, &committed_flush_jobs_info_,
+          !(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted),
                              but 'false' if mempurge successful: no new min log number
                              or new level 0 file path to write to manifest. */);
+    }
  }

  if (s.ok() && file_meta != nullptr) {
@@ -965,6 +980,7 @@ Status FlushJob::WriteLevel0Table() {
      &table_properties_, write_hint, full_history_ts_low,
      blob_callback_, base_, &num_input_entries,
      &memtable_payload_bytes, &memtable_garbage_bytes);
+  TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s);
  // TODO: Cleanup io_status in BuildTable and table builders
  assert(!s.ok() || io_s.ok());
  io_s.PermitUncheckedError();
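The new sync point passes the BuildTable status by address, which is what lets the tests above inject a retryable IOError. A sketch of the consuming side (the same pattern as in the tests; the callback argument arrives as a bare void*, so the test must know the concrete type):

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FlushJob::WriteLevel0Table:s", [](void* s_ptr) {
        // The callback carries no type information; this particular point
        // is documented (by its use here) to pass a Status*.
        auto* s = static_cast<ROCKSDB_NAMESPACE::Status*>(s_ptr);
        *s = ROCKSDB_NAMESPACE::Status::IOError("injected");
      });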
7 changes: 6 additions & 1 deletion db/flush_job.h
@@ -83,9 +83,14 @@ class FlushJob {
  // Require db_mutex held.
  // Once PickMemTable() is called, either Run() or Cancel() has to be called.
  void PickMemTable();
+  // @param skipped_since_bg_error If not nullptr and if atomic_flush=false,
+  // then it is set to true if flush installation is skipped and the
+  // memtable is rolled back due to an existing background error.
  Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
             FileMetaData* file_meta = nullptr,
-            bool* switched_to_mempurge = nullptr);
+            bool* switched_to_mempurge = nullptr,
+            bool* skipped_since_bg_error = nullptr,
+            ErrorHandler* error_handler = nullptr);
  void Cancel();
  const autovector<MemTable*>& GetMemTables() const { return mems_; }

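A sketch of how a caller consumes the new out-parameter; this mirrors the call site shown in db_impl_compaction_flush.cc above, with the surrounding variables assumed from that context:

  bool skip_set_bg_error = false;
  Status s = flush_job.Run(&logs_with_prep_tracker_, &file_meta,
                           &switched_to_mempurge, &skip_set_bg_error,
                           &error_handler_);
  if (!s.ok() && skip_set_bg_error) {
    // `s` is the pre-existing background error: the flush was skipped and
    // rolled back, so the error must not be fed to the error handler again.
  }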
49 changes: 41 additions & 8 deletions db/memtable_list.cc
@@ -434,21 +434,54 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
}

void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
-                                        uint64_t /*file_number*/) {
+                                        bool rollback_succeeding_memtables) {
+  TEST_SYNC_POINT("RollbackMemtableFlush");
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
  assert(!mems.empty());

-  // If the flush was not successful, then just reset state.
-  // Maybe a succeeding attempt to flush will be successful.
+#ifndef NDEBUG
  for (MemTable* m : mems) {
    assert(m->flush_in_progress_);
    assert(m->file_number_ == 0);
+  }
+#endif
+
+  if (rollback_succeeding_memtables && !mems.empty()) {
+    std::list<MemTable*>& memlist = current_->memlist_;
+    auto it = memlist.rbegin();
+    for (; *it != mems[0] && it != memlist.rend(); ++it) {
+    }
+    // mems should be in memlist
+    assert(*it == mems[0]);
+    if (*it == mems[0]) {
+      ++it;
+    }
+    while (it != memlist.rend()) {
+      MemTable* m = *it;
+      // Only roll back completed flushes, not in-progress ones:
+      // an in-progress flush may still be writing out its SSTs
+      if (m->flush_completed_) {
+        m->flush_in_progress_ = false;
+        m->flush_completed_ = false;
+        m->edit_.Clear();
+        m->file_number_ = 0;
+        num_flush_not_started_++;
+        ++it;
+      } else {
+        break;
+      }
+    }
+  }
-
-    m->flush_in_progress_ = false;
-    m->flush_completed_ = false;
-    m->edit_.Clear();
-    num_flush_not_started_++;
+  for (MemTable* m : mems) {
+    if (m->flush_in_progress_) {
+      assert(m->file_number_ == 0);
+      m->file_number_ = 0;
+      m->flush_in_progress_ = false;
+      m->flush_completed_ = false;
+      m->edit_.Clear();
+      num_flush_not_started_++;
+    }
  }
  imm_flush_needed.store(true, std::memory_order_release);
}
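To see what the new rollback_succeeding_memtables path does, consider memlist_ = [M2, M1, M0] (newest first) with mems = {M0}: the reverse walk starts at the oldest memtable, finds M0, steps past it, and un-marks every newer memtable whose flush completed but was never installed, so a later flush can pick it up again. A self-contained toy version of that walk, using plain structs instead of rocksdb::MemTable and assuming each flush picked a single memtable, as in the tests above:

  #include <list>

  struct Mem {
    bool flush_in_progress = false;
    bool flush_completed = false;
  };

  // memlist holds newest..oldest; `picked` is the memtable of the failed
  // flush.
  void RollbackSucceeding(std::list<Mem*>& memlist, Mem* picked) {
    auto it = memlist.rbegin();  // oldest first
    while (it != memlist.rend() && *it != picked) {
      ++it;
    }
    if (it != memlist.rend()) {
      ++it;  // step past the picked memtable itself
    }
    // Un-mark only *completed* flushes; an in-progress flush is still
    // writing its SST and will roll itself back when it fails or finds
    // the background error.
    for (; it != memlist.rend() && (*it)->flush_completed; ++it) {
      (*it)->flush_completed = false;
      (*it)->flush_in_progress = false;
    }
  }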