Commit b7fbcef: Add API to limit blast radius of merge operator failure (#11092)
Summary:
Prior to this PR, `FullMergeV2()` can only return `false` to indicate failure, which causes any operation invoking it to fail. During a compaction, such a failure causes the compaction to fail and causes the DB to irreversibly enter read-only mode. Some users asked for a way to allow the merge operator to fail without such widespread damage.

To limit the blast radius of merge operator failures, this PR introduces the `MergeOperationOutput::op_failure_scope` API. When unpopulated (`kDefault`) or set to `kTryMerge`, merge operator failures are handled exactly as before. When set to `kMustMerge`, a merge operator failure still fails operations that must merge (`Get()`, iterators, `MultiGet()`, etc.); however, flushes and compactions can survive the failure by outputting the unmerged input operands instead.

Pull Request resolved: facebook/rocksdb#11092

Reviewed By: siying

Differential Revision: D42525673

Pulled By: ajkr

fbshipit-source-id: 951dc3bf190f86347dccf3381be967565cda52ee
ajkr authored and facebook-github-bot committed Jan 20, 2023
1 parent bde6505 commit b7fbcef
Showing 15 changed files with 350 additions and 57 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
@@ -23,6 +23,7 @@

### Public API Changes
* Substantial changes have been made to the Cache class to support internal development goals. Direct use of Cache class members is discouraged and further breaking modifications are expected in the future. SecondaryCache has some related changes and implementations will need to be updated. (Unlike Cache, SecondaryCache is still intended to support user implementations, and disruptive changes will be avoided.) (#10975)
* Add `MergeOperationOutput::op_failure_scope` for merge operator users to control the blast radius of merge operator failures. Existing merge operator users do not need to make any change to preserve the old behavior.

### Performance Improvements
* Updated xxHash source code, which should improve kXXH3 checksum speed, at least on ARM (#11098).
24 changes: 21 additions & 3 deletions db/compaction/compaction_iterator.cc
@@ -124,6 +124,9 @@ CompactionIterator::CompactionIterator(
timestamp_size_ == full_history_ts_low_->size());
#endif
input_.SetPinnedItersMgr(&pinned_iters_mgr_);
// The default `merge_until_status_` does not need to be checked since it is
// overwritten as soon as `MergeUntil()` is called
merge_until_status_.PermitUncheckedError();
TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
}

@@ -178,6 +181,20 @@ void CompactionIterator::Next() {
ikey_.user_key = current_key_.GetUserKey();
validity_info_.SetValid(ValidContext::kMerge1);
} else {
if (merge_until_status_.IsMergeInProgress()) {
// `Status::MergeInProgress()` tells us that the previous `MergeUntil()`
// produced only merge operands. Those merge operands were accessed and
// written out using `merge_out_iter_`. Since `merge_out_iter_` is
// exhausted at this point, all merge operands have been written out.
//
// Still, there may be a base value (PUT, DELETE, SINGLEDEL, etc.) that
// needs to be written out. Normally, `CompactionIterator` would skip it
// on the basis that it has already output something in the same
// snapshot stripe. To prevent this, we reset `has_current_user_key_` so
// the next iteration does not conclude that the snapshot stripe is
// unchanged.
has_current_user_key_ = false;
}
// We consumed all pinned merge operands, release pinned iterators
pinned_iters_mgr_.ReleasePinnedData();
// MergeHelper moves the iterator to the first record after the merged
@@ -880,14 +897,15 @@ void CompactionIterator::NextFromInput() {
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
-          Status s = merge_helper_->MergeUntil(
+          merge_until_status_ = merge_helper_->MergeUntil(
               &input_, range_del_agg_, prev_snapshot, bottommost_level_,
               allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_,
               prefetch_buffers_.get(), &iter_stats_);
           merge_out_iter_.SeekToFirst();

-          if (!s.ok() && !s.IsMergeInProgress()) {
-            status_ = s;
+          if (!merge_until_status_.ok() &&
+              !merge_until_status_.IsMergeInProgress()) {
+            status_ = merge_until_status_;
return;
} else if (merge_out_iter_.Valid()) {
// NOTE: key, value, and ikey_ refer to old entries.
1 change: 1 addition & 0 deletions db/compaction/compaction_iterator.h
@@ -432,6 +432,7 @@ class CompactionIterator {
bool clear_and_output_next_key_ = false;

MergeOutputIterator merge_out_iter_;
Status merge_until_status_;
// PinnedIteratorsManager used to pin input_ Iterator blocks while reading
// merge operands and then releasing them after consuming them.
PinnedIteratorsManager pinned_iters_mgr_;
10 changes: 8 additions & 2 deletions db/db_iter.cc
@@ -1247,10 +1247,13 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
}

bool DBIter::Merge(const Slice* val, const Slice& user_key) {
+  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+  // since a failure must be propagated regardless of its value.
   Status s = MergeHelper::TimedFullMerge(
       merge_operator_, user_key, val, merge_context_.GetOperands(),
       &saved_value_, logger_, statistics_, clock_, &pinned_value_,
-      /* update_num_ops_stats */ true);
+      /* update_num_ops_stats */ true,
+      /* op_failure_scope */ nullptr);
if (!s.ok()) {
valid_ = false;
status_ = s;
@@ -1265,10 +1268,13 @@ bool DBIter::Merge(const Slice* val, const Slice& user_key) {
}

bool DBIter::MergeEntity(const Slice& entity, const Slice& user_key) {
+  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
+  // since a failure must be propagated regardless of its value.
   Status s = MergeHelper::TimedFullMergeWithEntity(
       merge_operator_, user_key, entity, merge_context_.GetOperands(),
       &saved_value_, logger_, statistics_, clock_,
-      /* update_num_ops_stats */ true);
+      /* update_num_ops_stats */ true,
+      /* op_failure_scope */ nullptr);
if (!s.ok()) {
valid_ = false;
status_ = s;
155 changes: 155 additions & 0 deletions db/db_merge_operator_test.cc
@@ -202,6 +202,161 @@ TEST_F(DBMergeOperatorTest, MergeErrorOnIteration) {
VerifyDBInternal({{"k1", "v1"}, {"k2", "corrupted"}, {"k2", "v2"}});
}

#ifndef ROCKSDB_LITE

TEST_F(DBMergeOperatorTest, MergeOperatorFailsWithMustMerge) {
// This is like a mini-stress test dedicated to `OpFailureScope::kMustMerge`.
// Some or most of it might be deleted upon adding that option to the actual
// stress test.
//
// "k0" and "k2" are stable (uncorrupted) keys before and after a corrupted
// key ("k1"). The outer loop (`i`) varies which write (`j`) to "k1" triggers
// the corruption. Inside that loop there are three cases:
//
// - Case 1: pure `Merge()`s
// - Case 2: `Merge()`s on top of a `Put()`
// - Case 3: `Merge()`s on top of a `Delete()`
//
// For each case we test query results before flush, after flush, and after
// compaction, as well as cleanup after deletion+compaction. The queries
// expect "k0" and "k2" to always be readable. "k1" is expected to be readable
// only by APIs that do not require merging, such as `GetMergeOperands()`.
const int kNumOperands = 3;
Options options;
options.merge_operator.reset(new TestPutOperator());
options.env = env_;
Reopen(options);

for (int i = 0; i < kNumOperands; ++i) {
auto check_query = [&]() {
{
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), "k0", &value));
ASSERT_TRUE(db_->Get(ReadOptions(), "k1", &value).IsCorruption());
ASSERT_OK(db_->Get(ReadOptions(), "k2", &value));
}

{
std::unique_ptr<Iterator> iter;
iter.reset(db_->NewIterator(ReadOptions()));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("k0", iter->key());
iter->Next();
ASSERT_TRUE(iter->status().IsCorruption());

iter->SeekToLast();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("k2", iter->key());
iter->Prev();
ASSERT_TRUE(iter->status().IsCorruption());

iter->Seek("k2");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("k2", iter->key());
}

std::vector<PinnableSlice> values(kNumOperands);
GetMergeOperandsOptions merge_operands_info;
merge_operands_info.expected_max_number_of_operands = kNumOperands;
int num_operands_found = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"k1", values.data(), &merge_operands_info,
&num_operands_found));
ASSERT_EQ(kNumOperands, num_operands_found);
for (int j = 0; j < num_operands_found; ++j) {
if (i == j) {
ASSERT_EQ(values[j], "corrupted_must_merge");
} else {
ASSERT_EQ(values[j], "ok");
}
}
};

ASSERT_OK(Put("k0", "val"));
ASSERT_OK(Put("k2", "val"));

// Case 1
for (int j = 0; j < kNumOperands; ++j) {
if (j == i) {
ASSERT_OK(Merge("k1", "corrupted_must_merge"));
} else {
ASSERT_OK(Merge("k1", "ok"));
}
}
check_query();
ASSERT_OK(Flush());
check_query();
{
CompactRangeOptions cro;
cro.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
}
check_query();

// Case 2
for (int j = 0; j < kNumOperands; ++j) {
Slice val;
if (j == i) {
val = "corrupted_must_merge";
} else {
val = "ok";
}
if (j == 0) {
ASSERT_OK(Put("k1", val));
} else {
ASSERT_OK(Merge("k1", val));
}
}
check_query();
ASSERT_OK(Flush());
check_query();
{
CompactRangeOptions cro;
cro.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
}
check_query();

// Case 3
ASSERT_OK(Delete("k1"));
for (int j = 0; j < kNumOperands; ++j) {
if (i == j) {
ASSERT_OK(Merge("k1", "corrupted_must_merge"));
} else {
ASSERT_OK(Merge("k1", "ok"));
}
}
check_query();
ASSERT_OK(Flush());
check_query();
{
CompactRangeOptions cro;
cro.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
}
check_query();

// Verify obsolete data removal still happens
ASSERT_OK(Delete("k0"));
ASSERT_OK(Delete("k1"));
ASSERT_OK(Delete("k2"));
ASSERT_EQ("NOT_FOUND", Get("k0"));
ASSERT_EQ("NOT_FOUND", Get("k1"));
ASSERT_EQ("NOT_FOUND", Get("k2"));
CompactRangeOptions cro;
cro.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ("", FilesPerLevel());
}
}

#endif // ROCKSDB_LITE

class MergeOperatorPinningTest : public DBMergeOperatorTest,
public testing::WithParamInterface<bool> {
public:
23 changes: 20 additions & 3 deletions db/db_test_util.h
@@ -873,17 +873,34 @@ class FlushCounterListener : public EventListener {
#endif

 // A test merge operator mimics put but also fails if one of merge operands is
-// "corrupted".
+// "corrupted", "corrupted_try_merge", or "corrupted_must_merge".
class TestPutOperator : public MergeOperator {
public:
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
static const std::map<std::string, MergeOperator::OpFailureScope>
bad_operand_to_op_failure_scope = {
{"corrupted", MergeOperator::OpFailureScope::kDefault},
{"corrupted_try_merge", MergeOperator::OpFailureScope::kTryMerge},
{"corrupted_must_merge",
MergeOperator::OpFailureScope::kMustMerge}};
auto check_operand =
[](Slice operand_val,
MergeOperator::OpFailureScope* op_failure_scope) -> bool {
auto iter = bad_operand_to_op_failure_scope.find(operand_val.ToString());
if (iter != bad_operand_to_op_failure_scope.end()) {
*op_failure_scope = iter->second;
return false;
}
return true;
};
     if (merge_in.existing_value != nullptr &&
-        *(merge_in.existing_value) == "corrupted") {
+        !check_operand(*merge_in.existing_value,
+                       &merge_out->op_failure_scope)) {
       return false;
     }
     for (auto value : merge_in.operand_list) {
-      if (value == "corrupted") {
+      if (!check_operand(value, &merge_out->op_failure_scope)) {
return false;
}
}
30 changes: 25 additions & 5 deletions db/memtable.cc
@@ -1067,11 +1067,15 @@ static bool SaveValue(void* arg, const char* entry) {

if (s->value || s->columns) {
std::string result;
+          // `op_failure_scope` (an output parameter) is not provided (set to
+          // nullptr) since a failure must be propagated regardless of its
+          // value.
           *(s->status) = MergeHelper::TimedFullMerge(
               merge_operator, s->key->user_key(), &v,
               merge_context->GetOperands(), &result, s->logger, s->statistics,
               s->clock, /* result_operand */ nullptr,
-              /* update_num_ops_stats */ true);
+              /* update_num_ops_stats */ true,
+              /* op_failure_scope */ nullptr);

if (s->status->ok()) {
if (s->value) {
@@ -1130,18 +1134,26 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
v, value_of_default);
if (s->status->ok()) {
+                // `op_failure_scope` (an output parameter) is not provided (set
+                // to nullptr) since a failure must be propagated regardless of
+                // its value.
                 *(s->status) = MergeHelper::TimedFullMerge(
                     merge_operator, s->key->user_key(), &value_of_default,
                     merge_context->GetOperands(), s->value, s->logger,
                     s->statistics, s->clock, /* result_operand */ nullptr,
-                    /* update_num_ops_stats */ true);
+                    /* update_num_ops_stats */ true,
+                    /* op_failure_scope */ nullptr);
}
} else if (s->columns) {
std::string result;
+            // `op_failure_scope` (an output parameter) is not provided (set to
+            // nullptr) since a failure must be propagated regardless of its
+            // value.
             *(s->status) = MergeHelper::TimedFullMergeWithEntity(
                 merge_operator, s->key->user_key(), v,
                 merge_context->GetOperands(), &result, s->logger, s->statistics,
-                s->clock, /* update_num_ops_stats */ true);
+                s->clock, /* update_num_ops_stats */ true,
+                /* op_failure_scope */ nullptr);

if (s->status->ok()) {
*(s->status) = s->columns->SetWideColumnValue(result);
@@ -1177,11 +1189,15 @@
if (*(s->merge_in_progress)) {
if (s->value || s->columns) {
std::string result;
+          // `op_failure_scope` (an output parameter) is not provided (set to
+          // nullptr) since a failure must be propagated regardless of its
+          // value.
           *(s->status) = MergeHelper::TimedFullMerge(
               merge_operator, s->key->user_key(), nullptr,
               merge_context->GetOperands(), &result, s->logger, s->statistics,
               s->clock, /* result_operand */ nullptr,
-              /* update_num_ops_stats */ true);
+              /* update_num_ops_stats */ true,
+              /* op_failure_scope */ nullptr);

if (s->status->ok()) {
if (s->value) {
@@ -1217,11 +1233,15 @@
merge_context->GetOperandsDirectionBackward())) {
if (s->value || s->columns) {
std::string result;
+            // `op_failure_scope` (an output parameter) is not provided (set to
+            // nullptr) since a failure must be propagated regardless of its
+            // value.
             *(s->status) = MergeHelper::TimedFullMerge(
                 merge_operator, s->key->user_key(), nullptr,
                 merge_context->GetOperands(), &result, s->logger, s->statistics,
                 s->clock, /* result_operand */ nullptr,
-                /* update_num_ops_stats */ true);
+                /* update_num_ops_stats */ true,
+                /* op_failure_scope */ nullptr);

if (s->status->ok()) {
if (s->value) {