@@ -133,8 +133,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
133133 seed_(0 ),
134134 tmp_batch_(new WriteBatch),
135135 bg_compaction_scheduled_(false ),
136- manual_compaction_(NULL ),
137- consecutive_compaction_errors_(0 ) {
136+ manual_compaction_(NULL ) {
138137 mem_->Ref ();
139138 has_imm_.Release_Store (NULL );
140139
@@ -217,6 +216,12 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
217216}
218217
219218void DBImpl::DeleteObsoleteFiles () {
219+ if (!bg_error_.ok ()) {
220+ // After a background error, we don't know whether a new version may
221+ // or may not have been committed, so we cannot safely garbage collect.
222+ return ;
223+ }
224+
220225 // Make a set of all of the live files
221226 std::set<uint64_t > live = pending_outputs_;
222227 versions_->AddLiveFiles (&live);
@@ -495,7 +500,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
495500 return s;
496501}
497502
498- Status DBImpl::CompactMemTable () {
503+ void DBImpl::CompactMemTable () {
499504 mutex_.AssertHeld ();
500505 assert (imm_ != NULL );
501506
@@ -523,9 +528,9 @@ Status DBImpl::CompactMemTable() {
523528 imm_ = NULL ;
524529 has_imm_.Release_Store (NULL );
525530 DeleteObsoleteFiles ();
531+ } else {
532+ RecordBackgroundError (s);
526533 }
527-
528- return s;
529534}
530535
531536void DBImpl::CompactRange (const Slice* begin, const Slice* end) {
@@ -568,16 +573,18 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
568573 }
569574
570575 MutexLock l (&mutex_);
571- while (!manual.done ) {
572- while (manual_compaction_ != NULL ) {
573- bg_cv_.Wait ();
574- }
575- manual_compaction_ = &manual;
576- MaybeScheduleCompaction ();
577- while (manual_compaction_ == &manual) {
576+ while (!manual.done && !shutting_down_.Acquire_Load () && bg_error_.ok ()) {
577+ if (manual_compaction_ == NULL ) { // Idle
578+ manual_compaction_ = &manual;
579+ MaybeScheduleCompaction ();
580+ } else { // Running either my compaction or another compaction.
578581 bg_cv_.Wait ();
579582 }
580583 }
584+ if (manual_compaction_ == &manual) {
585+ // Cancel my manual compaction since we aborted early for some reason.
586+ manual_compaction_ = NULL ;
587+ }
581588}
582589
583590Status DBImpl::TEST_CompactMemTable () {
@@ -596,12 +603,22 @@ Status DBImpl::TEST_CompactMemTable() {
596603 return s;
597604}
598605
606+ void DBImpl::RecordBackgroundError (const Status& s) {
607+ mutex_.AssertHeld ();
608+ if (bg_error_.ok ()) {
609+ bg_error_ = s;
610+ bg_cv_.SignalAll ();
611+ }
612+ }
613+
599614void DBImpl::MaybeScheduleCompaction () {
600615 mutex_.AssertHeld ();
601616 if (bg_compaction_scheduled_) {
602617 // Already scheduled
603618 } else if (shutting_down_.Acquire_Load ()) {
604619 // DB is being deleted; no more background compactions
620+ } else if (!bg_error_.ok ()) {
621+ // Already got an error; no more changes
605622 } else if (imm_ == NULL &&
606623 manual_compaction_ == NULL &&
607624 !versions_->NeedsCompaction ()) {
@@ -619,30 +636,12 @@ void DBImpl::BGWork(void* db) {
619636void DBImpl::BackgroundCall () {
620637 MutexLock l (&mutex_);
621638 assert (bg_compaction_scheduled_);
622- if (!shutting_down_.Acquire_Load ()) {
623- Status s = BackgroundCompaction ();
624- if (s.ok ()) {
625- // Success
626- consecutive_compaction_errors_ = 0 ;
627- } else if (shutting_down_.Acquire_Load ()) {
628- // Error most likely due to shutdown; do not wait
629- } else {
630- // Wait a little bit before retrying background compaction in
631- // case this is an environmental problem and we do not want to
632- // chew up resources for failed compactions for the duration of
633- // the problem.
634- bg_cv_.SignalAll (); // In case a waiter can proceed despite the error
635- Log (options_.info_log , " Waiting after background compaction error: %s" ,
636- s.ToString ().c_str ());
637- mutex_.Unlock ();
638- ++consecutive_compaction_errors_;
639- int seconds_to_sleep = 1 ;
640- for (int i = 0 ; i < 3 && i < consecutive_compaction_errors_ - 1 ; ++i) {
641- seconds_to_sleep *= 2 ;
642- }
643- env_->SleepForMicroseconds (seconds_to_sleep * 1000000 );
644- mutex_.Lock ();
645- }
639+ if (shutting_down_.Acquire_Load ()) {
640+ // No more background work when shutting down.
641+ } else if (!bg_error_.ok ()) {
642+ // No more background work after a background error.
643+ } else {
644+ BackgroundCompaction ();
646645 }
647646
648647 bg_compaction_scheduled_ = false ;
@@ -653,11 +652,12 @@ void DBImpl::BackgroundCall() {
653652 bg_cv_.SignalAll ();
654653}
655654
656- Status DBImpl::BackgroundCompaction () {
655+ void DBImpl::BackgroundCompaction () {
657656 mutex_.AssertHeld ();
658657
659658 if (imm_ != NULL ) {
660- return CompactMemTable ();
659+ CompactMemTable ();
660+ return ;
661661 }
662662
663663 Compaction* c;
@@ -691,6 +691,9 @@ Status DBImpl::BackgroundCompaction() {
691691 c->edit ()->AddFile (c->level () + 1 , f->number , f->file_size ,
692692 f->smallest , f->largest );
693693 status = versions_->LogAndApply (c->edit (), &mutex_);
694+ if (!status.ok ()) {
695+ RecordBackgroundError (status);
696+ }
694697 VersionSet::LevelSummaryStorage tmp;
695698 Log (options_.info_log , " Moved #%lld to level-%d %lld bytes %s: %s\n " ,
696699 static_cast <unsigned long long >(f->number ),
@@ -701,6 +704,9 @@ Status DBImpl::BackgroundCompaction() {
701704 } else {
702705 CompactionState* compact = new CompactionState (c);
703706 status = DoCompactionWork (compact);
707+ if (!status.ok ()) {
708+ RecordBackgroundError (status);
709+ }
704710 CleanupCompaction (compact);
705711 c->ReleaseInputs ();
706712 DeleteObsoleteFiles ();
@@ -714,9 +720,6 @@ Status DBImpl::BackgroundCompaction() {
714720 } else {
715721 Log (options_.info_log ,
716722 " Compaction error: %s" , status.ToString ().c_str ());
717- if (options_.paranoid_checks && bg_error_.ok ()) {
718- bg_error_ = status;
719- }
720723 }
721724
722725 if (is_manual) {
@@ -732,7 +735,6 @@ Status DBImpl::BackgroundCompaction() {
732735 }
733736 manual_compaction_ = NULL ;
734737 }
735- return status;
736738}
737739
738740void DBImpl::CleanupCompaction (CompactionState* compact) {
@@ -1002,6 +1004,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
10021004 if (status.ok ()) {
10031005 status = InstallCompactionResults (compact);
10041006 }
1007+ if (!status.ok ()) {
1008+ RecordBackgroundError (status);
1009+ }
10051010 VersionSet::LevelSummaryStorage tmp;
10061011 Log (options_.info_log ,
10071012 " compacted to: %s" , versions_->LevelSummary (&tmp));
@@ -1185,13 +1190,23 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
11851190 {
11861191 mutex_.Unlock ();
11871192 status = log_->AddRecord (WriteBatchInternal::Contents (updates));
1193+ bool sync_error = false ;
11881194 if (status.ok () && options.sync ) {
11891195 status = logfile_->Sync ();
1196+ if (!status.ok ()) {
1197+ sync_error = true ;
1198+ }
11901199 }
11911200 if (status.ok ()) {
11921201 status = WriteBatchInternal::InsertInto (updates, mem_);
11931202 }
11941203 mutex_.Lock ();
1204+ if (sync_error) {
1205+ // The state of the log file is indeterminate: the log record we
1206+ // just added may or may not show up when the DB is re-opened.
1207+ // So we force the DB into a mode where all future writes fail.
1208+ RecordBackgroundError (status);
1209+ }
11951210 }
11961211 if (updates == tmp_batch_) tmp_batch_->Clear ();
11971212
0 commit comments