Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,9 @@ OLAPStatus Compaction::construct_output_rowset_writer() {

OLAPStatus Compaction::construct_input_rowset_readers() {
for (auto& rowset : _input_rowsets) {
RowsetReaderSharedPtr rs_reader(rowset->create_reader());
if (rs_reader == nullptr) {
LOG(WARNING) << "rowset create reader failed. rowset:" << rowset->rowset_id();
return OLAP_ERR_ROWSET_CREATE_READER;
}
_input_rs_readers.push_back(rs_reader);
RowsetReaderSharedPtr rs_reader;
RETURN_NOT_OK(rowset->create_reader(&rs_reader));
_input_rs_readers.push_back(std::move(rs_reader));
}
return OLAP_SUCCESS;
}
Expand Down
37 changes: 16 additions & 21 deletions be/src/olap/generic_iterators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Status AutoIncrementIterator::next_batch(RowBlockV2* block) {
row_idx++;
_rows_returned++;
}
block->resize(row_idx);
block->set_num_rows(row_idx);
if (row_idx > 0) {
return Status::OK();
}
Expand All @@ -114,7 +114,7 @@ class MergeContext {
public:
// This class doesn't take ownership of iter; the client is responsible for deleting it
MergeContext(RowwiseIterator* iter)
: _iter(iter), _block(iter->schema(), 1024, &_arena) {
: _iter(iter), _block(iter->schema(), 1024) {
}

// Initialize this context and prepare data for current_row()
Expand Down Expand Up @@ -144,7 +144,6 @@ class MergeContext {

private:
RowwiseIterator* _iter;
Arena _arena;
// used to store data load from iterator
RowBlockV2 _block;

Expand Down Expand Up @@ -176,6 +175,7 @@ Status MergeContext::advance() {
Status MergeContext::_load_next_block() {
Status st;
do {
_block.clear();
st = _iter->next_batch(&_block);
if (!st.ok()) {
_valid = false;
Expand All @@ -191,23 +191,9 @@ Status MergeContext::_load_next_block() {
return Status::OK();
}

struct MergeContextComaprator {
MergeContextComaprator(Schema* schema) : _schema(schema) { }
bool operator()(const MergeContext* lhs, const MergeContext* rhs) const {
auto lhs_row = lhs->current_row();
auto rhs_row = rhs->current_row();

return compare_row(lhs_row, rhs_row) > 0;
}
private:
Schema* _schema;
};

class MergeIterator : public RowwiseIterator {
public:
// Iterators' ownership it transfered to this class.
// This class will delete all iterators when destructs
// Client should not use iterators any more.
// MergeIterator takes the ownership of input iterators
MergeIterator(std::vector<RowwiseIterator*> iters)
: _origin_iters(std::move(iters)) {
}
Expand All @@ -231,7 +217,16 @@ class MergeIterator : public RowwiseIterator {
std::vector<MergeContext*> _merge_ctxs;

std::unique_ptr<Schema> _schema;
using MergeHeap = std::priority_queue<MergeContext*, std::vector<MergeContext*>, MergeContextComaprator>;

// Ordering functor used by MergeHeap. Reports true when the row `lhs` is
// currently positioned on compares greater than the row `rhs` is positioned on.
// std::priority_queue keeps the element that is "largest" under its comparator
// at top(), so with this greater-than ordering top() is the context holding
// the smallest current row.
struct MergeContextComparator {
    bool operator()(const MergeContext* lhs, const MergeContext* rhs) const {
        auto left = lhs->current_row();
        auto right = rhs->current_row();

        auto order = compare_row(left, right);
        return order > 0;
    }
};
using MergeHeap = std::priority_queue<MergeContext*, std::vector<MergeContext*>, MergeContextComparator>;
std::unique_ptr<MergeHeap> _merge_heap;
};

Expand All @@ -240,7 +235,7 @@ Status MergeIterator::init(const StorageReadOptions& opts) {
return Status::OK();
}
_schema.reset(new Schema(_origin_iters[0]->schema()));
_merge_heap.reset(new MergeHeap(MergeContextComaprator(_schema.get())));
_merge_heap.reset(new MergeHeap);

for (auto iter : _origin_iters) {
std::unique_ptr<MergeContext> ctx(new MergeContext(iter));
Expand Down Expand Up @@ -269,7 +264,7 @@ Status MergeIterator::next_batch(RowBlockV2* block) {
_merge_heap->push(ctx);
}
}
block->resize(row_idx);
block->set_num_rows(row_idx);
if (row_idx > 0) {
return Status::OK();
} else {
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ enum OLAPStatus {
OLAP_ERR_ROWSET_TYPE_NOT_FOUND = -3105,
OLAP_ERR_ROWSET_ALREADY_EXIST = -3106,
OLAP_ERR_ROWSET_CREATE_READER = -3107,
OLAP_ERR_ROWSET_INVALID = -3108
OLAP_ERR_ROWSET_INVALID = -3108,
OLAP_ERR_ROWSET_LOAD_FAILED = -3109,
OLAP_ERR_ROWSET_READER_INIT = -3110,
OLAP_ERR_ROWSET_READ_FAILED = -3111
};

enum ColumnFamilyIndex {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/olap_snapshot_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/olap_snapshot_converter.h"

#include "olap/rowset/alpha_rowset.h"
#include "olap/rowset/alpha_rowset_meta.h"
#include "olap/rowset/rowset_id_generator.h"

namespace doris {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/row.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void copy_row(DstRowType* dst, const SrcRowType& src, MemPool* pool) {
// Deep copy src row to dst row. Schema of src and dst row must be same.
template<typename DstRowType, typename SrcRowType>
void copy_row(DstRowType* dst, const SrcRowType& src, Arena* arena) {
for (uint32_t cid = 0; cid < dst->schema()->num_columns(); ++cid) {
for (auto cid : dst->schema()->column_ids()) {
auto dst_cell = dst->cell(cid);
auto src_cell = src.cell(cid);
dst->schema()->column(cid)->deep_copy(&dst_cell, src_cell, arena);
Expand Down
48 changes: 17 additions & 31 deletions be/src/olap/row_block2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,60 +26,46 @@
using strings::Substitute;
namespace doris {

RowBlockV2::RowBlockV2(const Schema& schema,
uint16_t capacity, Arena* arena)
: _schema(schema),
_column_datas(_schema.num_columns(), nullptr),
_column_null_bitmaps(_schema.num_columns(), nullptr),
_capacity(capacity),
_num_rows(0),
_arena(arena) {
RowBlockV2::RowBlockV2(const Schema& schema, uint16_t capacity)
: _schema(schema),
_capacity(capacity),
_column_datas(_schema.num_columns(), nullptr),
_column_null_bitmaps(_schema.num_columns(), nullptr) {
auto bitmap_size = BitmapSize(capacity);
int i = 0;
for (auto& col_schema : _schema.columns()) {
size_t data_size = col_schema->type_info()->size() * _capacity;
_column_datas[i] = new uint8_t[data_size];
for (auto cid : _schema.column_ids()) {
size_t data_size = _schema.column(cid)->type_info()->size() * _capacity;
_column_datas[cid] = new uint8_t[data_size];

uint8_t* null_bitmap = nullptr;
if (col_schema->is_nullable()) {
null_bitmap = new uint8_t[bitmap_size];
if (_schema.column(cid)->is_nullable()) {
_column_null_bitmaps[cid] = new uint8_t[bitmap_size];;
}
_column_null_bitmaps[i] = null_bitmap;

i++;
}
clear();
}

RowBlockV2::~RowBlockV2() {
for (auto data : _column_datas) {
delete data;
delete[] data;
}
for (auto null_bitmap : _column_null_bitmaps) {
delete null_bitmap;
delete[] null_bitmap;
}
}

Status RowBlockV2::copy_to_row_cursor(size_t row_idx, RowCursor* cursor) {
if (row_idx >= _num_rows) {
return Status::InvalidArgument(
Substitute("Row index is large than number rows, $0 vs $1", row_idx, _num_rows));
Substitute("invalid row index $0 (num_rows=$1)", row_idx, _num_rows));
}
#if 0
for (int i = 0; i < _column_ids.size(); ++i) {
auto cid = _column_ids[i];
bool is_null = _column_schemas[cid].field_info().is_allow_null
&& BitmapTest(_column_null_bitmaps[i], row_idx);
for (auto cid : _schema.column_ids()) {
bool is_null = _schema.column(cid)->is_nullable() && BitmapTest(_column_null_bitmaps[cid], row_idx);
if (is_null) {
cursor->set_null(cid);
} else {
const TypeInfo* type_info = _column_schemas[cid].type_info();
cursor->set_not_null(cid);
char* dest = cursor->get_field_content_ptr(cid);
char* src = (char*)_column_datas[i] + row_idx * type_info->size();
type_info->direct_copy(dest, src);
cursor->set_field_content_shallow(cid, reinterpret_cast<const char*>(column_block(cid).cell_ptr(row_idx)));
}
}
#endif
return Status::OK();
}

Expand Down
43 changes: 31 additions & 12 deletions be/src/olap/row_block2.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,38 @@ class RowCursor;
// in block, however it is used by old code, which we don't want to change.
class RowBlockV2 {
public:
RowBlockV2(const Schema& schema, uint16_t capacity, Arena* arena);
RowBlockV2(const Schema& schema, uint16_t capacity);
~RowBlockV2();

void resize(size_t num_rows) { _num_rows = num_rows; }
// update number of rows contained in this block
void set_num_rows(size_t num_rows) { _num_rows = num_rows; }
// return number of rows contained in this block
size_t num_rows() const { return _num_rows; }
// return the maximum number of rows that can be contained in this block.
// invariant: 0 <= num_rows() <= capacity()
size_t capacity() const { return _capacity; }
Arena* arena() const { return _arena; }
Arena* arena() const { return _arena.get(); }

// Reset the state of the block so that it can be reused for writing:
// the row count goes back to zero and the arena is replaced by a freshly
// constructed one.
// ColumnBlocks returned by column_block() carry a raw pointer to the arena
// that was current when they were created, so all previously returned
// ColumnBlocks are invalidated after clear(); accessing them results in
// undefined behavior.
void clear() {
_num_rows = 0;
_arena.reset(new Arena);
}

// Copy the row_idx row's data into given row_cursor.
// This function performs a shallow copy, so the client must pay
// attention to the lifetime of the copied values
Status copy_to_row_cursor(size_t row_idx, RowCursor* row_cursor);

// Get column block for input column index. This input is the index in
// this row block, is not the index in table's schema
ColumnBlock column_block(size_t col_idx) const {
const TypeInfo* type_info = _schema.column(col_idx)->type_info();
uint8_t* data = _column_datas[col_idx];
uint8_t* null_bitmap = _column_null_bitmaps[col_idx];
return ColumnBlock(type_info, data, null_bitmap, _arena);
// Build the ColumnBlock for column `cid` from that column's type info,
// its data buffer, its null bitmap and this block's arena.
// `cid` must be one of `schema()->column_ids()`.
ColumnBlock column_block(ColumnId cid) const {
    return ColumnBlock(_schema.column(cid)->type_info(),
                       _column_datas[cid],
                       _column_null_bitmaps[cid],
                       _arena.get());
}

RowBlockRow row(size_t row_idx) const;
Expand All @@ -64,11 +76,18 @@ class RowBlockV2 {

private:
Schema _schema;
size_t _capacity;
// keeps fixed-size (field_size x capacity) data vector for each column,
// _column_datas[cid] == null if cid is not in `_schema`.
// memory is not allocated from `_arena` because we don't want to reallocate it in clear()
std::vector<uint8_t*> _column_datas;
// keeps null bitmap for each column,
// _column_null_bitmaps[cid] == null if cid is not in `_schema` or the column is not nullable.
// memory is not allocated from `_arena` because we don't want to reallocate it in clear()
std::vector<uint8_t*> _column_null_bitmaps;
size_t _capacity;
size_t _num_rows;
Arena* _arena;
// manages the memory for slice's data
std::unique_ptr<Arena> _arena;
};

// Stands for a row in RowBlockV2. It is consisted of a RowBlockV2 reference
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/row_cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ RowCursor::~RowCursor() {
OLAPStatus RowCursor::_init(const std::vector<TabletColumn>& schema,
const std::vector<uint32_t>& columns) {
_schema.reset(new Schema(schema, columns));
_fixed_len = _schema->schema_size();
_variable_len = 0;
for (auto cid : columns) {
if (_schema->column(cid) == nullptr) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ add_library(Rowset STATIC
alpha_rowset_writer.cpp
alpha_rowset_meta.cpp
beta_rowset.cpp
beta_rowset_reader.cpp
beta_rowset_writer.cpp)
13 changes: 7 additions & 6 deletions be/src/olap/rowset/alpha_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "olap/rowset/alpha_rowset.h"
#include "olap/rowset/alpha_rowset_meta.h"
#include "olap/rowset/alpha_rowset_reader.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/row.h"
#include "util/hash_util.hpp"
Expand All @@ -41,6 +42,7 @@ OLAPStatus AlphaRowset::init() {

OLAPStatus AlphaRowset::load(bool use_cache) {
// load depends on init, so check whether init has been done and do it here if not
// TODO remove the following if block when rowset is guaranteed to be initialized
if (!is_inited()) {
OLAPStatus res = init();
if (res != OLAP_SUCCESS) {
Expand Down Expand Up @@ -73,17 +75,16 @@ OLAPStatus AlphaRowset::load(bool use_cache) {
return OLAP_SUCCESS;
}

std::shared_ptr<RowsetReader> AlphaRowset::create_reader() {
OLAPStatus AlphaRowset::create_reader(std::shared_ptr<RowsetReader>* result) {
if (!is_loaded()) {
OLAPStatus status = load();
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "alpha rowset load failed. rowset path:" << _rowset_path;
return nullptr;
return OLAP_ERR_ROWSET_CREATE_READER;
}
set_loaded(true);
}
return std::shared_ptr<RowsetReader>(new AlphaRowsetReader(
_schema->num_rows_per_row_block(), shared_from_this()));
result->reset(new AlphaRowsetReader(
_schema->num_rows_per_row_block(), std::static_pointer_cast<AlphaRowset>(shared_from_this())));
return OLAP_SUCCESS;
}

OLAPStatus AlphaRowset::remove() {
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/alpha_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

#include "olap/rowset/rowset.h"
#include "olap/rowset/segment_group.h"
#include "olap/rowset/alpha_rowset_reader.h"
#include "olap/rowset/alpha_rowset_writer.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/data_dir.h"
#include "olap/tuple.h"
Expand All @@ -33,6 +31,8 @@ namespace doris {

class AlphaRowset;
using AlphaRowsetSharedPtr = std::shared_ptr<AlphaRowset>;
class AlphaRowsetWriter;
class AlphaRowsetReader;

class AlphaRowset : public Rowset {
public:
Expand All @@ -51,7 +51,7 @@ class AlphaRowset : public Rowset {
// always means that there are some io
OLAPStatus load(bool use_cache = true) override;

std::shared_ptr<RowsetReader> create_reader() override;
OLAPStatus create_reader(std::shared_ptr<RowsetReader>* result) override;

OLAPStatus remove() override;

Expand Down
Loading