Skip to content

Commit

Permalink
add materialized view function
Browse files Browse the repository at this point in the history
  • Loading branch information
HangyuanLiu committed Jun 15, 2020
1 parent 75f4df4 commit 60d39c7
Show file tree
Hide file tree
Showing 6 changed files with 455 additions and 20 deletions.
2 changes: 2 additions & 0 deletions be/src/olap/column_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ struct ColumnMapping {
int32_t ref_column;
// normally for default value. stores values for filters
WrapperField* default_value;
// materialize view transform function used in schema change
std::string materialized_function;
};

typedef std::vector<ColumnMapping> SchemaMapping;
Expand Down
196 changes: 182 additions & 14 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

#include "olap/schema_change.h"

#include <pthread.h>
#include <signal.h>

#include <csignal>
#include <algorithm>
#include <vector>

Expand All @@ -34,10 +32,8 @@
#include "olap/rowset/rowset_id_generator.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "common/resource_tls.h"
#include "agent/cgroups_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/heartbeat_flags.h"

using std::deque;
using std::list;
Expand Down Expand Up @@ -94,13 +90,11 @@ class RowBlockMerger {
};


RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
const TabletSharedPtr &base_tablet) {
RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema) {
_schema_mapping.resize(tablet_schema.num_columns());
}

RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
const TabletSharedPtr& base_tablet,
const DeleteHandler& delete_handler) {
_schema_mapping.resize(tablet_schema.num_columns());
_delete_handler = delete_handler;
Expand Down Expand Up @@ -194,6 +188,105 @@ ColumnMapping* RowBlockChanger::get_mutable_column_mapping(size_t column_index)
break; \
}


bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool) {
write_helper->set_not_null(ref_field_idx);
BitmapValue bitmap;
if (!read_helper->is_null(ref_field_idx)) {
uint64_t origin_value;
char *src = read_helper->cell_ptr(ref_field_idx);
switch (ref_column.type()) {
case OLAP_FIELD_TYPE_TINYINT:
if (*(int8_t *) src < 0) {
LOG(WARNING) << "The input: " << *(int8_t *) src
<< " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
return false;
}
origin_value = *(int8_t *) src;
break;
case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
origin_value = *(uint8_t *) src;
break;
case OLAP_FIELD_TYPE_SMALLINT:
if (*(int16_t *) src < 0) {
LOG(WARNING) << "The input: " << *(int16_t *) src
<< " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
return false;
}
origin_value = *(int16_t *) src;
break;
case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
origin_value = *(uint16_t *) src;
break;
case OLAP_FIELD_TYPE_INT:
if (*(int32_t *) src < 0) {
LOG(WARNING) << "The input: " << *(int32_t *) src
<< " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
return false;
}
origin_value = *(int32_t *) src;
break;
case OLAP_FIELD_TYPE_UNSIGNED_INT:
origin_value = *(uint32_t *) src;
break;
case OLAP_FIELD_TYPE_BIGINT:
if (*(int64_t *) src < 0) {
LOG(WARNING) << "The input: " << *(int64_t *) src
<< " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
return false;
}
origin_value = *(int64_t *) src;
break;
case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
origin_value = *(uint64_t *) src;
break;
default:
LOG(WARNING) << "the column type which was altered from was unsupported."
<< " from_type="
<< ref_column.type();
return false;
}
bitmap.add(origin_value);
}
char *buf = reinterpret_cast<char *>(mem_pool->allocate(bitmap.getSizeInBytes()));
Slice dst(buf, bitmap.getSizeInBytes());
bitmap.write(dst.data);
write_helper->set_field_content(field_idx, reinterpret_cast<char *>(&dst), mem_pool);
return true;
}

// Materialized-view transform: hashes one cell of the source row, feeds the hash
// into a HyperLogLog, and stores the serialized HyperLogLog into one cell of the
// destination row.
//
// @param read_helper    cursor positioned on the source row
// @param write_helper   cursor positioned on the destination row
// @param ref_column     column description of the source cell; its type() and
//                       length() decide which bytes are hashed
// @param field_idx      index of the destination cell in write_helper
// @param ref_field_idx  index of the source cell in read_helper
// @param mem_pool       pool forwarded to set_field_content
// @return always true.
bool hll_hash(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
              int field_idx, int ref_field_idx, MemPool* mem_pool) {
    write_helper->set_not_null(field_idx);

    // A null source cell leaves the HyperLogLog without any update.
    HyperLogLog hll;
    if (!read_helper->is_null(ref_field_idx)) {
        Slice src;
        if (ref_column.type() == OLAP_FIELD_TYPE_VARCHAR) {
            // VARCHAR cells hold a Slice describing the string bytes.
            src = *reinterpret_cast<Slice*>(read_helper->cell_ptr(ref_field_idx));
        } else {
            // Every other type is hashed as ref_column.length() raw bytes of the cell.
            // NOTE(review): confirm no other Slice-backed column type can reach this
            // branch, otherwise the Slice header itself would be hashed.
            src.data = read_helper->cell_ptr(ref_field_idx);
            src.size = ref_column.length();
        }
        uint64_t hash_value = HashUtil::murmur_hash64A(src.data, src.size, HashUtil::MURMUR_SEED);
        hll.update(hash_value);
    }

    // Reserve the maximum serialized size, then shrink to what serialize() reports.
    // The buffer is written through &buf[0]: the pointer returned by c_str() refers
    // to storage the program is not allowed to modify.
    std::string buf;
    buf.resize(hll.max_serialized_size());
    buf.resize(hll.serialize(reinterpret_cast<uint8_t*>(&buf[0])));
    Slice dst(buf);
    write_helper->set_field_content(field_idx, reinterpret_cast<char*>(&dst), mem_pool);
    return true;
}

// Materialized-view transform: writes 0 into the destination cell when the
// source cell is null and 1 otherwise.
//
// @param read_helper    cursor positioned on the source row
// @param write_helper   cursor positioned on the destination row
// @param ref_column     unused; present so that the signature matches the other
//                       materialized-view transform functions
// @param field_idx      index of the destination cell in write_helper
// @param ref_field_idx  index of the source cell in read_helper
// @param mem_pool       pool forwarded to set_field_content
// @return always true.
bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
                 int field_idx, int ref_field_idx, MemPool* mem_pool) {
    write_helper->set_not_null(field_idx);
    // Nullness must be read from the source cell, which read_helper addresses by
    // ref_field_idx; field_idx is the position in the destination row.
    int64_t count = read_helper->is_null(ref_field_idx) ? 0 : 1;
    write_helper->set_field_content(field_idx, reinterpret_cast<char*>(&count), mem_pool);
    return true;
}

bool RowBlockChanger::change_row_block(
const RowBlock* ref_block,
int32_t data_version,
Expand Down Expand Up @@ -266,6 +359,33 @@ bool RowBlockChanger::change_row_block(
int32_t ref_column = _schema_mapping[i].ref_column;

if (_schema_mapping[i].ref_column >= 0) {
if (!_schema_mapping[i].materialized_function.empty()) {
bool (*_do_materialized_transform) (RowCursor*, RowCursor*, const TabletColumn&, int, int, MemPool* );
if (_schema_mapping[i].materialized_function == "to_bitmap") {
_do_materialized_transform = to_bitmap;
} else if (_schema_mapping[i].materialized_function == "hll_hash") {
_do_materialized_transform = hll_hash;
} else if (_schema_mapping[i].materialized_function == "count") {
_do_materialized_transform = count_field;
} else {
LOG(WARNING) << "error materialized view function : " << _schema_mapping[i].materialized_function;
return false;
}
VLOG(3) << "_schema_mapping[" << i << "].materialized_function : " << _schema_mapping[i].materialized_function;
for (size_t row_index = 0, new_row_index = 0;
row_index < ref_block->row_block_info().row_num; ++row_index) {
// Skip rows that the filter has marked as not needed.
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
}
mutable_block->get_row(new_row_index++, &write_helper);
ref_block->get_row(row_index, &read_helper);

_do_materialized_transform(&read_helper, &write_helper, ref_block->tablet_schema().column(ref_column), i, _schema_mapping[i].ref_column, mem_pool);
}
continue;
}

// new column will be assigned as referenced column
// check if the type of new column is equal to the older's.
FieldType reftype = ref_block->tablet_schema().column(ref_column).type();
Expand Down Expand Up @@ -1383,6 +1503,36 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
sc_params.new_tablet = new_tablet;
sc_params.ref_rowset_readers = rs_readers;
sc_params.delete_handler = delete_handler;
if (request.__isset.materialized_view_params) {
for (auto item : request.materialized_view_params) {
AlterMaterializedViewParam mv_param;
mv_param.column_name = item.column_name;
/*
* origin_column_name is currently always set,
* but it may be unset for some materialized view functions, e.g. count(1).
*/
if (item.__isset.origin_column_name) {
mv_param.origin_column_name = item.origin_column_name;
}

/*
* TODO(lhy)
* Build the materialized view function for schema change here, based on defineExpr.
* This is a workaround because the current storage layer does not support expression evaluation.
* This part of the code can be refactored once a unified expression evaluation mechanism is available.
* A count distinct materialized view sets mv_expr to to_bitmap or hll_hash.
* A count materialized view sets mv_expr to count.
* NOTE(review): the CASE_EXPR branch below stores "count_field", while the dispatch in
* RowBlockChanger::change_row_block compares against "count" -- confirm that the two names agree.
*/
if (item.__isset.mv_expr) {
if (item.mv_expr.nodes[0].node_type == TExprNodeType::FUNCTION_CALL) {
mv_param.mv_expr = item.mv_expr.nodes[0].fn.name.function_name;
} else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
mv_param.mv_expr = "count_field";
}
}
sc_params.materialized_params_map.insert(std::make_pair(item.column_name, mv_param));
}
}

res = _convert_historical_rowsets(sc_params);
if (res != OLAP_SUCCESS) {
Expand Down Expand Up @@ -1425,15 +1575,17 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(

// a. 解析Alter请求,转换成内部的表示形式
// 不使用DELETE_DATA命令指定的删除条件
RowBlockChanger rb_changer(new_tablet->tablet_schema(), base_tablet);
RowBlockChanger rb_changer(new_tablet->tablet_schema());
bool sc_sorting = false;
bool sc_directly = false;

const std::unordered_map<std::string, AlterMaterializedViewParam> materialized_function_map;
if (OLAP_SUCCESS != (res = _parse_request(base_tablet,
new_tablet,
&rb_changer,
&sc_sorting,
&sc_directly))) {
&sc_directly,
materialized_function_map))) {
LOG(WARNING) << "failed to parse the request. res=" << res;
return res;
}
Expand Down Expand Up @@ -1641,16 +1793,15 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa

// change中增加了filter信息,在_parse_request中会设置filter的column信息
// 并在每次row block的change时,过滤一些数据
RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(),
sc_params.base_tablet, sc_params.delete_handler);
RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler);

bool sc_sorting = false;
bool sc_directly = false;
SchemaChange* sc_procedure = nullptr;

// a. 解析Alter请求,转换成内部的表示形式
OLAPStatus res = _parse_request(sc_params.base_tablet, sc_params.new_tablet,
&rb_changer, &sc_sorting, &sc_directly);
&rb_changer, &sc_sorting, &sc_directly, sc_params.materialized_params_map);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to parse the request. res=" << res;
goto PROCESS_ALTER_EXIT;
Expand Down Expand Up @@ -1784,7 +1935,8 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
TabletSharedPtr new_tablet,
RowBlockChanger* rb_changer,
bool* sc_sorting,
bool* sc_directly) {
bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map) {
OLAPStatus res = OLAP_SUCCESS;

// set column mapping
Expand All @@ -1810,6 +1962,22 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
continue;
}

if (materialized_function_map.find(column_name) != materialized_function_map.end()) {
AlterMaterializedViewParam mvParam = materialized_function_map.find(column_name)->second;
column_mapping->materialized_function = mvParam.mv_expr;
std::string origin_column_name = mvParam.origin_column_name;
int32_t column_index = base_tablet->field_index(origin_column_name);
if (column_index >= 0) {
column_mapping->ref_column = column_index;
continue;
} else {
LOG(WARNING) << "referenced column was missing. "
<< "[column=" << column_name
<< " referenced_column=" << column_index << "]";
return OLAP_ERR_CE_CMD_PARAMS_ERROR;
}
}

int32_t column_index = base_tablet->field_index(column_name);
if (column_index >= 0) {
column_mapping->ref_column = column_index;
Expand Down
21 changes: 17 additions & 4 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,19 @@ class RowBlock;
// defined in 'row_cursor.h'
class RowCursor;

bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool);
bool hll_hash(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool);
bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool);

class RowBlockChanger {
public:
RowBlockChanger(const TabletSchema& tablet_schema,
const TabletSharedPtr& base_tablet,
const DeleteHandler& delete_handler);

RowBlockChanger(const TabletSchema& tablet_schema,
const TabletSharedPtr& base_tablet);
RowBlockChanger(const TabletSchema& tablet_schema);

virtual ~RowBlockChanger();

Expand Down Expand Up @@ -231,12 +236,19 @@ class SchemaChangeHandler {
OLAPStatus _get_versions_to_be_changed(TabletSharedPtr base_tablet,
std::vector<Version>* versions_to_be_changed);

// Describes how one materialized-view column is produced during schema change.
// Instances are filled from the alter request's materialized_view_params and
// stored in SchemaChangeParams::materialized_params_map.
struct AlterMaterializedViewParam {
// Column name copied from the request item; also used as the map key.
std::string column_name;
// Name of the base-tablet column the value is derived from; it is resolved with
// base_tablet->field_index(). Left empty when the request does not set it.
std::string origin_column_name;
// Name of the transform to apply; it is copied into
// ColumnMapping::materialized_function. Left empty when the request carries no mv_expr.
std::string mv_expr;
};

// Bundle of inputs handed to _convert_historical_rowsets for one alter-tablet task.
struct SchemaChangeParams {
// Kind of alter operation being performed.
AlterTabletType alter_tablet_type;
// Tablet whose existing data is read.
TabletSharedPtr base_tablet;
// Tablet whose schema the converted data must match.
TabletSharedPtr new_tablet;
// Readers over the rowsets that are to be converted.
std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
// Delete conditions passed on to the RowBlockChanger.
DeleteHandler delete_handler;
// Materialized-view column descriptions, keyed by AlterMaterializedViewParam::column_name.
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
};

// add alter task to base_tablet and new_tablet.
Expand All @@ -259,7 +271,8 @@ class SchemaChangeHandler {
TabletSharedPtr new_tablet,
RowBlockChanger* rb_changer,
bool* sc_sorting,
bool* sc_directly);
bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map);

// 需要新建default_value时的初始化设置
static OLAPStatus _init_column_mapping(ColumnMapping* column_mapping,
Expand Down
Loading

0 comments on commit 60d39c7

Please sign in to comment.