Skip to content

Commit

Permalink
Include minimal contextual information in CompactionIterator (#10505)
Browse files Browse the repository at this point in the history
Summary:
The main purpose is to make debugging easier without sacrificing performance.

Instead of using a boolean variable for `CompactionIterator::valid_`, we can extend it to an `uint8_t`,
using the LSB to denote if the compaction iterator is valid and 4 additional bits to denote where
the iterator is set valid inside `NextFromInput()`. Therefore, when the control flow reaches
`PrepareOutput()` and hits assertion there, we can have a better idea of what has gone wrong.

Pull Request resolved: #10505

Test Plan:
make check
```
TEST_TMPDIR=/dev/shm/rocksdb time ./db_bench -compression_type=none -write_buffer_size=1073741824 -benchmarks=fillseq,flush
```
The above command has a 'flush' benchmark which uses `CompactionIterator`.  I haven't observed any CPU regression or drop in throughput or latency increase.

Reviewed By: ltamasi

Differential Revision: D38551615

Pulled By: riversand963

fbshipit-source-id: 1250848fc118bb753d71fa9ff8ba840df999f5e0
  • Loading branch information
riversand963 authored and facebook-github-bot committed Aug 10, 2022
1 parent f060b47 commit fee2c47
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 59 deletions.
91 changes: 43 additions & 48 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ CompactionIterator::CompactionIterator(
compaction_filter_(compaction_filter),
shutting_down_(shutting_down),
manual_compaction_canceled_(manual_compaction_canceled),
bottommost_level_(!compaction_ ? false
: compaction_->bottommost_level() &&
!compaction_->allow_ingest_behind()),
// snapshots_ cannot be nullptr, but we will assert later in the body of
// the constructor.
visible_at_tip_(snapshots_ ? snapshots_->empty() : false),
earliest_snapshot_(!snapshots_ || snapshots_->empty()
? kMaxSequenceNumber
: snapshots_->at(0)),
info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors),
enforce_single_del_contracts_(enforce_single_del_contracts),
Expand All @@ -98,25 +107,10 @@ CompactionIterator::CompactionIterator(
level_(compaction_ == nullptr ? 0 : compaction_->level()),
penultimate_level_cutoff_seqno_(penultimate_level_cutoff_seqno) {
assert(snapshots_ != nullptr);
bottommost_level_ = compaction_ == nullptr
? false
: compaction_->bottommost_level() &&
!compaction_->allow_ingest_behind();

if (compaction_ != nullptr) {
level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
}
if (snapshots_->size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip_ = true;
earliest_snapshot_iter_ = snapshots_->end();
earliest_snapshot_ = kMaxSequenceNumber;
latest_snapshot_ = 0;
} else {
visible_at_tip_ = false;
earliest_snapshot_iter_ = snapshots_->begin();
earliest_snapshot_ = snapshots_->at(0);
latest_snapshot_ = snapshots_->back();
}
#ifndef NDEBUG
// findEarliestVisibleSnapshot assumes this ordering.
for (size_t i = 1; i < snapshots_->size(); ++i) {
Expand Down Expand Up @@ -173,7 +167,7 @@ void CompactionIterator::Next() {
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
validity_info_.SetValid(ValidContext::kMerge1);
} else {
// We consumed all pinned merge operands, release pinned iterators
pinned_iters_mgr_.ReleasePinnedData();
Expand All @@ -191,7 +185,7 @@ void CompactionIterator::Next() {
NextFromInput();
}

if (valid_) {
if (Valid()) {
// Record that we've outputted a record for the current key.
has_outputted_key_ = true;
}
Expand Down Expand Up @@ -237,7 +231,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
if (compaction_ == nullptr) {
status_ =
Status::Corruption("Unexpected blob index outside of compaction");
valid_ = false;
validity_info_.Invalidate();
return false;
}

Expand All @@ -252,7 +246,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Status s = blob_index.DecodeFrom(value_);
if (!s.ok()) {
status_ = s;
valid_ = false;
validity_info_.Invalidate();
return false;
}

Expand All @@ -270,7 +264,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
&bytes_read);
if (!s.ok()) {
status_ = s;
valid_ = false;
validity_info_.Invalidate();
return false;
}

Expand All @@ -294,7 +288,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
// Should not reach here, since FilterV2 should never return kUndetermined.
status_ =
Status::NotSupported("FilterV2() should never return kUndetermined");
valid_ = false;
validity_info_.Invalidate();
return false;
}

Expand Down Expand Up @@ -343,7 +337,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
status_ = Status::NotSupported(
"Only stacked BlobDB's internal compaction filter can return "
"kChangeBlobIndex.");
valid_ = false;
validity_info_.Invalidate();
return false;
}
if (ikey_.type == kTypeValue) {
Expand All @@ -356,7 +350,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) {
status_ = Status::NotSupported(
"CompactionFilter for integrated BlobDB should not return kIOError");
valid_ = false;
validity_info_.Invalidate();
return false;
}
status_ = Status::IOError("Failed to access blob during compaction filter");
Expand All @@ -367,9 +361,9 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,

void CompactionIterator::NextFromInput() {
at_next_ = false;
valid_ = false;
validity_info_.Invalidate();

while (!valid_ && input_.Valid() && !IsPausingManualCompaction() &&
while (!Valid() && input_.Valid() && !IsPausingManualCompaction() &&
!IsShuttingDown()) {
key_ = input_.key();
value_ = input_.value();
Expand All @@ -389,7 +383,7 @@ void CompactionIterator::NextFromInput() {
has_current_user_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
valid_ = true;
validity_info_.SetValid(ValidContext::kParseKeyError);
break;
}
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
Expand Down Expand Up @@ -502,7 +496,7 @@ void CompactionIterator::NextFromInput() {

if (UNLIKELY(!current_key_committed_)) {
assert(snapshot_checker_ != nullptr);
valid_ = true;
validity_info_.SetValid(ValidContext::kCurrentKeyUncommitted);
break;
}

Expand Down Expand Up @@ -545,7 +539,7 @@ void CompactionIterator::NextFromInput() {
}

value_.clear();
valid_ = true;
validity_info_.SetValid(ValidContext::kKeepSDAndClearPut);
clear_and_output_next_key_ = false;
} else if (ikey_.type == kTypeSingleDeletion) {
// We can compact out a SingleDelete if:
Expand Down Expand Up @@ -669,7 +663,7 @@ void CompactionIterator::NextFromInput() {
++iter_stats_.num_single_del_mismatch;
if (enforce_single_del_contracts_) {
ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str());
valid_ = false;
validity_info_.Invalidate();
status_ = Status::Corruption(oss.str());
return;
}
Expand All @@ -678,7 +672,7 @@ void CompactionIterator::NextFromInput() {
// We cannot drop the SingleDelete as timestamp is enabled, and
// timestamp of this key is greater than or equal to
// *full_history_ts_low_. We will output the SingleDelete.
valid_ = true;
validity_info_.SetValid(ValidContext::kKeepTsHistory);
} else if (has_outputted_key_ ||
DefinitelyInSnapshot(ikey_.sequence,
earliest_write_conflict_snapshot_) ||
Expand Down Expand Up @@ -713,7 +707,7 @@ void CompactionIterator::NextFromInput() {
// outputted on the next iteration.)

// Setting valid_ to true will output the current SingleDelete
valid_ = true;
validity_info_.SetValid(ValidContext::kKeepSDForConflictCheck);

// Set up the Put to be outputted in the next iteration.
// (Optimization 3).
Expand All @@ -725,7 +719,7 @@ void CompactionIterator::NextFromInput() {
} else {
// We hit the next snapshot without hitting a put, so the iterator
// returns the single delete.
valid_ = true;
validity_info_.SetValid(ValidContext::kKeepSDForSnapshot);
TEST_SYNC_POINT_CALLBACK(
"CompactionIterator::NextFromInput:SingleDelete:3",
const_cast<Compaction*>(c));
Expand Down Expand Up @@ -758,11 +752,11 @@ void CompactionIterator::NextFromInput() {
assert(bottommost_level_);
} else {
// Output SingleDelete
valid_ = true;
validity_info_.SetValid(ValidContext::kKeepSD);
}
}

if (valid_) {
if (Valid()) {
at_next_ = true;
}
} else if (last_snapshot == current_user_key_snapshot_ ||
Expand Down Expand Up @@ -861,7 +855,7 @@ void CompactionIterator::NextFromInput() {
(ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok()) &&
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
valid_ = true;
validity_info_.SetValid(ValidContext::kKeepDel);
at_next_ = true;
}
} else if (ikey_.type == kTypeMerge) {
Expand Down Expand Up @@ -905,7 +899,7 @@ void CompactionIterator::NextFromInput() {
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
validity_info_.SetValid(ValidContext::kMerge2);
} else {
// all merge operands were filtered out. reset the user key, since the
// batch consumed by the merge operator should not shadow any keys
Expand All @@ -927,7 +921,7 @@ void CompactionIterator::NextFromInput() {
++iter_stats_.num_record_drop_range_del;
AdvanceInputIter();
} else {
valid_ = true;
validity_info_.SetValid(ValidContext::kNewUserKey);
}
}

Expand All @@ -936,7 +930,7 @@ void CompactionIterator::NextFromInput() {
}
}

if (!valid_ && IsShuttingDown()) {
if (!Valid() && IsShuttingDown()) {
status_ = Status::ShutdownInProgress();
}

Expand All @@ -955,7 +949,7 @@ bool CompactionIterator::ExtractLargeValueIfNeededImpl() {

if (!s.ok()) {
status_ = s;
valid_ = false;
validity_info_.Invalidate();

return false;
}
Expand Down Expand Up @@ -1000,7 +994,7 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {

if (!s.ok()) {
status_ = s;
valid_ = false;
validity_info_.Invalidate();

return;
}
Expand All @@ -1026,7 +1020,7 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {

if (!s.ok()) {
status_ = s;
valid_ = false;
validity_info_.Invalidate();

return;
}
Expand Down Expand Up @@ -1059,14 +1053,14 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {
if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
status_ =
Status::Corruption("Corrupted blob reference encountered during GC");
valid_ = false;
validity_info_.Invalidate();

return;
}

if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
status_ = Status::IOError("Could not relocate blob during GC");
valid_ = false;
validity_info_.Invalidate();

return;
}
Expand Down Expand Up @@ -1126,7 +1120,7 @@ void CompactionIterator::DecideOutputLevel() {
}

void CompactionIterator::PrepareOutput() {
if (valid_) {
if (Valid()) {
if (ikey_.type == kTypeValue) {
ExtractLargeValueIfNeeded();
} else if (ikey_.type == kTypeBlobIndex) {
Expand All @@ -1148,7 +1142,7 @@ void CompactionIterator::PrepareOutput() {
//
// Can we do the same for levels above bottom level as long as
// KeyNotExistsBeyondOutputLevel() return true?
if (valid_ && compaction_ != nullptr &&
if (Valid() && compaction_ != nullptr &&
!compaction_->allow_ingest_behind() && bottommost_level_ &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_ &&
Expand All @@ -1162,13 +1156,14 @@ void CompactionIterator::PrepareOutput() {
"earliest_snapshot %" PRIu64
", earliest_write_conflict_snapshot %" PRIu64
" job_snapshot %" PRIu64
". timestamp_size: %d full_history_ts_low_ %s",
". timestamp_size: %d full_history_ts_low_ %s. validity %x",
ikey_.DebugString(allow_data_in_errors_, true).c_str(),
earliest_snapshot_, earliest_write_conflict_snapshot_,
job_snapshot_, static_cast<int>(timestamp_size_),
full_history_ts_low_ != nullptr
? Slice(*full_history_ts_low_).ToString(true).c_str()
: "null");
: "null",
validity_info_.rep);
assert(false);
}
ikey_.sequence = 0;
Expand Down
44 changes: 33 additions & 11 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class CompactionIterator {
const Slice& value() const { return value_; }
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
bool Valid() const { return valid_; }
inline bool Valid() const { return validity_info_.IsValid(); }
const Slice& user_key() const { return current_user_key_; }
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
Expand Down Expand Up @@ -332,29 +332,26 @@ class CompactionIterator {
// earliest visible snapshot of an older value.
// See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
std::unordered_set<SequenceNumber> released_snapshots_;
std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
const SequenceNumber earliest_write_conflict_snapshot_;
const SequenceNumber job_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
SystemClock* clock_;
bool report_detailed_time_;
bool expect_valid_internal_key_;
const bool report_detailed_time_;
const bool expect_valid_internal_key_;
CompactionRangeDelAggregator* range_del_agg_;
BlobFileBuilder* blob_file_builder_;
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>& manual_compaction_canceled_;
bool bottommost_level_;
bool valid_ = false;
bool visible_at_tip_;
SequenceNumber earliest_snapshot_;
SequenceNumber latest_snapshot_;
const bool bottommost_level_;
const bool visible_at_tip_;
const SequenceNumber earliest_snapshot_;

std::shared_ptr<Logger> info_log_;

bool allow_data_in_errors_;
const bool allow_data_in_errors_;

const bool enforce_single_del_contracts_;

Expand All @@ -370,8 +367,33 @@ class CompactionIterator {

// State
//
enum ValidContext : uint8_t {
kMerge1 = 0,
kMerge2 = 1,
kParseKeyError = 2,
kCurrentKeyUncommitted = 3,
kKeepSDAndClearPut = 4,
kKeepTsHistory = 5,
kKeepSDForConflictCheck = 6,
kKeepSDForSnapshot = 7,
kKeepSD = 8,
kKeepDel = 9,
kNewUserKey = 10,
};

struct ValidityInfo {
inline bool IsValid() const { return rep & 1; }
ValidContext GetContext() const {
return static_cast<ValidContext>(rep >> 1);
}
inline void SetValid(uint8_t ctx) { rep = (ctx << 1) | 1; }
inline void Invalidate() { rep = 0; }

uint8_t rep{0};
} validity_info_;

// Points to a copy of the current compaction iterator output (current_key_)
// if valid_.
// if valid.
Slice key_;
// Points to the value in the underlying iterator that corresponds to the
// current output.
Expand Down

0 comments on commit fee2c47

Please sign in to comment.