Skip to content

Commit

Permalink
Introduce FullMergeV2 (eliminate memcpy from merge operators)
Browse files Browse the repository at this point in the history
Summary:
This diff update the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost, to do that we need a new public API for FullMerge that replace the std::deque<std::string> with std::vector<Slice>

This diff is stacked on top of D56493 and D56511

In this diff we
- Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow FullMergeV2 output to be an existing operand

```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq      :       0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq      :       0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq      :       0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq      :       0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq      :       0.247 micros/op 4043927 ops/sec; 39553.2 MB/s

[master]
readseq      :       3.935 micros/op 254140 ops/sec; 2485.7 MB/s
readseq      :       3.722 micros/op 268657 ops/sec; 2627.7 MB/s
readseq      :       3.149 micros/op 317605 ops/sec; 3106.5 MB/s
readseq      :       3.125 micros/op 320024 ops/sec; 3130.1 MB/s
readseq      :       4.075 micros/op 245374 ops/sec; 2400.0 MB/s
```

```
[Everything in Memtable | 10K operands | 10 KB each | 10 operands per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000

[FullMergeV2]
readseq      :       3.472 micros/op 288018 ops/sec; 2817.1 MB/s
readseq      :       2.304 micros/op 434027 ops/sec; 4245.2 MB/s
readseq      :       1.163 micros/op 859845 ops/sec; 8410.0 MB/s
readseq      :       1.192 micros/op 838926 ops/sec; 8205.4 MB/s
readseq      :       1.250 micros/op 800000 ops/sec; 7824.7 MB/s

[master]
readseq      :      24.025 micros/op 41623 ops/sec;  407.1 MB/s
readseq      :      18.489 micros/op 54086 ops/sec;  529.0 MB/s
readseq      :      18.693 micros/op 53495 ops/sec;  523.2 MB/s
readseq      :      23.621 micros/op 42335 ops/sec;  414.1 MB/s
readseq      :      18.775 micros/op 53262 ops/sec;  521.0 MB/s

```

```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]

[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq      :      14.741 micros/op 67837 ops/sec;  663.5 MB/s
readseq      :       1.029 micros/op 971446 ops/sec; 9501.6 MB/s
readseq      :       0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq      :       0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq      :       0.943 micros/op 1060657 ops/sec; 10374.2 MB/s

[master]
readseq      :      16.735 micros/op 59755 ops/sec;  584.5 MB/s
readseq      :       3.029 micros/op 330151 ops/sec; 3229.2 MB/s
readseq      :       3.136 micros/op 318883 ops/sec; 3119.0 MB/s
readseq      :       3.065 micros/op 326245 ops/sec; 3191.0 MB/s
readseq      :       3.014 micros/op 331813 ops/sec; 3245.4 MB/s
```

```
[Everything in Block cache | 10K operands | 10 KB each | 10 operands per key]

DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions

[FullMergeV2]
readseq      :      24.325 micros/op 41109 ops/sec;  402.1 MB/s
readseq      :       1.470 micros/op 680272 ops/sec; 6653.7 MB/s
readseq      :       1.231 micros/op 812347 ops/sec; 7945.5 MB/s
readseq      :       1.091 micros/op 916590 ops/sec; 8965.1 MB/s
readseq      :       1.109 micros/op 901713 ops/sec; 8819.6 MB/s

[master]
readseq      :      27.257 micros/op 36687 ops/sec;  358.8 MB/s
readseq      :       4.443 micros/op 225073 ops/sec; 2201.4 MB/s
readseq      :       5.830 micros/op 171526 ops/sec; 1677.7 MB/s
readseq      :       4.173 micros/op 239635 ops/sec; 2343.8 MB/s
readseq      :       4.150 micros/op 240963 ops/sec; 2356.8 MB/s
```

Test Plan: COMPILE_WITH_ASAN=1 make check -j64

Reviewers: yhchiang, andrewkr, sdong

Reviewed By: sdong

Subscribers: lovro, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D57075
  • Loading branch information
IslamAbdelRahman committed Jul 20, 2016
1 parent e70ba4e commit 68a8e6b
Show file tree
Hide file tree
Showing 38 changed files with 814 additions and 237 deletions.
24 changes: 11 additions & 13 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,33 +269,31 @@ struct rocksdb_mergeoperator_t : public MergeOperator {

virtual const char* Name() const override { return (*name_)(state_); }

virtual bool FullMerge(const Slice& key, const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const override {
size_t n = operand_list.size();
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
size_t n = merge_in.operand_list.size();
std::vector<const char*> operand_pointers(n);
std::vector<size_t> operand_sizes(n);
for (size_t i = 0; i < n; i++) {
Slice operand(operand_list[i]);
Slice operand(merge_in.operand_list[i]);
operand_pointers[i] = operand.data();
operand_sizes[i] = operand.size();
}

const char* existing_value_data = nullptr;
size_t existing_value_len = 0;
if (existing_value != nullptr) {
existing_value_data = existing_value->data();
existing_value_len = existing_value->size();
if (merge_in.existing_value != nullptr) {
existing_value_data = merge_in.existing_value->data();
existing_value_len = merge_in.existing_value->size();
}

unsigned char success;
size_t new_value_len;
char* tmp_new_value = (*full_merge_)(
state_, key.data(), key.size(), existing_value_data, existing_value_len,
&operand_pointers[0], &operand_sizes[0], static_cast<int>(n), &success,
&new_value_len);
new_value->assign(tmp_new_value, new_value_len);
state_, merge_in.key.data(), merge_in.key.size(), existing_value_data,
existing_value_len, &operand_pointers[0], &operand_sizes[0],
static_cast<int>(n), &success, &new_value_len);
merge_out->new_value.assign(tmp_new_value, new_value_len);

if (delete_value_ != nullptr) {
(*delete_value_)(state_, tmp_new_value, new_value_len);
Expand Down
10 changes: 10 additions & 0 deletions db/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ CompactionIterator::CompactionIterator(
} else {
ignore_snapshots_ = false;
}
input_->SetPinnedItersMgr(&pinned_iters_mgr_);
}

CompactionIterator::~CompactionIterator() {
// input_ Iteartor lifetime is longer than pinned_iters_mgr_ lifetime
input_->SetPinnedItersMgr(nullptr);
}

void CompactionIterator::ResetRecordCounts() {
Expand Down Expand Up @@ -83,6 +89,8 @@ void CompactionIterator::Next() {
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
} else {
// We consumed all pinned merge operands, release pinned iterators
pinned_iters_mgr_.ReleasePinnedIterators();
// MergeHelper moves the iterator to the first record after the merged
// records, so even though we reached the end of the merge output, we do
// not want to advance the iterator.
Expand Down Expand Up @@ -368,6 +376,7 @@ void CompactionIterator::NextFromInput() {
return;
}

pinned_iters_mgr_.StartPinning();
// We know the merge type entry is not hidden, otherwise we would
// have hit (A)
// We encapsulate the merge related state machine in a different
Expand Down Expand Up @@ -395,6 +404,7 @@ void CompactionIterator::NextFromInput() {
// batch consumed by the merge operator should not shadow any keys
// coming after the merges
has_current_user_key_ = false;
pinned_iters_mgr_.ReleasePinnedIterators();
}
} else {
valid_ = true;
Expand Down
6 changes: 6 additions & 0 deletions db/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "db/compaction.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "rocksdb/compaction_filter.h"
#include "util/log_buffer.h"

Expand Down Expand Up @@ -46,6 +47,8 @@ class CompactionIterator {
const CompactionFilter* compaction_filter = nullptr,
LogBuffer* log_buffer = nullptr);

~CompactionIterator();

void ResetRecordCounts();

// Seek to the beginning of the compaction iterator output.
Expand Down Expand Up @@ -136,6 +139,9 @@ class CompactionIterator {
bool clear_and_output_next_key_ = false;

MergeOutputIterator merge_out_iter_;
// PinnedIteratorsManager used to pin input_ Iterator blocks while reading
// merge operands and then releasing them after consuming them.
PinnedIteratorsManager pinned_iters_mgr_;
std::string compaction_filter_value_;
// "level_ptrs" holds indices that remember which file of an associated
// level we were last checking during the last call to compaction->
Expand Down
50 changes: 31 additions & 19 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ class DBIter: public Iterator {
virtual Slice value() const override {
assert(valid_);
if (current_entry_is_merged_) {
return saved_value_;
// If pinned_value_ is set then the result of merge operator is one of
// the merge operands and we should return it.
return pinned_value_.data() ? pinned_value_ : saved_value_;
} else if (direction_ == kReverse) {
return pinned_value_;
} else {
Expand Down Expand Up @@ -286,9 +288,9 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
void DBIter::Next() {
assert(valid_);

// Release temporarily pinned blocks from last operation
ReleaseTempPinnedData();
if (direction_ == kReverse) {
// We only pin blocks when doing kReverse
ReleaseTempPinnedData();
FindNextUserKey();
direction_ = kForward;
if (!iter_->Valid()) {
Expand Down Expand Up @@ -433,9 +435,12 @@ void DBIter::MergeValuesNewToOld() {
return;
}

// Temporarily pin the blocks that hold merge operands
TempPinData();
merge_context_.Clear();
// Start the merge process by pushing the first operand
merge_context_.PushOperand(iter_->value());
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);

ParsedInternalKey ikey;
for (iter_->Next(); iter_->Valid(); iter_->Next()) {
Expand All @@ -459,15 +464,15 @@ void DBIter::MergeValuesNewToOld() {
const Slice val = iter_->value();
MergeHelper::TimedFullMerge(merge_operator_, ikey.user_key, &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
logger_, statistics_, env_, &pinned_value_);
// iter_ is positioned after put
iter_->Next();
return;
} else if (kTypeMerge == ikey.type) {
// hit a merge, add the value as an operand and run associative merge.
// when complete, add result to operands and continue.
const Slice& val = iter_->value();
merge_context_.PushOperand(val);
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
} else {
assert(false);
}
Expand All @@ -479,15 +484,15 @@ void DBIter::MergeValuesNewToOld() {
// client can differentiate this scenario and do things accordingly.
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
logger_, statistics_, env_, &pinned_value_);
}

void DBIter::Prev() {
assert(valid_);
ReleaseTempPinnedData();
if (direction_ == kForward) {
ReverseToBackward();
}
ReleaseTempPinnedData();
PrevInternal();
if (statistics_ != nullptr) {
local_stats_.prev_count_++;
Expand Down Expand Up @@ -580,6 +585,9 @@ bool DBIter::FindValueForCurrentKey() {
ParsedInternalKey ikey;
FindParseableKey(&ikey, kReverse);

// Temporarily pin blocks that hold (merge operands / the value)
ReleaseTempPinnedData();
TempPinData();
size_t num_skipped = 0;
while (iter_->Valid() && ikey.sequence <= sequence_ &&
user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
Expand All @@ -592,8 +600,7 @@ bool DBIter::FindValueForCurrentKey() {
switch (last_key_entry_type) {
case kTypeValue:
merge_context_.Clear();
ReleaseTempPinnedData();
TempPinData();
assert(iter_->IsValuePinned());
pinned_value_ = iter_->value();
last_not_merge_type = kTypeValue;
break;
Expand All @@ -605,7 +612,8 @@ bool DBIter::FindValueForCurrentKey() {
break;
case kTypeMerge:
assert(merge_operator_ != nullptr);
merge_context_.PushOperandBack(iter_->value());
merge_context_.PushOperandBack(
iter_->value(), iter_->IsValuePinned() /* operand_pinned */);
break;
default:
assert(false);
Expand All @@ -628,13 +636,14 @@ bool DBIter::FindValueForCurrentKey() {
if (last_not_merge_type == kTypeDeletion) {
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_);
&saved_value_, logger_, statistics_, env_,
&pinned_value_);
} else {
assert(last_not_merge_type == kTypeValue);
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(),
&pinned_value_,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
logger_, statistics_, env_, &pinned_value_);
}
break;
case kTypeValue:
Expand All @@ -651,6 +660,9 @@ bool DBIter::FindValueForCurrentKey() {
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
bool DBIter::FindValueForCurrentKeyUsingSeek() {
// FindValueForCurrentKey will enable pinning before calling
// FindValueForCurrentKeyUsingSeek()
assert(pinned_iters_mgr_.PinningEnabled());
std::string last_key;
AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetKey(), sequence_,
kValueTypeForSeek));
Expand All @@ -664,8 +676,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
ikey.type == kTypeSingleDeletion) {
if (ikey.type == kTypeValue) {
ReleaseTempPinnedData();
TempPinData();
assert(iter_->IsValuePinned());
pinned_value_ = iter_->value();
valid_ = true;
return true;
Expand All @@ -681,7 +692,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
while (iter_->Valid() &&
user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) &&
ikey.type == kTypeMerge) {
merge_context_.PushOperand(iter_->value());
merge_context_.PushOperand(iter_->value(),
iter_->IsValuePinned() /* operand_pinned */);
iter_->Next();
FindParseableKey(&ikey, kForward);
}
Expand All @@ -691,7 +703,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), nullptr,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
logger_, statistics_, env_, &pinned_value_);
// Make iter_ valid and point to saved_key_
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
Expand All @@ -705,7 +717,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
const Slice& val = iter_->value();
MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetKey(), &val,
merge_context_.GetOperands(), &saved_value_,
logger_, statistics_, env_);
logger_, statistics_, env_, &pinned_value_);
valid_ = true;
return true;
}
Expand Down
3 changes: 3 additions & 0 deletions db/db_iter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ class TestIterator : public InternalIterator {
return Status::OK();
}

virtual bool IsKeyPinned() const override { return true; }
virtual bool IsValuePinned() const override { return true; }

private:
bool initialized_;
bool valid_;
Expand Down
9 changes: 4 additions & 5 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4848,12 +4848,11 @@ class DelayedMergeOperator : public MergeOperator {

public:
explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {}
virtual bool FullMerge(const Slice& key, const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const override {

virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
db_test_->env_->addon_time_.fetch_add(1000);
*new_value = "";
merge_out->new_value = "";
return true;
}

Expand Down
Loading

0 comments on commit 68a8e6b

Please sign in to comment.