diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 102a3db59a9..6b0db5e066a 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -80,6 +80,15 @@ CompactionIterator::CompactionIterator( compaction_filter_(compaction_filter), shutting_down_(shutting_down), manual_compaction_canceled_(manual_compaction_canceled), + bottommost_level_(!compaction_ ? false + : compaction_->bottommost_level() && + !compaction_->allow_ingest_behind()), + // snapshots_ cannot be nullptr, but we will assert later in the body of + // the constructor. + visible_at_tip_(snapshots_ ? snapshots_->empty() : false), + earliest_snapshot_(!snapshots_ || snapshots_->empty() + ? kMaxSequenceNumber + : snapshots_->at(0)), info_log_(info_log), allow_data_in_errors_(allow_data_in_errors), enforce_single_del_contracts_(enforce_single_del_contracts), @@ -98,25 +107,10 @@ CompactionIterator::CompactionIterator( level_(compaction_ == nullptr ? 0 : compaction_->level()), penultimate_level_cutoff_seqno_(penultimate_level_cutoff_seqno) { assert(snapshots_ != nullptr); - bottommost_level_ = compaction_ == nullptr - ? false - : compaction_->bottommost_level() && - !compaction_->allow_ingest_behind(); + if (compaction_ != nullptr) { level_ptrs_ = std::vector(compaction_->number_levels(), 0); } - if (snapshots_->size() == 0) { - // optimize for fast path if there are no snapshots - visible_at_tip_ = true; - earliest_snapshot_iter_ = snapshots_->end(); - earliest_snapshot_ = kMaxSequenceNumber; - latest_snapshot_ = 0; - } else { - visible_at_tip_ = false; - earliest_snapshot_iter_ = snapshots_->begin(); - earliest_snapshot_ = snapshots_->at(0); - latest_snapshot_ = snapshots_->back(); - } #ifndef NDEBUG // findEarliestVisibleSnapshot assumes this ordering. for (size_t i = 1; i < snapshots_->size(); ++i) { @@ -173,7 +167,7 @@ void CompactionIterator::Next() { current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); key_ = current_key_.GetInternalKey(); ikey_.user_key = current_key_.GetUserKey(); - valid_ = true; + validity_info_.SetValid(ValidContext::kMerge1); } else { // We consumed all pinned merge operands, release pinned iterators pinned_iters_mgr_.ReleasePinnedData(); @@ -191,7 +185,7 @@ void CompactionIterator::Next() { NextFromInput(); } - if (valid_) { + if (Valid()) { // Record that we've outputted a record for the current key. has_outputted_key_ = true; } @@ -237,7 +231,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, if (compaction_ == nullptr) { status_ = Status::Corruption("Unexpected blob index outside of compaction"); - valid_ = false; + validity_info_.Invalidate(); return false; } @@ -252,7 +246,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, Status s = blob_index.DecodeFrom(value_); if (!s.ok()) { status_ = s; - valid_ = false; + validity_info_.Invalidate(); return false; } @@ -270,7 +264,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, &bytes_read); if (!s.ok()) { status_ = s; - valid_ = false; + validity_info_.Invalidate(); return false; } @@ -294,7 +288,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, // Should not reach here, since FilterV2 should never return kUndetermined. status_ = Status::NotSupported("FilterV2() should never return kUndetermined"); - valid_ = false; + validity_info_.Invalidate(); return false; } @@ -343,7 +337,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, status_ = Status::NotSupported( "Only stacked BlobDB's internal compaction filter can return " "kChangeBlobIndex."); - valid_ = false; + validity_info_.Invalidate(); return false; } if (ikey_.type == kTypeValue) { @@ -356,7 +350,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { status_ = Status::NotSupported( "CompactionFilter for integrated BlobDB should not return kIOError"); - valid_ = false; + validity_info_.Invalidate(); return false; } status_ = Status::IOError("Failed to access blob during compaction filter"); @@ -367,9 +361,9 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, void CompactionIterator::NextFromInput() { at_next_ = false; - valid_ = false; + validity_info_.Invalidate(); - while (!valid_ && input_.Valid() && !IsPausingManualCompaction() && + while (!Valid() && input_.Valid() && !IsPausingManualCompaction() && !IsShuttingDown()) { key_ = input_.key(); value_ = input_.value(); @@ -389,7 +383,7 @@ void CompactionIterator::NextFromInput() { has_current_user_key_ = false; current_user_key_sequence_ = kMaxSequenceNumber; current_user_key_snapshot_ = 0; - valid_ = true; + validity_info_.SetValid(ValidContext::kParseKeyError); break; } TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); @@ -502,7 +496,7 @@ void CompactionIterator::NextFromInput() { if (UNLIKELY(!current_key_committed_)) { assert(snapshot_checker_ != nullptr); - valid_ = true; + validity_info_.SetValid(ValidContext::kCurrentKeyUncommitted); break; } @@ -545,7 +539,7 @@ void CompactionIterator::NextFromInput() { } value_.clear(); - valid_ = true; + validity_info_.SetValid(ValidContext::kKeepSDAndClearPut); clear_and_output_next_key_ = false; } else if (ikey_.type == kTypeSingleDeletion) { // We can compact out a SingleDelete if: @@ -669,7 +663,7 @@ void CompactionIterator::NextFromInput() { ++iter_stats_.num_single_del_mismatch; if (enforce_single_del_contracts_) { ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str()); - valid_ = false; + validity_info_.Invalidate(); status_ = Status::Corruption(oss.str()); return; } @@ -678,7 +672,7 @@ void CompactionIterator::NextFromInput() { // We cannot drop the SingleDelete as timestamp is enabled, and // timestamp of this key is greater than or equal to // *full_history_ts_low_. We will output the SingleDelete. - valid_ = true; + validity_info_.SetValid(ValidContext::kKeepTsHistory); } else if (has_outputted_key_ || DefinitelyInSnapshot(ikey_.sequence, earliest_write_conflict_snapshot_) || @@ -713,7 +707,7 @@ void CompactionIterator::NextFromInput() { // outputted on the next iteration.) // Setting valid_ to true will output the current SingleDelete - valid_ = true; + validity_info_.SetValid(ValidContext::kKeepSDForConflictCheck); // Set up the Put to be outputted in the next iteration. // (Optimization 3). @@ -725,7 +719,7 @@ void CompactionIterator::NextFromInput() { } else { // We hit the next snapshot without hitting a put, so the iterator // returns the single delete. - valid_ = true; + validity_info_.SetValid(ValidContext::kKeepSDForSnapshot); TEST_SYNC_POINT_CALLBACK( "CompactionIterator::NextFromInput:SingleDelete:3", const_cast(c)); @@ -758,11 +752,11 @@ void CompactionIterator::NextFromInput() { assert(bottommost_level_); } else { // Output SingleDelete - valid_ = true; + validity_info_.SetValid(ValidContext::kKeepSD); } } - if (valid_) { + if (Valid()) { at_next_ = true; } } else if (last_snapshot == current_user_key_snapshot_ || @@ -861,7 +855,7 @@ void CompactionIterator::NextFromInput() { (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) .ok()) && cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { - valid_ = true; + validity_info_.SetValid(ValidContext::kKeepDel); at_next_ = true; } } else if (ikey_.type == kTypeMerge) { @@ -905,7 +899,7 @@ void CompactionIterator::NextFromInput() { current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); key_ = current_key_.GetInternalKey(); ikey_.user_key = current_key_.GetUserKey(); - valid_ = true; + validity_info_.SetValid(ValidContext::kMerge2); } else { // all merge operands were filtered out. reset the user key, since the // batch consumed by the merge operator should not shadow any keys @@ -927,7 +921,7 @@ void CompactionIterator::NextFromInput() { ++iter_stats_.num_record_drop_range_del; AdvanceInputIter(); } else { - valid_ = true; + validity_info_.SetValid(ValidContext::kNewUserKey); } } @@ -936,7 +930,7 @@ void CompactionIterator::NextFromInput() { } } - if (!valid_ && IsShuttingDown()) { + if (!Valid() && IsShuttingDown()) { status_ = Status::ShutdownInProgress(); } @@ -955,7 +949,7 @@ bool CompactionIterator::ExtractLargeValueIfNeededImpl() { if (!s.ok()) { status_ = s; - valid_ = false; + validity_info_.Invalidate(); return false; } @@ -1000,7 +994,7 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { if (!s.ok()) { status_ = s; - valid_ = false; + validity_info_.Invalidate(); return; } @@ -1026,7 +1020,7 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { if (!s.ok()) { status_ = s; - valid_ = false; + validity_info_.Invalidate(); return; } @@ -1059,14 +1053,14 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { status_ = Status::Corruption("Corrupted blob reference encountered during GC"); - valid_ = false; + validity_info_.Invalidate(); return; } if (blob_decision == CompactionFilter::BlobDecision::kIOError) { status_ = Status::IOError("Could not relocate blob during GC"); - valid_ = false; + validity_info_.Invalidate(); return; } @@ -1126,7 +1120,7 @@ void CompactionIterator::DecideOutputLevel() { } void CompactionIterator::PrepareOutput() { - if (valid_) { + if (Valid()) { if (ikey_.type == kTypeValue) { ExtractLargeValueIfNeeded(); } else if (ikey_.type == kTypeBlobIndex) { @@ -1148,7 +1142,7 @@ void CompactionIterator::PrepareOutput() { // // Can we do the same for levels above bottom level as long as // KeyNotExistsBeyondOutputLevel() return true? - if (valid_ && compaction_ != nullptr && + if (Valid() && compaction_ != nullptr && !compaction_->allow_ingest_behind() && bottommost_level_ && DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && ikey_.type != kTypeMerge && current_key_committed_ && @@ -1162,13 +1156,14 @@ void CompactionIterator::PrepareOutput() { "earliest_snapshot %" PRIu64 ", earliest_write_conflict_snapshot %" PRIu64 " job_snapshot %" PRIu64 - ". timestamp_size: %d full_history_ts_low_ %s", + ". timestamp_size: %d full_history_ts_low_ %s. validity %x", ikey_.DebugString(allow_data_in_errors_, true).c_str(), earliest_snapshot_, earliest_write_conflict_snapshot_, job_snapshot_, static_cast(timestamp_size_), full_history_ts_low_ != nullptr ? Slice(*full_history_ts_low_).ToString(true).c_str() - : "null"); + : "null", + validity_info_.rep); assert(false); } ikey_.sequence = 0; diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index a94b0a8f3a5..c337f76e03a 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -235,7 +235,7 @@ class CompactionIterator { const Slice& value() const { return value_; } const Status& status() const { return status_; } const ParsedInternalKey& ikey() const { return ikey_; } - bool Valid() const { return valid_; } + inline bool Valid() const { return validity_info_.IsValid(); } const Slice& user_key() const { return current_user_key_; } const CompactionIterationStats& iter_stats() const { return iter_stats_; } uint64_t num_input_entry_scanned() const { return input_.num_itered(); } @@ -332,29 +332,26 @@ class CompactionIterator { // earliest visible snapshot of an older value. // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3. std::unordered_set released_snapshots_; - std::vector::const_iterator earliest_snapshot_iter_; const SequenceNumber earliest_write_conflict_snapshot_; const SequenceNumber job_snapshot_; const SnapshotChecker* const snapshot_checker_; Env* env_; SystemClock* clock_; - bool report_detailed_time_; - bool expect_valid_internal_key_; + const bool report_detailed_time_; + const bool expect_valid_internal_key_; CompactionRangeDelAggregator* range_del_agg_; BlobFileBuilder* blob_file_builder_; std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; const std::atomic& manual_compaction_canceled_; - bool bottommost_level_; - bool valid_ = false; - bool visible_at_tip_; - SequenceNumber earliest_snapshot_; - SequenceNumber latest_snapshot_; + const bool bottommost_level_; + const bool visible_at_tip_; + const SequenceNumber earliest_snapshot_; std::shared_ptr info_log_; - bool allow_data_in_errors_; + const bool allow_data_in_errors_; const bool enforce_single_del_contracts_; @@ -370,8 +367,33 @@ class CompactionIterator { // State // + enum ValidContext : uint8_t { + kMerge1 = 0, + kMerge2 = 1, + kParseKeyError = 2, + kCurrentKeyUncommitted = 3, + kKeepSDAndClearPut = 4, + kKeepTsHistory = 5, + kKeepSDForConflictCheck = 6, + kKeepSDForSnapshot = 7, + kKeepSD = 8, + kKeepDel = 9, + kNewUserKey = 10, + }; + + struct ValidityInfo { + inline bool IsValid() const { return rep & 1; } + ValidContext GetContext() const { + return static_cast(rep >> 1); + } + inline void SetValid(uint8_t ctx) { rep = (ctx << 1) | 1; } + inline void Invalidate() { rep = 0; } + + uint8_t rep{0}; + } validity_info_; + // Points to a copy of the current compaction iterator output (current_key_) - // if valid_. + // if valid. Slice key_; // Points to the value in the underlying iterator that corresponds to the // current output.