Compaction filter on merge operands
Summary:
Since Andres' internship is over, I took over https://reviews.facebook.net/D42555 and rebased and simplified it a bit.

The behavior in this diff is a bit simpler than in D42555:
* only merge operands are passed through FilterMergeOperand(). If the filter function returns true, the merge operand is ignored
* the compaction filter is *not* called on: 1) results of merge operations and 2) base values that are getting merged with merge operands (the second case was also true in the previous diff)

Do we also need a compaction filter to get called on merge results?
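
For illustration, here is a minimal sketch of a user-defined filter that drops some merge operands under the new behavior. The class name and the drop policy are invented for this example, and the FilterMergeOperand() signature is assumed from the HISTORY.md entry in this diff — see include/rocksdb/compaction_filter.h for the authoritative declaration:

```cpp
// Sketch only: assumes FilterMergeOperand(int, const Slice&, const Slice&)
// with a default implementation returning false, as described in the
// HISTORY.md entry below; check include/rocksdb/compaction_filter.h for
// the exact signature.
#include <string>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"

class DropEmptyOperandsFilter : public rocksdb::CompactionFilter {
 public:
  // Regular key/value filtering is unchanged by this diff: returning false
  // keeps every base value.
  bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
              const rocksdb::Slice& /*existing_value*/,
              std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    return false;
  }

  // New hook: returning true drops the operand, so it is never handed to
  // the merge operator during compaction.
  bool FilterMergeOperand(int /*level*/, const rocksdb::Slice& /*key*/,
                          const rocksdb::Slice& operand) const override {
    return operand.empty();  // hypothetical policy: drop empty operands
  }

  const char* Name() const override { return "DropEmptyOperandsFilter"; }
};
```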

Test Plan: make && make check

Reviewers: lovro, tnovak, rven, yhchiang, sdong

Reviewed By: sdong

Subscribers: noetzli, kolmike, leveldb, dhruba, sdong

Differential Revision: https://reviews.facebook.net/D47847
igorcanadi committed Oct 7, 2015
1 parent 726d9ce commit d80ce7f
Showing 17 changed files with 606 additions and 151 deletions.
4 changes: 3 additions & 1 deletion HISTORY.md
@@ -5,16 +5,18 @@
* Added single delete operation as a more efficient way to delete keys that have not been overwritten.
* Added experimental AddFile() to DB interface that allow users to add files created by SstFileWriter into an empty Database, see include/rocksdb/sst_file_writer.h and DB::AddFile() for more info.
* Added support for opening SST files with .ldb suffix which enables opening LevelDB databases.
* CompactionFilter now supports filtering of merge operands and merge results.

### Public API Changes
* Added SingleDelete() to the DB interface.
* Added AddFile() to DB interface.
* Added SstFileWriter class.
* CompactionFilter has a new method FilterMergeOperand() that RocksDB applies to every merge operand during compaction to decide whether to filter the operand.

## 4.0.0 (9/9/2015)
### New Features
* Added support for transactions. See include/rocksdb/utilities/transaction.h for more info.
* DB::GetProperty() now accept "rocksdb.aggregated-table-properties" and "rocksdb.aggregated-table-properties-at-levelN", in which case it returns aggregated table properties of the target column family, or the aggregated table properties of the specified level N if the "at-level" version is used.
* DB::GetProperty() now accepts "rocksdb.aggregated-table-properties" and "rocksdb.aggregated-table-properties-at-levelN", in which case it returns aggregated table properties of the target column family, or the aggregated table properties of the specified level N if the "at-level" version is used.
* Add compression option kZSTDNotFinalCompression for people to experiment ZSTD although its format is not finalized.
* We removed the need for LATEST_BACKUP file in BackupEngine. We still keep writing it when we create new backups (because of backward compatibility), but we don't read it anymore.

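To make the FilterMergeOperand() entry in the 4.1.0 notes above concrete, here is a rough usage sketch (not part of this diff): a merge operator and a compaction filter are installed side by side, and during compaction each merge operand is offered to FilterMergeOperand() before the surviving operands are combined. AppendOperator and DropEmptyOperandsFilter are example classes made up for this sketch, and the CompactRange() overload may differ between RocksDB versions:

```cpp
// Illustrative usage sketch; AppendOperator and DropEmptyOperandsFilter are
// example classes, not part of this change.
#include <cassert>
#include <string>

#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"

// Minimal filter that drops empty merge operands (same idea as the sketch
// after the commit summary above).
class DropEmptyOperandsFilter : public rocksdb::CompactionFilter {
 public:
  bool Filter(int, const rocksdb::Slice&, const rocksdb::Slice&,
              std::string*, bool*) const override {
    return false;
  }
  bool FilterMergeOperand(int, const rocksdb::Slice&,
                          const rocksdb::Slice& operand) const override {
    return operand.empty();
  }
  const char* Name() const override { return "DropEmptyOperandsFilter"; }
};

// Concatenates operands; any associative merge operator would do here.
class AppendOperator : public rocksdb::AssociativeMergeOperator {
 public:
  bool Merge(const rocksdb::Slice& /*key*/, const rocksdb::Slice* existing,
             const rocksdb::Slice& value, std::string* new_value,
             rocksdb::Logger* /*logger*/) const override {
    new_value->assign(existing ? existing->ToString() : "");
    new_value->append(value.data(), value.size());
    return true;
  }
  const char* Name() const override { return "AppendOperator"; }
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new AppendOperator);
  DropEmptyOperandsFilter filter;
  options.compaction_filter = &filter;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/filter_merge_demo", &db);
  assert(s.ok());

  db->Merge(rocksdb::WriteOptions(), "key", "operand1");
  db->Merge(rocksdb::WriteOptions(), "key", "");  // dropped during compaction

  // During compaction each operand passes through FilterMergeOperand()
  // before the remaining operands reach AppendOperator::Merge().
  db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);

  delete db;
  return 0;
}
```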
7 changes: 4 additions & 3 deletions db/builder.cc
@@ -85,10 +85,11 @@ Status BuildTable(
file_writer.get(), compression, compression_opts);
}

MergeHelper merge(internal_comparator.user_comparator(),
ioptions.merge_operator, ioptions.info_log,
MergeHelper merge(env, internal_comparator.user_comparator(),
ioptions.merge_operator, nullptr, ioptions.info_log,
ioptions.min_partial_merge_operands,
true /* internal key corruption is not ok */);
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back());

CompactionIterator c_iter(iter, internal_comparator.user_comparator(),
&merge, kMaxSequenceNumber, &snapshots, env,
44 changes: 24 additions & 20 deletions db/compaction_iterator.cc
@@ -12,16 +12,14 @@ namespace rocksdb {
CompactionIterator::CompactionIterator(
Iterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
Env* env, bool expect_valid_internal_key, Statistics* stats,
Compaction* compaction, const CompactionFilter* compaction_filter,
LogBuffer* log_buffer)
Env* env, bool expect_valid_internal_key, Compaction* compaction,
const CompactionFilter* compaction_filter, LogBuffer* log_buffer)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
snapshots_(snapshots),
env_(env),
expect_valid_internal_key_(expect_valid_internal_key),
stats_(stats),
compaction_(compaction),
compaction_filter_(compaction_filter),
log_buffer_(log_buffer),
@@ -277,24 +275,30 @@ void CompactionIterator::NextFromInput() {
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_,
stats_, env_);
merge_helper_->MergeUntil(input_, prev_snapshot, bottommost_level_);
merge_out_iter_.SeekToFirst();

// NOTE: key, value, and ikey_ refer to old entries.
// These will be correctly set below.
key_ = merge_out_iter_.key();
value_ = merge_out_iter_.value();
bool valid_key __attribute__((__unused__)) =
ParseInternalKey(key_, &ikey_);
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
assert(valid_key);
// Keep current_key_ in sync.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetKey();
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
if (merge_out_iter_.Valid()) {
// NOTE: key, value, and ikey_ refer to old entries.
// These will be correctly set below.
key_ = merge_out_iter_.key();
value_ = merge_out_iter_.value();
bool valid_key __attribute__((__unused__)) =
ParseInternalKey(key_, &ikey_);
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
assert(valid_key);
// Keep current_key_ in sync.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetKey();
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
} else {
// all merge operands were filtered out. reset the user key, since the
// batch consumed by the merge operator should not shadow any keys
// coming after the merges
has_current_user_key_ = false;
}
} else {
valid_ = true;
}
2 changes: 0 additions & 2 deletions db/compaction_iterator.h
@@ -41,7 +41,6 @@ class CompactionIterator {
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots, Env* env,
bool expect_valid_internal_key,
Statistics* stats = nullptr,
Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);
@@ -91,7 +90,6 @@ class CompactionIterator {
const std::vector<SequenceNumber>* snapshots_;
Env* env_;
bool expect_valid_internal_key_;
Statistics* stats_;
Compaction* compaction_;
const CompactionFilter* compaction_filter_;
LogBuffer* log_buffer_;
3 changes: 2 additions & 1 deletion db/compaction_iterator_test.cc
@@ -16,7 +16,8 @@ class CompactionIteratorTest : public testing::Test {
void InitIterator(const std::vector<std::string>& ks,
const std::vector<std::string>& vs,
SequenceNumber last_sequence) {
merge_helper_.reset(new MergeHelper(cmp_, nullptr, nullptr, 0U, false));
merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, nullptr, nullptr,
nullptr, 0U, false, 0));
iter_.reset(new test::VectorIterator(ks, vs));
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(iter_.get(), cmp_, merge_helper_.get(),
15 changes: 9 additions & 6 deletions db/compaction_job.cc
@@ -597,17 +597,20 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}

ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator,
db_options_.info_log.get(),
cfd->ioptions()->min_partial_merge_operands,
false /* internal key corruption is expected */);
auto compaction_filter = cfd->ioptions()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (compaction_filter == nullptr) {
compaction_filter_from_factory =
sub_compact->compaction->CreateCompactionFilter();
compaction_filter = compaction_filter_from_factory.get();
}
MergeHelper merge(
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
compaction_filter, db_options_.info_log.get(),
cfd->ioptions()->min_partial_merge_operands,
false /* internal key corruption is expected */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
compact_->compaction->level(), db_options_.statistics.get());

TEST_SYNC_POINT("CompactionJob::Run():Inprogress");

@@ -624,8 +627,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
Status status;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, env_, false, db_options_.statistics.get(),
sub_compact->compaction, compaction_filter));
&existing_snapshots_, env_, false, sub_compact->compaction,
compaction_filter));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
const auto& c_iter_stats = c_iter->iter_stats();
138 changes: 122 additions & 16 deletions db/compaction_job_test.cc
@@ -182,7 +182,7 @@ class CompactionJobTest : public testing::Test {
return expected_results;
}

void NewDB(std::shared_ptr<MergeOperator> merge_operator = nullptr) {
void NewDB() {
VersionEdit new_db;
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
@@ -207,7 +207,8 @@

std::vector<ColumnFamilyDescriptor> column_families;
cf_options_.table_factory = mock_table_factory_;
cf_options_.merge_operator = merge_operator;
cf_options_.merge_operator = merge_op_;
cf_options_.compaction_filter = compaction_filter_.get();
column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);

EXPECT_OK(versions_->Recover(column_families, false));
@@ -258,10 +259,16 @@ class CompactionJobTest : public testing::Test {
&mutex_));
mutex_.Unlock();

ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
ASSERT_EQ(compaction_job_stats_.num_output_files, 1U);
mock_table_factory_->AssertLatestFile(expected_results);
if (expected_results.size() == 0) {
ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
} else {
ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
ASSERT_EQ(compaction_job_stats_.num_output_files, 1U);
mock_table_factory_->AssertLatestFile(expected_results);
}
}

Env* env_;
@@ -279,6 +286,8 @@ class CompactionJobTest : public testing::Test {
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
CompactionJobStats compaction_job_stats_;
ColumnFamilyData* cfd_;
std::unique_ptr<CompactionFilter> compaction_filter_;
std::shared_ptr<MergeOperator> merge_op_;
};

TEST_F(CompactionJobTest, Simple) {
@@ -297,7 +306,7 @@ TEST_F(CompactionJobTest, SimpleCorrupted) {
auto expected_results = CreateTwoFiles(true);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
RunCompaction({ files }, expected_results);
RunCompaction({files}, expected_results);
ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U);
}

@@ -317,7 +326,7 @@ TEST_F(CompactionJobTest, SimpleDeletion) {

SetLastSequence(4U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({ files }, expected_results);
RunCompaction({files}, expected_results);
}

TEST_F(CompactionJobTest, SimpleOverwrite) {
Expand All @@ -339,7 +348,7 @@ TEST_F(CompactionJobTest, SimpleOverwrite) {

SetLastSequence(4U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({ files }, expected_results);
RunCompaction({files}, expected_results);
}

TEST_F(CompactionJobTest, SimpleNonLastLevel) {
@@ -368,12 +377,12 @@ TEST_F(CompactionJobTest, SimpleNonLastLevel) {
SetLastSequence(6U);
auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
RunCompaction({ lvl0_files, lvl1_files }, expected_results);
RunCompaction({lvl0_files, lvl1_files}, expected_results);
}

TEST_F(CompactionJobTest, SimpleMerge) {
auto merge_op = MergeOperators::CreateStringAppendOperator();
NewDB(merge_op);
merge_op_ = MergeOperators::CreateStringAppendOperator();
NewDB();

auto file1 = mock::MakeMockFile({
{KeyStr("a", 5U, kTypeMerge), "5"},
Expand All @@ -392,12 +401,12 @@ TEST_F(CompactionJobTest, SimpleMerge) {

SetLastSequence(5U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({ files }, expected_results);
RunCompaction({files}, expected_results);
}

TEST_F(CompactionJobTest, NonAssocMerge) {
auto merge_op = MergeOperators::CreateStringAppendTESTOperator();
NewDB(merge_op);
merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
NewDB();

auto file1 = mock::MakeMockFile({
{KeyStr("a", 5U, kTypeMerge), "5"},
Expand All @@ -417,7 +426,104 @@ TEST_F(CompactionJobTest, NonAssocMerge) {

SetLastSequence(5U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({ files }, expected_results);
RunCompaction({files}, expected_results);
}

// Filters merge operands with value 10.
TEST_F(CompactionJobTest, MergeOperandFilter) {
merge_op_ = MergeOperators::CreateUInt64AddOperator();
compaction_filter_.reset(new test::FilterNumber(10U));
NewDB();

auto file1 = mock::MakeMockFile(
{{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)},
{KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)}, // Filtered
{KeyStr("a", 3U, kTypeMerge), test::EncodeInt(3U)}});
AddMockFile(file1);

auto file2 = mock::MakeMockFile({
{KeyStr("b", 2U, kTypeMerge), test::EncodeInt(2U)},
{KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)} // Filtered
});
AddMockFile(file2);

auto expected_results =
mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), test::EncodeInt(8U)},
{KeyStr("b", 2U, kTypeMerge), test::EncodeInt(2U)}});

SetLastSequence(5U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
}

TEST_F(CompactionJobTest, FilterSomeMergeOperands) {
merge_op_ = MergeOperators::CreateUInt64AddOperator();
compaction_filter_.reset(new test::FilterNumber(10U));
NewDB();

auto file1 = mock::MakeMockFile(
{{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)},
{KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)}, // Filtered
{KeyStr("a", 3U, kTypeValue), test::EncodeInt(5U)},
{KeyStr("d", 8U, kTypeMerge), test::EncodeInt(10U)}});
AddMockFile(file1);

auto file2 =
mock::MakeMockFile({{KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("c", 2U, kTypeMerge), test::EncodeInt(3U)},
{KeyStr("c", 1U, kTypeValue), test::EncodeInt(7U)},
{KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}});
AddMockFile(file2);

auto file3 =
mock::MakeMockFile({{KeyStr("a", 1U, kTypeMerge), test::EncodeInt(3U)}});
AddMockFile(file3, 2);

auto expected_results = mock::MakeMockFile({
{KeyStr("a", 5U, kTypeValue), test::EncodeInt(10U)},
{KeyStr("c", 2U, kTypeValue), test::EncodeInt(10U)},
{KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}
// b does not appear because the operands are filtered
});

SetLastSequence(5U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
}

// Test where all operands/merge results are filtered out.
TEST_F(CompactionJobTest, FilterAllMergeOperands) {
merge_op_ = MergeOperators::CreateUInt64AddOperator();
compaction_filter_.reset(new test::FilterNumber(10U));
NewDB();

auto file1 =
mock::MakeMockFile({{KeyStr("a", 11U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("a", 10U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("a", 9U, kTypeMerge), test::EncodeInt(10U)}});
AddMockFile(file1);

auto file2 =
mock::MakeMockFile({{KeyStr("b", 8U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("b", 7U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("b", 6U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("b", 5U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("b", 4U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("b", 3U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("c", 2U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("c", 1U, kTypeMerge), test::EncodeInt(10U)}});
AddMockFile(file2);

auto file3 =
mock::MakeMockFile({{KeyStr("a", 2U, kTypeMerge), test::EncodeInt(10U)},
{KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)}});
AddMockFile(file3, 2);

SetLastSequence(11U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, {});
}

TEST_F(CompactionJobTest, SimpleSingleDelete) {
