Skip to content

Commit

Permalink
[performance](load) improve memtable sort performance (apache#20392)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Jun 4, 2023
1 parent 34a1b75 commit b0bbff0
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 18 deletions.
48 changes: 40 additions & 8 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <pdqsort.h>

#include <algorithm>
#include <cstddef>
Expand Down Expand Up @@ -249,13 +250,32 @@ void MemTable::_put_into_output(vectorized::Block& in_block) {
_output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
row_pos_vec.data() + in_block.rows());
}
int MemTable::_sort() {

size_t MemTable::_sort() {
SCOPED_RAW_TIMER(&_stat.sort_ns);
_stat.sort_times++;
_vec_row_comparator->set_block(&_input_mutable_block);
auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
size_t same_keys_num = 0;
// sort new rows
Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
for (size_t i = 0; i < _schema->num_key_columns(); i++) {
auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
};
_sort_one_column(_row_in_blocks, tie, cmp);
}
bool is_dup = (_keys_type == KeysType::DUP_KEYS);
// sort extra round by _row_pos to make the sort stable
auto iter = tie.iter();
while (iter.next()) {
pdqsort(std::next(_row_in_blocks.begin(), iter.left()),
std::next(_row_in_blocks.begin(), iter.right()),
[&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos;
});
same_keys_num += iter.right() - iter.left();
}
// merge new rows and old rows
_vec_row_comparator->set_block(&_input_mutable_block);
auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l,
const RowInBlock* r) -> bool {
auto value = (*(this->_vec_row_comparator))(l, r);
Expand All @@ -266,14 +286,26 @@ int MemTable::_sort() {
return value < 0;
}
};
// sort new rows
std::sort(new_row_it, _row_in_blocks.end(), cmp_func);
// merge new rows and old rows
auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), cmp_func);
_last_sorted_pos = _row_in_blocks.size();
return same_keys_num;
}

// Sorts each range reported by `tie` on a single key and then refines `tie`
// for the next key.
//
// For every range [left, right) yielded by the tie iterator:
//   1. the rows in that range are sorted with pdqsort, ordered by `cmp(...) < 0`;
//   2. tie[left] is cleared, and tie[i] for i in (left, right) is set to 1
//      only when row i compares equal (cmp == 0) to row i - 1, so later
//      passes only revisit rows that are still tied.
//
// `cmp` is a three-way comparator: negative / zero / positive.
void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
                                std::function<int(const RowInBlock*, const RowInBlock*)> cmp) {
    auto iter = tie.iter();
    while (iter.next()) {
        // Cache the bounds once; they are size_t positions into row_in_blocks.
        const size_t left = iter.left();
        const size_t right = iter.right();
        pdqsort(std::next(row_in_blocks.begin(), left), std::next(row_in_blocks.begin(), right),
                [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; });
        // The first row of a range is never tied with the row before it.
        tie[left] = 0;
        // Use a size_t index: the bounds are size_t, and an int index would
        // mix signedness in the comparison and truncate on very large inputs.
        for (size_t i = left + 1; i < right; i++) {
            tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0);
        }
    }
}

template <bool is_final>
void MemTable::_finalize_one_row(RowInBlock* row,
const vectorized::ColumnsWithTypeAndName& block_data,
Expand Down Expand Up @@ -379,7 +411,7 @@ void MemTable::shrink_memtable_by_agg() {
if (_keys_type == KeysType::DUP_KEYS) {
return;
}
int same_keys_num = _sort();
size_t same_keys_num = _sort();
if (same_keys_num == 0) {
vectorized::Block in_block = _input_mutable_block.to_block();
_put_into_output(in_block);
Expand Down Expand Up @@ -465,7 +497,7 @@ Status MemTable::flush() {

Status MemTable::_do_flush() {
SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker);
int same_keys_num = _sort();
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS && _schema->num_key_columns() == 0) {
_output_mutable_block.swap(_input_mutable_block);
Expand Down
63 changes: 62 additions & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <stddef.h>
#include <stdint.h>

#include <cstring>
#include <functional>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -69,6 +70,64 @@ struct RowInBlock {
inline void remove_init_agg() { _has_init_agg = false; }
};

// Tie keeps one byte per position of the half-open range [begin, end).
// A byte value of 1 at position i marks i as "tied" to position i - 1;
// 0 marks it as not tied. All bytes start as 1.
//
// Tie::Iter walks the maximal tied runs: each successful next() exposes a
// range [left(), right()) in which every position of (left, right) holds 1.
// Positions are absolute (offset by `begin`), not relative to the buffer.
class Tie {
public:
    class Iter {
    public:
        // Scanning starts at begin + 1 because a run is reported as
        // "position before the first 1" .. "first following 0".
        Iter(Tie& tie) : _tie(tie), _next(tie._begin + 1) {}

        // Bounds of the current range; both are 0 until next() first succeeds.
        size_t left() const { return _left; }
        size_t right() const { return _right; }

        // Advances to the next tied range.
        // return false means no more ranges
        bool next() {
            if (_next >= _tie._end) {
                return false;
            }
            // Skip forward to the first tied position.
            _next = _find(1, _next);
            if (_next >= _tie._end) {
                return false;
            }
            // The run begins one position before the first tied byte ...
            _left = _next - 1;
            // ... and extends up to (not including) the next untied byte.
            _next = _find(0, _next);
            _right = _next;
            return true;
        }

    private:
        // Returns the absolute position of the first byte equal to `value`
        // at or after `start`, or `_end` when there is none. A `start` that
        // is already past the end is returned unchanged.
        size_t _find(uint8_t value, size_t start) const {
            if (start >= _tie._end) {
                return start;
            }
            const size_t offset = start - _tie._begin;
            const size_t size = _tie._end - start;
            const void* p = std::memchr(_tie._bits.data() + offset, value, size);
            if (p == nullptr) {
                return _tie._end;
            }
            return static_cast<const uint8_t*>(p) - _tie._bits.data() + _tie._begin;
        }

    private:
        Tie& _tie;
        // Initialized so that left()/right() are well defined before next().
        size_t _left = 0;
        size_t _right = 0;
        size_t _next;
    };

public:
    // Creates a tie covering [begin, end) with every position marked tied.
    Tie(size_t begin, size_t end) : _begin(begin), _end(end), _bits(end - begin, uint8_t {1}) {}

    // Access by absolute position; `i` must lie in [begin, end).
    // The index is size_t to match the positions handed out by Iter.
    uint8_t operator[](size_t i) const { return _bits[i - _begin]; }
    uint8_t& operator[](size_t i) { return _bits[i - _begin]; }

    Iter iter() { return Iter(*this); }

private:
    const size_t _begin;
    const size_t _end;
    std::vector<uint8_t> _bits;
};

class RowInBlockComparator {
public:
RowInBlockComparator(const Schema* schema) : _schema(schema) {}
Expand Down Expand Up @@ -220,7 +279,9 @@ class MemTable {
size_t _last_sorted_pos = 0;

//return number of same keys
int _sort();
size_t _sort();
void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
std::function<int(const RowInBlock*, const RowInBlock*)> cmp);
template <bool is_final>
void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data,
int row_pos);
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,14 @@ class MutableBlock {
return _data_types[position];
}

int compare_one_column(size_t n, size_t m, size_t column_id, int nan_direction_hint) const {
DCHECK_LE(column_id, columns());
DCHECK_LE(n, rows());
DCHECK_LE(m, rows());
auto& column = get_column_by_position(column_id);
return column->compare_at(n, m, *column, nan_direction_hint);
}

int compare_at(size_t n, size_t m, size_t num_columns, const MutableBlock& rhs,
int nan_direction_hint) const {
DCHECK_GE(columns(), num_columns);
Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ set(OLAP_TEST_FILES
olap/cumulative_compaction_policy_test.cpp
#olap/row_cursor_test.cpp
olap/skiplist_test.cpp
olap/memtable_sort_test.cpp
olap/olap_meta_test.cpp
olap/decimal12_test.cpp
olap/storage_types_test.cpp
Expand Down
83 changes: 83 additions & 0 deletions be/test/olap/memtable_sort_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "olap/memtable.h"

namespace doris {

class MemTableSortTest : public ::testing::Test {};

TEST_F(MemTableSortTest, Tie) {
auto t0 = Tie {0, 0};
EXPECT_FALSE(t0.iter().next());

auto tie = Tie {0, 1};
EXPECT_FALSE(tie.iter().next());

auto t = Tie {10, 30};
for (int i = 10; i < 30; i++) {
EXPECT_EQ(t[i], 1);
}

auto it1 = t.iter();
EXPECT_TRUE(it1.next());
EXPECT_EQ(it1.left(), 10);
EXPECT_EQ(it1.right(), 30);

EXPECT_FALSE(it1.next());

t[13] = t[14] = t[22] = t[29] = 0;
auto it2 = t.iter();

EXPECT_TRUE(it2.next());
EXPECT_EQ(it2.left(), 10);
EXPECT_EQ(it2.right(), 13);

EXPECT_TRUE(it2.next());
EXPECT_EQ(it2.left(), 14);
EXPECT_EQ(it2.right(), 22);

EXPECT_TRUE(it2.next());
EXPECT_EQ(it2.left(), 22);
EXPECT_EQ(it2.right(), 29);

EXPECT_FALSE(it2.next());
EXPECT_FALSE(it2.next());

// 100000000...
for (int i = 11; i < 30; i++) {
t[i] = 0;
}
EXPECT_FALSE(t.iter().next());

// 000000000...
t[10] = 0;
EXPECT_FALSE(t.iter().next());

// 000000000...001
t[29] = 1;
auto it3 = t.iter();
EXPECT_TRUE(it3.next());
EXPECT_EQ(it3.left(), 28);
EXPECT_EQ(it3.right(), 30);

EXPECT_FALSE(it3.next());
}

} // namespace doris
10 changes: 5 additions & 5 deletions thirdparty/build-thirdparty.sh
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ check_if_source_exist() {
echo "===== begin build $1"
}

check_if_archieve_exist() {
check_if_archive_exist() {
if [[ -z $1 ]]; then
echo "archieve should specified to check if exist."
echo "archive should specified to check if exist."
exit 1
fi

Expand Down Expand Up @@ -1167,9 +1167,9 @@ build_parallel_hashmap() {

# pdqsort
# Installs the pdqsort header into "${TP_INSTALL_DIR}/include/".
# NOTE(review): this listing comes from a diff view; the first three commands
# (PDQSORT_SOURCE directory) look like the pre-change steps and the last three
# (PDQSORT_FILE single header) the post-change steps -- confirm against the
# actual script before relying on the sequence below.
build_pdqsort() {
# Variant working from an unpacked source directory.
check_if_source_exist "${PDQSORT_SOURCE}"
cd "${TP_SOURCE_DIR}/${PDQSORT_SOURCE}"
cp -r pdqsort.h "${TP_INSTALL_DIR}/include/"
# Variant working from a single downloaded header file.
check_if_archive_exist "${PDQSORT_FILE}"
cd "${TP_SOURCE_DIR}"
cp "${PDQSORT_FILE}" "${TP_INSTALL_DIR}/include/"
}

# libdivide
Expand Down
8 changes: 4 additions & 4 deletions thirdparty/vars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,10 @@ LIBDIVIDE_SOURCE="libdivide-5.0"
LIBDIVIDE_MD5SUM="7fd16b0bb4ab6812b2e2fdc7bfb81641"

#pdqsort
PDQSORT_DOWNLOAD="http://ftp.cise.ufl.edu/ubuntu/pool/universe/p/pdqsort/pdqsort_0.0.0+git20180419.orig.tar.gz"
PDQSORT_NAME="pdqsort.tar.gz"
PDQSORT_SOURCE="pdqsort-0.0.0+git20180419"
PDQSORT_MD5SUM="39261c3e7b40aa7505662fac29f22d20"
PDQSORT_DOWNLOAD="https://raw.githubusercontent.com/orlp/pdqsort/b1ef26a55cdb60d236a5cb199c4234c704f46726/pdqsort.h"
PDQSORT_NAME="pdqsort.h"
PDQSORT_FILE="pdqsort.h"
PDQSORT_MD5SUM="af28f79d5d7d7a5486f54d9f1244c2b5"

# benchmark
BENCHMARK_DOWNLOAD="https://github.com/google/benchmark/archive/v1.5.6.tar.gz"
Expand Down

0 comments on commit b0bbff0

Please sign in to comment.