Skip to content

Commit

Permalink
Improve SkipList memory usage tracking (apache#2359)
Browse files Browse the repository at this point in the history
The problem with the current implementation is that all data to be
inserted will be counted in memory, but for the aggregation model or
some other special cases, not all data will be inserted into `MemTable`,
and these data should not be counted in memory.

This change makes the `SkipList` use the exclusive `MemPool`,
and only the data will be inserted into the `SkipList` can use this
`MemPool`. In other words, those discarded rows will not be
counted by the `MemPool` of` SkipList`.

In order to avoid duplicate checking whether a row already exists in
`SkipList`, this change also modifies the `SkipList` interface(A `Hint`
will be fetched when `Find()`, and then use it in `InsertUseHint()`),
and made `SkipList` no longer aware of the aggregation logic.

At present, because of the data row(`Tuple`) generated by the upper layer
is different from the data row(`Row`) internally represented by the
engine, when inserting `MemTable`, the data row must be copied.
If the row needs to be inserted into SkipList, we need copy it again
to `MemPool` of `SkipList`.

And, at present, the aggregation function only supports `MemPool` when
copying, so even if the data will not be inserted into` SkipList`,
`MemPool` is still used (in the future, it can be replaced with an
ordinary` Buffer`). However, we reuse the allocated memory in MemPool,
that is, we do not reallocate new memory every time.

Note: Due to the characteristics of `MemPool` (once inserted, it cannot
be partially cleared), the following scenarios may still cause multiple
flushes. For example, the aggregation model of a string column is `MAX`,
and the data inserted at the same time is in ascending order, then for
each data row, it must apply for memory from `MemPool` in `SkipList`,
that is, although the old rows in SkipList` will be discarded,
the memory occupied will still be counted.

I did a test on my development machine using `STREAM LOAD`: a table with
only one tablet and all columns are keys, the original data was
1.1G (9318799 rows), and there were 377745 rows after removing duplicates.

It can be found that both the number of files and the query efficiency are
greatly improved, the price paid is only a slight increase in load time.

before:
```
  $ ll storage/data/0/10019/1075020655/
  total 4540
  -rw------- 1 dev dev 393152 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_0_0.dat
  -rw------- 1 dev dev   1135 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_0_0.idx
  -rw------- 1 dev dev 421660 Dec  2 18:43 0200000000000004f5404b740288294b21e52b0786adf3be_10_0.dat
  -rw------- 1 dev dev   1185 Dec  2 18:43 0200000000000004f5404b740288294b21e52b0786adf3be_10_0.idx
  -rw------- 1 dev dev 184214 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_1_0.dat
  -rw------- 1 dev dev    610 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_1_0.idx
  -rw------- 1 dev dev 329181 Dec  2 18:43 0200000000000004f5404b740288294b21e52b0786adf3be_11_0.dat
  -rw------- 1 dev dev    935 Dec  2 18:43 0200000000000004f5404b740288294b21e52b0786adf3be_11_0.idx
  -rw------- 1 dev dev 343813 Dec  2 18:43 0200000000000004f5404b740288294b21e52b0786adf3be_12_0.dat
  -rw------- 1 dev dev    985 Dec  2 18:43 0200000000000004f5404b740288294b21e52b0786adf3be_12_0.idx
  -rw------- 1 dev dev 315364 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_2_0.dat
  -rw------- 1 dev dev    885 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_2_0.idx
  -rw------- 1 dev dev 423806 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_3_0.dat
  -rw------- 1 dev dev   1185 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_3_0.idx
  -rw------- 1 dev dev 294811 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_4_0.dat
  -rw------- 1 dev dev    835 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_4_0.idx
  -rw------- 1 dev dev 403241 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_5_0.dat
  -rw------- 1 dev dev   1135 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_5_0.idx
  -rw------- 1 dev dev 350753 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_6_0.dat
  -rw------- 1 dev dev    860 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_6_0.idx
  -rw------- 1 dev dev 266966 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_7_0.dat
  -rw------- 1 dev dev    735 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_7_0.idx
  -rw------- 1 dev dev 451191 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_8_0.dat
  -rw------- 1 dev dev   1235 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_8_0.idx
  -rw------- 1 dev dev 398439 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_9_0.dat
  -rw------- 1 dev dev   1110 Dec  2 18:42 0200000000000004f5404b740288294b21e52b0786adf3be_9_0.idx

  {
    "TxnId": 16,
    "Label": "cd9f8392-dfa0-4626-8034-22f7cb97044c",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 9318799,
    "NumberLoadedRows": 9318799,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 1079581477,
    "LoadTimeMs": 46907
  }

  mysql> select count(*) from xxx_before;
  +----------+
  | count(*) |
  +----------+
  |   377745 |
  +----------+
1 row in set (0.91 sec)

```

aftr:
```
  $ ll storage/data/0/10013/1075020655/
  total 3612
  -rw------- 1 dev dev 3328992 Dec  2 18:26 0200000000000003d44e5cc72626f95a0b196b52a05c0f8a_0_0.dat
  -rw------- 1 dev dev    8460 Dec  2 18:26 0200000000000003d44e5cc72626f95a0b196b52a05c0f8a_0_0.idx
  -rw------- 1 dev dev  350576 Dec  2 18:26 0200000000000003d44e5cc72626f95a0b196b52a05c0f8a_1_0.dat
  -rw------- 1 dev dev     985 Dec  2 18:26 0200000000000003d44e5cc72626f95a0b196b52a05c0f8a_1_0.idx

  {
    "TxnId": 12,
    "Label": "88f606d5-8095-4f15-b61d-49b7080c16b8",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 9318799,
    "NumberLoadedRows": 9318799,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 1079581477,
    "LoadTimeMs": 48771
  }

  mysql> select count(*) from xxx_after;
  +----------+
  | count(*) |
  +----------+
  |   377745 |
  +----------+
  1 row in set (0.38 sec)

```
  • Loading branch information
lingbin authored and imay committed Dec 6, 2019
1 parent 9fbc1c7 commit 177fec8
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 57 deletions.
66 changes: 47 additions & 19 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

#include "olap/memtable.h"

#include "common/object_pool.h"
#include "common/logging.h"
#include "olap/rowset/column_data_writer.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/row_cursor.h"
#include "olap/row.h"
#include "olap/schema.h"
#include "runtime/tuple.h"
#include "util/runtime_profile.h"
#include "util/debug_util.h"

namespace doris {
Expand All @@ -43,9 +42,9 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet

_schema_size = _schema->schema_size();
_mem_tracker.reset(new MemTracker(-1, "memtable", mem_tracker));
_mem_pool.reset(new MemPool(_mem_tracker.get()));
_tuple_buf = _mem_pool->allocate(_schema_size);
_skip_list = new Table(_row_comparator, _mem_pool.get());
_buffer_mem_pool.reset(new MemPool(_mem_tracker.get()));
_table_mem_pool.reset(new MemPool(_mem_tracker.get()));
_skip_list = new Table(_row_comparator, _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
}

MemTable::~MemTable() {
Expand All @@ -61,27 +60,56 @@ int MemTable::RowCursorComparator::operator()(const char* left, const char* righ
return compare_row(lhs_row, rhs_row);
}

size_t MemTable::memory_usage() {
return _mem_pool->mem_tracker()->consumption();
}
void MemTable::insert(const Tuple* tuple) {
bool overwritten = false;
uint8_t* _tuple_buf = nullptr;
if (_keys_type == KeysType::DUP_KEYS) {
// Will insert directly, so use memory from _table_mem_pool
_tuple_buf = _table_mem_pool->allocate(_schema_size);
ContiguousRow row(_schema, _tuple_buf);
_tuple_to_row(tuple, &row, _table_mem_pool.get());
_skip_list->Insert((TableKey)_tuple_buf, &overwritten);
DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
return;
}

void MemTable::insert(Tuple* tuple) {
ContiguousRow row(_schema, _tuple_buf);
// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
// we first allocate from _buffer_mem_pool, and then check whether it already exists in
// _skiplist. If it exists, we aggregate the new row into the row in skiplist.
// otherwise, we need to copy it into _table_mem_pool before we can insert it.
_tuple_buf = _buffer_mem_pool->allocate(_schema_size);
ContiguousRow src_row(_schema, _tuple_buf);
_tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());

bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint);
if (is_exist) {
_aggregate_two_row(src_row, _hint.curr->key);
} else {
_tuple_buf = _table_mem_pool->allocate(_schema_size);
ContiguousRow dst_row(_schema, _tuple_buf);
copy_row(&dst_row, src_row, _table_mem_pool.get());
_skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint);
}

// Make MemPool to be reusable, but does not free its memory
_buffer_mem_pool->clear();
}

void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
for (size_t i = 0; i < _slot_descs->size(); ++i) {
auto cell = row.cell(i);
auto cell = row->cell(i);
const SlotDescriptor* slot = (*_slot_descs)[i];

bool is_null = tuple->is_null(slot->null_indicator_offset());
void* value = tuple->get_slot(slot->tuple_offset());
_schema->column(i)->consume(&cell, (const char *)value, is_null, _mem_pool.get(), &_agg_object_pool);
const void* value = tuple->get_slot(slot->tuple_offset());
_schema->column(i)->consume(
&cell, (const char*)value, is_null, mem_pool, &_agg_object_pool);
}
}

bool overwritten = false;
_skip_list->Insert((char*)_tuple_buf, &overwritten, _keys_type);
if (!overwritten) {
_tuple_buf = _mem_pool->allocate(_schema_size);
}
void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
ContiguousRow dst_row(_schema, row_in_skiplist);
agg_update_row(&dst_row, src_row, _table_mem_pool.get());
}

OLAPStatus MemTable::flush() {
Expand All @@ -92,7 +120,7 @@ OLAPStatus MemTable::flush() {
for (it.SeekToFirst(); it.Valid(); it.Next()) {
char* row = (char*)it.key();
ContiguousRow dst_row(_schema, row);
agg_finalize_row(&dst_row, _mem_pool.get());
agg_finalize_row(&dst_row, _table_mem_pool.get());
RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
}
RETURN_NOT_OK(_rowset_writer->flush());
Expand Down
46 changes: 34 additions & 12 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,51 @@
#ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H
#define DORIS_BE_SRC_OLAP_MEMTABLE_H

#include "common/object_pool.h"
#include "olap/skiplist.h"
#include "olap/olap_define.h"
#include "runtime/mem_tracker.h"

namespace doris {

class ContiguousRow;
class RowsetWriter;
class ObjectPool;
class Schema;
class SlotDescriptor;
class TabletSchema;
class Tuple;
class TupleDescriptor;

class MemTable {
public:
MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* mem_tracker);
~MemTable();

int64_t tablet_id() { return _tablet_id; }
size_t memory_usage();
void insert(Tuple* tuple);
size_t memory_usage() {
return _mem_tracker->consumption();
}
void insert(const Tuple* tuple);
OLAPStatus flush();
OLAPStatus close();

private:
class RowCursorComparator {
public:
RowCursorComparator(const Schema* schema);
int operator()(const char* left, const char* right) const;

private:
const Schema* _schema;
};
typedef SkipList<char*, RowCursorComparator> Table;
typedef Table::key_type TableKey;

void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);

int64_t _tablet_id;
Schema* _schema;
const TabletSchema* _tablet_schema;
Expand All @@ -48,21 +71,20 @@ class MemTable {
const std::vector<SlotDescriptor*>* _slot_descs;
KeysType _keys_type;

struct RowCursorComparator {
const Schema* _schema;
RowCursorComparator(const Schema* schema);
int operator()(const char* left, const char* right) const;
};

RowCursorComparator _row_comparator;
std::unique_ptr<MemTracker> _mem_tracker;
std::unique_ptr<MemPool> _mem_pool;
// This is a buffer, to hold the memory referenced by the rows that have not
// been inserted into the SkipList
std::unique_ptr<MemPool> _buffer_mem_pool;
// Only the rows will be inserted into SkipList can allocate memory from _table_mem_pool.
// In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
// reduce the number of segment files that are generated by current load
std::unique_ptr<MemPool> _table_mem_pool;
ObjectPool _agg_object_pool;

typedef SkipList<char*, RowCursorComparator> Table;
u_int8_t* _tuple_buf;
size_t _schema_size;
Table* _skip_list;
Table::Hint _hint;

RowsetWriter* _rowset_writer;

Expand Down
112 changes: 91 additions & 21 deletions be/src/olap/skiplist.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,52 @@
#include "gen_cpp/olap_file.pb.h"
#include "runtime/mem_pool.h"
#include "util/random.h"
#include "olap/row.h"

namespace doris {

template<typename Key, class Comparator>
class SkipList {
private:
struct Node;
enum { kMaxHeight = 12 };

public:
typedef Key key_type;
// One Hint object is to show position info of one row.
// It is used in the following scenarios:
// // 1. check for existence
// bool is_exist = skiplist->Find(key, &hint);
// // 2. Do something separately based on the value of is_exist
// if (is_exist) {
//     do_something1 ();
// } else {
//     do_something2 ();
// skiplist->InsertWithHint(key, is_exist, hint);
// }
//
// Note: The user should guarantee that there must not be any other insertion
// between calling Find() and InsertWithHint().
struct Hint {
Node* curr;
Node* prev[kMaxHeight];
};

// Create a new SkipList object that will use "cmp" for comparing keys,
// and will allocate memory using "*mem_pool". Objects allocated in the mem_pool
// must remain allocated for the lifetime of the skiplist object.
explicit SkipList(Comparator cmp, MemPool* mem_pool);
// and will allocate memory using "*mem_pool".
// NOTE: Objects allocated in the mem_pool must remain allocated for
// the lifetime of the skiplist object.
explicit SkipList(Comparator cmp, MemPool* mem_pool, bool can_dup);

// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
void Insert(const Key& key, bool* overwritten, KeysType keys_type);
void Aggregate(const Key& k1, const Key& k2);
void Insert(const Key& key, bool* overwritten);
// Use hint to insert a key. the hint is from previous Find()
void InsertWithHint(const Key& key, bool is_exist, Hint* hint);

// Returns true iff an entry that compares equal to key is in the list.
bool Contains(const Key& key) const;
// Like Contains(), but it will return the position info as a hint. We can use this
// position info to insert directly using InsertWithHint().
bool Find(const Key& key, Hint* hint) const;

// Iteration over the contents of a skip list
class Iterator {
Expand Down Expand Up @@ -96,10 +120,10 @@ class SkipList {
};

private:
enum { kMaxHeight = 12 };

// Immutable after construction
Comparator const compare_;
// When value is true, means indicates that duplicate values are allowed.
bool _can_dup;
MemPool* const _mem_pool; // MemPool used for allocations of nodes

Node* const head_;
Expand Down Expand Up @@ -322,8 +346,9 @@ SkipList<Key,Comparator>::FindLast() const {
}

template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, MemPool* mem_pool)
: compare_(cmp),
SkipList<Key,Comparator>::SkipList(Comparator cmp, MemPool* mem_pool, bool can_dup) :
compare_(cmp),
_can_dup(can_dup),
_mem_pool(mem_pool),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
max_height_(1),
Expand All @@ -334,22 +359,15 @@ SkipList<Key,Comparator>::SkipList(Comparator cmp, MemPool* mem_pool)
}

template<typename Key, class Comparator>
void SkipList<Key, Comparator>::Aggregate(const Key& k1, const Key& k2) {
ContiguousRow dst_row(compare_._schema, k1);
ContiguousRow src_row(compare_._schema, k2);
agg_update_row(&dst_row, src_row, _mem_pool);
}

template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key, bool* overwritten, KeysType keys_type) {
void SkipList<Key,Comparator>::Insert(const Key& key, bool* overwritten) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

#ifndef BE_TEST
if (x != nullptr && keys_type != KeysType::DUP_KEYS && Equal(key, x->key)) {
Aggregate(x->key, key);
// The key already exists and duplicate keys are not allowed, so we need to aggreage them
if (!_can_dup && x != nullptr && Equal(key, x->key)) {
*overwritten = true;
return;
}
Expand Down Expand Up @@ -383,6 +401,47 @@ void SkipList<Key,Comparator>::Insert(const Key& key, bool* overwritten, KeysTyp
}
}

// NOTE: Already be checked, the row is exist.
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::InsertWithHint(const Key& key, bool is_exist, Hint* hint) {
Node* x = hint->curr;
DCHECK(!is_exist || x) << "curr pointer must not be null if row exists";

#ifndef BE_TEST
// The key already exists and duplicate keys are not allowed, so we need to aggreage them
if (!_can_dup && is_exist) {
return;
}
#endif

Node** prev = hint->prev;
// Our data structure does not allow duplicate insertion
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
//fprintf(stderr, "Change height from %d to %d\n", max_height_, height);

// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (NULL), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since NULL sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.store(height, std::memory_order_relaxed);
}

x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}

template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, NULL);
Expand All @@ -393,6 +452,17 @@ bool SkipList<Key,Comparator>::Contains(const Key& key) const {
}
}

template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Find(const Key& key, Hint* hint) const {
Node* x = FindGreaterOrEqual(key, hint->prev);
hint->curr = x;
if (x != NULL && Equal(key, x->key)) {
return true;
} else {
return false;
}
}

} // namespace doris

#endif // DORIS_BE_SRC_OLAP_SKIPLIST_H
Loading

0 comments on commit 177fec8

Please sign in to comment.