From 60d39c7b174c4873f967f070c4d8548bec8f9dad Mon Sep 17 00:00:00 2001 From: HangyuanLiu <460660596@qq.com> Date: Mon, 15 Jun 2020 17:31:53 +0800 Subject: [PATCH] add materialized view function --- be/src/olap/column_mapping.h | 2 + be/src/olap/schema_change.cpp | 196 ++++++++++++++-- be/src/olap/schema_change.h | 21 +- be/test/olap/schema_change_test.cpp | 217 +++++++++++++++++- .../apache/doris/task/AlterReplicaTask.java | 32 ++- gensrc/thrift/AgentService.thrift | 7 + 6 files changed, 455 insertions(+), 20 deletions(-) diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h index 7b19d9c5f111ec..89392579c904c1 100644 --- a/be/src/olap/column_mapping.h +++ b/be/src/olap/column_mapping.h @@ -31,6 +31,8 @@ struct ColumnMapping { int32_t ref_column; // normally for default value. stores values for filters WrapperField* default_value; + // materialize view transform function used in schema change + std::string materialized_function; }; typedef std::vector SchemaMapping; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 690fe89c29ee92..d0d03905514e92 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -17,9 +17,7 @@ #include "olap/schema_change.h" -#include -#include - +#include #include #include @@ -34,10 +32,8 @@ #include "olap/rowset/rowset_id_generator.h" #include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" -#include "common/resource_tls.h" #include "agent/cgroups_mgr.h" #include "runtime/exec_env.h" -#include "runtime/heartbeat_flags.h" using std::deque; using std::list; @@ -94,13 +90,11 @@ class RowBlockMerger { }; -RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, - const TabletSharedPtr &base_tablet) { +RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema) { _schema_mapping.resize(tablet_schema.num_columns()); } RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, - const TabletSharedPtr& base_tablet, const DeleteHandler& 
delete_handler) {
     _schema_mapping.resize(tablet_schema.num_columns());
     _delete_handler = delete_handler;
@@ -194,6 +188,117 @@ ColumnMapping* RowBlockChanger::get_mutable_column_mapping(size_t column_index)
         break; \
     }
 
+
+// Materialized-view transform "to_bitmap": writes a serialized BitmapValue into column
+// `field_idx` of `write_helper`, built from the integer cell `ref_field_idx` of `read_helper`.
+// A NULL source cell yields an empty bitmap. Returns false (after logging) when the source
+// value is negative or `ref_column` is not an integer type.
+bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
+               int field_idx, int ref_field_idx, MemPool* mem_pool) {
+    write_helper->set_not_null(field_idx);
+    BitmapValue bitmap;
+    if (!read_helper->is_null(ref_field_idx)) {
+        uint64_t origin_value;
+        char *src = read_helper->cell_ptr(ref_field_idx);
+        switch (ref_column.type()) {
+            case OLAP_FIELD_TYPE_TINYINT:
+                if (*(int8_t *) src < 0) {
+                    LOG(WARNING) << "The input: " << static_cast<int>(*(int8_t *) src)
+                                 << " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
+                    return false;
+                }
+                origin_value = *(int8_t *) src;
+                break;
+            case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
+                origin_value = *(uint8_t *) src;
+                break;
+            case OLAP_FIELD_TYPE_SMALLINT:
+                if (*(int16_t *) src < 0) {
+                    LOG(WARNING) << "The input: " << *(int16_t *) src
+                                 << " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
+                    return false;
+                }
+                origin_value = *(int16_t *) src;
+                break;
+            case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
+                origin_value = *(uint16_t *) src;
+                break;
+            case OLAP_FIELD_TYPE_INT:
+                if (*(int32_t *) src < 0) {
+                    LOG(WARNING) << "The input: " << *(int32_t *) src
+                                 << " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
+                    return false;
+                }
+                origin_value = *(int32_t *) src;
+                break;
+            case OLAP_FIELD_TYPE_UNSIGNED_INT:
+                origin_value = *(uint32_t *) src;
+                break;
+            case OLAP_FIELD_TYPE_BIGINT:
+                if (*(int64_t *) src < 0) {
+                    LOG(WARNING) << "The input: " << *(int64_t *) src
+                                 << " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
+                    return false;
+                }
+                origin_value = *(int64_t *) src;
+                break;
+            case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
+                origin_value = *(uint64_t *) src;
+                break;
+            default:
+                LOG(WARNING) << "the column type which was altered from was unsupported."
+                             << " from_type="
+                             << ref_column.type();
+                return false;
+        }
+        bitmap.add(origin_value);
+    }
+    char *buf = reinterpret_cast<char *>(mem_pool->allocate(bitmap.getSizeInBytes()));
+    Slice dst(buf, bitmap.getSizeInBytes());
+    bitmap.write(dst.data);
+    write_helper->set_field_content(field_idx, reinterpret_cast<char *>(&dst), mem_pool);
+    return true;
+}
+
+// Materialized-view transform "hll_hash": writes a serialized HyperLogLog into column
+// `field_idx` of `write_helper`, updated with the murmur hash of the source cell
+// `ref_field_idx` of `read_helper`. A NULL source cell yields an empty HLL. Always returns true.
+// NOTE(review): only VARCHAR cells are read as a Slice; every other type is hashed as
+// `ref_column.length()` raw bytes -- confirm this is intended for CHAR columns.
+bool hll_hash(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
+              int field_idx, int ref_field_idx, MemPool* mem_pool) {
+    write_helper->set_not_null(field_idx);
+    HyperLogLog hll;
+    if (!read_helper->is_null(ref_field_idx)) {
+        Slice src;
+        if (ref_column.type() != OLAP_FIELD_TYPE_VARCHAR) {
+            src.data = read_helper->cell_ptr(ref_field_idx);
+            src.size = ref_column.length();
+        } else {
+            src = *reinterpret_cast<Slice *>(read_helper->cell_ptr(ref_field_idx));
+        }
+        uint64_t hash_value = HashUtil::murmur_hash64A(src.data, src.size, HashUtil::MURMUR_SEED);
+        hll.update(hash_value);
+    }
+    std::string buf;
+    buf.resize(hll.max_serialized_size());
+    buf.resize(hll.serialize(reinterpret_cast<uint8_t *>(&buf[0])));
+    Slice dst(buf);
+    write_helper->set_field_content(field_idx, reinterpret_cast<char *>(&dst), mem_pool);
+    return true;
+}
+
+// Materialized-view transform for count(): writes 0 into column `field_idx` of `write_helper`
+// when the source cell `ref_field_idx` of `read_helper` is NULL and 1 otherwise.
+// `ref_column` is unused. Always returns true.
+bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
+                 int field_idx, int ref_field_idx, MemPool* mem_pool) {
+    write_helper->set_not_null(field_idx);
+    int64_t count = read_helper->is_null(ref_field_idx) ? 0 : 1;
+    write_helper->set_field_content(field_idx, reinterpret_cast<char *>(&count), mem_pool);
+    return true;
+}
+
 bool RowBlockChanger::change_row_block(
         const RowBlock* ref_block,
         int32_t data_version,
@@ -266,6 +371,39 @@ bool RowBlockChanger::change_row_block(
         int32_t ref_column = _schema_mapping[i].ref_column;
 
         if (_schema_mapping[i].ref_column >= 0) {
+            if (!_schema_mapping[i].materialized_function.empty()) {
+                bool (*_do_materialized_transform) (RowCursor*, RowCursor*, const TabletColumn&, int, int, MemPool* );
+                if (_schema_mapping[i].materialized_function == "to_bitmap") {
+                    _do_materialized_transform = to_bitmap;
+                } else if (_schema_mapping[i].materialized_function == "hll_hash") {
+                    _do_materialized_transform = hll_hash;
+                } else if (_schema_mapping[i].materialized_function == "count" || _schema_mapping[i].materialized_function == "count_field") {
+                    // the alter-request handler names the count() transform "count_field"
+                    _do_materialized_transform = count_field;
+                } else {
+                    LOG(WARNING) << "error materialized view function : " << _schema_mapping[i].materialized_function;
+                    return false;
+                }
+                VLOG(3) << "_schema_mapping[" << i << "].materialized_function : " << _schema_mapping[i].materialized_function;
+                for (size_t row_index = 0, new_row_index = 0;
+                     row_index < ref_block->row_block_info().row_num; ++row_index) {
+                    // skip rows that were filtered out
+                    if (need_filter_data && is_data_left_vec[row_index] == 0) {
+                        continue;
+                    }
+                    mutable_block->get_row(new_row_index++, &write_helper);
+                    ref_block->get_row(row_index, &read_helper);
+
+                    if (!_do_materialized_transform(&read_helper, &write_helper,
+                                                    ref_block->tablet_schema().column(ref_column),
+                                                    i, _schema_mapping[i].ref_column, mem_pool)) {
+                        // propagate the failure instead of emitting a row whose cell was never written
+                        return false;
+                    }
+                }
+                continue;
+            }
+
             // new column will be assigned as referenced column
             // check if the type of new column is equal to the older's.
             FieldType reftype = ref_block->tablet_schema().column(ref_column).type();
@@ -1383,6 +1521,37 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
     sc_params.new_tablet = new_tablet;
     sc_params.ref_rowset_readers = rs_readers;
     sc_params.delete_handler = delete_handler;
+    if (request.__isset.materialized_view_params) {
+        for (const auto& item : request.materialized_view_params) {
+            AlterMaterializedViewParam mv_param;
+            mv_param.column_name = item.column_name;
+            /*
+             * origin_column_name is currently always set,
+             * but some materialized view functions may not set it, e.g. count(1).
+             */
+            if (item.__isset.origin_column_name) {
+                mv_param.origin_column_name = item.origin_column_name;
+            }
+
+            /*
+             * TODO(lhy)
+             * Building the materialized view function for schema_change here based on defineExpr.
+             * This is a trick because the current storage layer does not support expression evaluation.
+             * We can refactor this part of the code once expression evaluation is unified.
+             * A function-call root (to_bitmap / hll_hash) sets mv_expr to the function name.
+             * A CASE-expression root (count) sets mv_expr to "count_field".
+             */
+            if (item.__isset.mv_expr && !item.mv_expr.nodes.empty()) {
+                const auto& root_node = item.mv_expr.nodes[0];
+                if (root_node.node_type == TExprNodeType::FUNCTION_CALL) {
+                    mv_param.mv_expr = root_node.fn.name.function_name;
+                } else if (root_node.node_type == TExprNodeType::CASE_EXPR) {
+                    mv_param.mv_expr = "count_field";
+                }
+            }
+            sc_params.materialized_params_map.insert(std::make_pair(item.column_name, mv_param));
+        }
+    }
 
     res = _convert_historical_rowsets(sc_params);
     if (res != OLAP_SUCCESS) {
@@ -1425,15 +1594,17 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
 
     // a. 
解析Alter请求,转换成内部的表示形式
     // 不使用DELETE_DATA命令指定的删除条件
-    RowBlockChanger rb_changer(new_tablet->tablet_schema(), base_tablet);
+    RowBlockChanger rb_changer(new_tablet->tablet_schema());
 
     bool sc_sorting = false;
     bool sc_directly = false;
+    const std::unordered_map<std::string, AlterMaterializedViewParam> materialized_function_map;
     if (OLAP_SUCCESS != (res = _parse_request(base_tablet,
                                               new_tablet,
                                               &rb_changer,
                                               &sc_sorting,
-                                              &sc_directly))) {
+                                              &sc_directly,
+                                              materialized_function_map))) {
         LOG(WARNING) << "failed to parse the request. res=" << res;
         return res;
     }
@@ -1641,8 +1793,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
 
     // change中增加了filter信息,在_parse_request中会设置filter的column信息
     // 并在每次row block的change时,过滤一些数据
-    RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(),
-                               sc_params.base_tablet, sc_params.delete_handler);
+    RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler);
 
     bool sc_sorting = false;
     bool sc_directly = false;
@@ -1650,7 +1801,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
     // a. 解析Alter请求,转换成内部的表示形式
     OLAPStatus res = _parse_request(sc_params.base_tablet,
                                     sc_params.new_tablet,
-                                    &rb_changer, &sc_sorting, &sc_directly);
+                                    &rb_changer, &sc_sorting, &sc_directly, sc_params.materialized_params_map);
     if (res != OLAP_SUCCESS) {
         LOG(WARNING) << "failed to parse the request. res=" << res;
         goto PROCESS_ALTER_EXIT;
@@ -1784,7 +1935,8 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
                                                TabletSharedPtr new_tablet,
                                                RowBlockChanger* rb_changer,
                                                bool* sc_sorting,
-                                               bool* sc_directly) {
+                                               bool* sc_directly,
+                                               const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map) {
     OLAPStatus res = OLAP_SUCCESS;
 
     // set column mapping
@@ -1810,6 +1962,24 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
             continue;
         }
 
+        // A materialized-view column maps to `origin_column_name` of the base tablet and
+        // records the transform name (`mv_expr`) instead of mapping to a same-named column.
+        const auto mv_iter = materialized_function_map.find(column_name);
+        if (mv_iter != materialized_function_map.end()) {
+            const AlterMaterializedViewParam& mv_param = mv_iter->second;
+            column_mapping->materialized_function = mv_param.mv_expr;
+            int32_t column_index = base_tablet->field_index(mv_param.origin_column_name);
+            if (column_index >= 0) {
+                column_mapping->ref_column = column_index;
+                continue;
+            } else {
+                LOG(WARNING) << "referenced column was missing. "
+                             << "[column=" << column_name
+                             << " referenced_column=" << mv_param.origin_column_name << "]";
+                return OLAP_ERR_CE_CMD_PARAMS_ERROR;
+            }
+        }
+
         int32_t column_index = base_tablet->field_index(column_name);
         if (column_index >= 0) {
             column_mapping->ref_column = column_index;
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index ac6f55dc730560..32131e1fb91740 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -40,14 +40,20 @@ class RowBlock;
 // defined in 'row_cursor.h'
 class RowCursor;
 
+// Materialized-view cell transforms selected by name in RowBlockChanger::change_row_block.
+bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
+               int field_idx, int ref_field_idx, MemPool* mem_pool);
+bool hll_hash(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
+              int field_idx, int ref_field_idx, MemPool* mem_pool);
+bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
+                 int field_idx, int ref_field_idx, MemPool* mem_pool);
+
 class 
RowBlockChanger {
 public:
     RowBlockChanger(const TabletSchema& tablet_schema,
-                    const TabletSharedPtr& base_tablet,
                     const DeleteHandler& delete_handler);
 
-    RowBlockChanger(const TabletSchema& tablet_schema,
-                    const TabletSharedPtr& base_tablet);
+    explicit RowBlockChanger(const TabletSchema& tablet_schema);
 
     virtual ~RowBlockChanger();
 
@@ -231,12 +236,19 @@ class SchemaChangeHandler {
     OLAPStatus _get_versions_to_be_changed(TabletSharedPtr base_tablet,
                                            std::vector<Version>* versions_to_be_changed);
 
+    struct AlterMaterializedViewParam {
+        std::string column_name;         // column of the new tablet; key of materialized_params_map
+        std::string origin_column_name;  // base tablet column the value is computed from
+        std::string mv_expr;             // transform name stored as ColumnMapping::materialized_function
+    };
+
     struct SchemaChangeParams {
         AlterTabletType alter_tablet_type;
         TabletSharedPtr base_tablet;
         TabletSharedPtr new_tablet;
         std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
         DeleteHandler delete_handler;
+        std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
     };
 
     // add alter task to base_tablet and new_tablet.
@@ -259,7 +271,8 @@ class SchemaChangeHandler {
                                      TabletSharedPtr new_tablet,
                                      RowBlockChanger* rb_changer,
                                      bool* sc_sorting,
-                                     bool* sc_directly);
+                                     bool* sc_directly,
+                                     const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map);
 
     // 需要新建default_value时的初始化设置
     static OLAPStatus _init_column_mapping(ColumnMapping* column_mapping,
diff --git a/be/test/olap/schema_change_test.cpp b/be/test/olap/schema_change_test.cpp
index 389099ebe0b02e..52a3daeca52a24 100644
--- a/be/test/olap/schema_change_test.cpp
+++ b/be/test/olap/schema_change_test.cpp
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#include "olap/schema_change.h"
 
 #include <gtest/gtest.h>
 
@@ -27,7 +28,6 @@
 #include "olap/row_cursor.h"
 #include "olap/row_block.h"
 #include "runtime/mem_pool.h"
-#include "runtime/string_value.hpp"
 #include "runtime/vectorized_row_batch.h"
 #include "util/logging.h"
 
@@ -663,6 +663,221 @@ TEST_F(TestColumn, ConvertDecimalToVarchar) {
     decimal12_t val(456, 789000000);
     test_convert_to_varchar<decimal12_t>("Decimal", 12, val, "456.789000000", OLAP_SUCCESS);
 }
+
+// Fills `tablet_schema` with the base schema of the materialized-view tests: k1 INT, k2 VARCHAR, k3 INT keys and v1 INT SUM.
+void CreateTabletSchema(TabletSchema& tablet_schema) {
+    TabletSchemaPB tablet_schema_pb;
+    tablet_schema_pb.set_keys_type(KeysType::AGG_KEYS);
+    tablet_schema_pb.set_num_short_key_columns(2);
+    tablet_schema_pb.set_num_rows_per_row_block(1024);
+    tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+    tablet_schema_pb.set_next_column_unique_id(4);
+
+    ColumnPB* column_1 = tablet_schema_pb.add_column();
+    column_1->set_unique_id(1);
+    column_1->set_name("k1");
+    column_1->set_type("INT");
+    column_1->set_is_key(true);
+    column_1->set_length(4);
+    column_1->set_index_length(4);
+    column_1->set_is_nullable(false);
+    column_1->set_is_bf_column(false);
+
+    ColumnPB* column_2 = tablet_schema_pb.add_column();
+    column_2->set_unique_id(2);
+    column_2->set_name("k2");
+    column_2->set_type("VARCHAR");
+    column_2->set_length(20);
+    column_2->set_index_length(20);
+    column_2->set_is_key(true);
+    column_2->set_is_nullable(false);
+    column_2->set_is_bf_column(false);
+
+    ColumnPB* column_3 = tablet_schema_pb.add_column();
+    column_3->set_unique_id(3);
+    column_3->set_name("k3");
+    column_3->set_type("INT");
+    column_3->set_is_key(true);
+    column_3->set_length(4);
+    column_3->set_index_length(4);
+    column_3->set_is_nullable(false);
+    column_3->set_is_bf_column(false);
+
+    ColumnPB* column_4 = tablet_schema_pb.add_column();
+    column_4->set_unique_id(4);
+    column_4->set_name("v1");
+    column_4->set_type("INT");
+    column_4->set_length(4);
+    column_4->set_is_key(false);
+    column_4->set_is_nullable(false);
+    column_4->set_is_bf_column(false);
+    column_4->set_aggregation("SUM");
+
+    tablet_schema.init_from_pb(tablet_schema_pb);
+}
+
+// A row converted with the "to_bitmap" transform on k3 must yield a bitmap of cardinality 1.
+TEST_F(TestColumn, ConvertIntToBitmap) {
+    // base tablet schema
+    TabletSchema tablet_schema;
+    CreateTabletSchema(tablet_schema);
+    // base row block holding a single row
+    CreateColumnWriter(tablet_schema);
+
+    RowCursor write_row;
+    write_row.init(tablet_schema);
+    write_row.allocate_memory_for_string_type(tablet_schema);
+    RowBlock block(&tablet_schema);
+    RowBlockInfo block_info;
+    block_info.row_num = 10000;
+    block.init(block_info);
+
+    std::vector<std::string> val_string_array;
+    val_string_array.emplace_back("5");
+    val_string_array.emplace_back("4");
+    val_string_array.emplace_back("2");
+    val_string_array.emplace_back("3");
+    OlapTuple tuple(val_string_array);
+    write_row.from_tuple(tuple);
+    block.set_row(0, write_row);
+    block.finalize(1);
+    ASSERT_EQ(_column_writer->write_batch(&block, &write_row), OLAP_SUCCESS);
+    ColumnDataHeaderMessage header;
+    ASSERT_EQ(_column_writer->finalize(&header), OLAP_SUCCESS);
+
+    // materialized view tablet schema: k1 INT key, v1 OBJECT BITMAP_UNION
+    TabletSchemaPB mv_tablet_schema_pb;
+    mv_tablet_schema_pb.set_keys_type(KeysType::AGG_KEYS);
+    mv_tablet_schema_pb.set_num_short_key_columns(2);
+    mv_tablet_schema_pb.set_num_rows_per_row_block(1024);
+    mv_tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+    mv_tablet_schema_pb.set_next_column_unique_id(3);
+
+    ColumnPB* mv_column_1 = mv_tablet_schema_pb.add_column();
+    mv_column_1->set_unique_id(1);
+    mv_column_1->set_name("k1");
+    mv_column_1->set_type("INT");
+    mv_column_1->set_is_key(true);
+    mv_column_1->set_length(4);
+    mv_column_1->set_index_length(4);
+    mv_column_1->set_is_nullable(false);
+    mv_column_1->set_is_bf_column(false);
+
+    ColumnPB* mv_column_2 = mv_tablet_schema_pb.add_column();
+    mv_column_2->set_unique_id(2);
+    mv_column_2->set_name("v1");
+    mv_column_2->set_type("OBJECT");
+    mv_column_2->set_length(8);
+    mv_column_2->set_is_key(false);
+    mv_column_2->set_is_nullable(false);
+    mv_column_2->set_is_bf_column(false);
+    mv_column_2->set_aggregation("BITMAP_UNION");
+
+    TabletSchema mv_tablet_schema;
+    mv_tablet_schema.init_from_pb(mv_tablet_schema_pb);
+
+    RowBlockChanger row_block_changer(mv_tablet_schema);
+    ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0);
+    column_mapping->ref_column = 0;
+    column_mapping = row_block_changer.get_mutable_column_mapping(1);
+    column_mapping->ref_column = 2;
+    column_mapping->materialized_function = "to_bitmap";
+
+    RowBlock mutable_block(&mv_tablet_schema);
+    mutable_block.init(block_info);
+    uint64_t filtered_rows = 0;
+    ASSERT_TRUE(row_block_changer.change_row_block(&block, 0, &mutable_block, &filtered_rows));
+
+    RowCursor mv_row_cursor;
+    mv_row_cursor.init(mv_tablet_schema);
+    mutable_block.get_row(0, &mv_row_cursor);
+
+    auto dst_slice = reinterpret_cast<Slice*>(mv_row_cursor.cell_ptr(1));
+    BitmapValue bitmapValue(dst_slice->data);
+    ASSERT_EQ(bitmapValue.cardinality(), 1);
+}
+
+// A row converted with the "hll_hash" transform on k2 must yield an HLL estimating 1.
+TEST_F(TestColumn, ConvertCharToHLL) {
+    // base tablet schema
+    TabletSchema tablet_schema;
+    CreateTabletSchema(tablet_schema);
+
+    // base row block holding a single row
+    CreateColumnWriter(tablet_schema);
+
+    RowCursor write_row;
+    write_row.init(tablet_schema);
+    write_row.allocate_memory_for_string_type(tablet_schema);
+    RowBlock block(&tablet_schema);
+    RowBlockInfo block_info;
+    block_info.row_num = 10000;
+    block.init(block_info);
+
+    std::vector<std::string> val_string_array;
+    val_string_array.emplace_back("1");
+    val_string_array.emplace_back("1");
+    val_string_array.emplace_back("2");
+    val_string_array.emplace_back("3");
+    OlapTuple tuple(val_string_array);
+    write_row.from_tuple(tuple);
+    block.set_row(0, write_row);
+    block.finalize(1);
+    ASSERT_EQ(_column_writer->write_batch(&block, &write_row), OLAP_SUCCESS);
+    ColumnDataHeaderMessage header;
+    ASSERT_EQ(_column_writer->finalize(&header), OLAP_SUCCESS);
+
+    // materialized view tablet schema: k1 INT key, v1 HLL HLL_UNION
+    TabletSchemaPB mv_tablet_schema_pb;
+    mv_tablet_schema_pb.set_keys_type(KeysType::AGG_KEYS);
+    mv_tablet_schema_pb.set_num_short_key_columns(2);
+    mv_tablet_schema_pb.set_num_rows_per_row_block(1024);
+    mv_tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+    mv_tablet_schema_pb.set_next_column_unique_id(3);
+
+    ColumnPB* mv_column_1 = mv_tablet_schema_pb.add_column();
+    mv_column_1->set_unique_id(1);
+    mv_column_1->set_name("k1");
+    mv_column_1->set_type("INT");
+    mv_column_1->set_is_key(true);
+    mv_column_1->set_length(4);
+    mv_column_1->set_index_length(4);
+    mv_column_1->set_is_nullable(false);
+    mv_column_1->set_is_bf_column(false);
+
+    ColumnPB* mv_column_2 = mv_tablet_schema_pb.add_column();
+    mv_column_2->set_unique_id(2);
+    mv_column_2->set_name("v1");
+    mv_column_2->set_type("HLL");
+    mv_column_2->set_length(4);
+    mv_column_2->set_is_key(false);
+    mv_column_2->set_is_nullable(false);
+    mv_column_2->set_is_bf_column(false);
+    mv_column_2->set_aggregation("HLL_UNION");
+
+    TabletSchema mv_tablet_schema;
+    mv_tablet_schema.init_from_pb(mv_tablet_schema_pb);
+
+    RowBlockChanger row_block_changer(mv_tablet_schema);
+    ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0);
+    column_mapping->ref_column = 0;
+    column_mapping = row_block_changer.get_mutable_column_mapping(1);
+    column_mapping->ref_column = 1;
+    column_mapping->materialized_function = "hll_hash";
+
+    RowBlock mutable_block(&mv_tablet_schema);
+    mutable_block.init(block_info);
+    uint64_t filtered_rows = 0;
+    ASSERT_TRUE(row_block_changer.change_row_block(&block, 0, &mutable_block, &filtered_rows));
+
+    RowCursor mv_row_cursor;
+    mv_row_cursor.init(mv_tablet_schema);
+    mutable_block.get_row(0, &mv_row_cursor);
+
+    auto dst_slice = reinterpret_cast<Slice*>(mv_row_cursor.cell_ptr(1));
+    HyperLogLog hll(dst_slice->data);
+    ASSERT_EQ(hll.estimate_cardinality(), 1);
+}
 }
 
 int main(int argc, char** argv) {
diff --git a/fe/src/main/java/org/apache/doris/task/AlterReplicaTask.java 
b/fe/src/main/java/org/apache/doris/task/AlterReplicaTask.java
index d682383b14c38c..5b82fe8ef3a112 100644
--- a/fe/src/main/java/org/apache/doris/task/AlterReplicaTask.java
+++ b/fe/src/main/java/org/apache/doris/task/AlterReplicaTask.java
@@ -17,10 +17,17 @@
 
 package org.apache.doris.task;
 
+import com.google.common.collect.Lists;
 import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.thrift.TAlterMaterializedViewParam;
 import org.apache.doris.thrift.TAlterTabletReqV2;
 import org.apache.doris.thrift.TTaskType;
 
+import java.util.List;
+import java.util.Map;
+
 /*
  * This task is used for alter table process, such as rollup and schema change
  * The task will do data transformation from base replica to new replica.
@@ -38,10 +45,23 @@ public class AlterReplicaTask extends AgentTask {
     private long jobId;
     private AlterJobV2.JobType jobType;
 
+    private Map<String, Expr> defineExprs;
+
+    // Convenience constructor for callers without define expressions; passes null for defineExprs.
+    public AlterReplicaTask(long backendId, long dbId, long tableId,
+            long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
+            long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash,
+            long version, long versionHash, long jobId, AlterJobV2.JobType jobType) {
+        this(backendId, dbId, tableId, partitionId,
+                rollupIndexId, baseIndexId, rollupTabletId,
+                baseTabletId, newReplicaId, newSchemaHash, baseSchemaHash,
+                version, versionHash, jobId, jobType, null);
+    }
+
     public AlterReplicaTask(long backendId, long dbId, long tableId,
             long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
             long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash,
-            long version, long versionHash, long jobId, AlterJobV2.JobType jobType) {
+            long version, long versionHash, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs) {
         super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);
 
         this.baseTabletId = baseTabletId;
@@ -55,6 +75,7 @@ public AlterReplicaTask(long backendId, long dbId, long tableId,
 
         this.jobId = jobId;
         this.jobType = jobType;
+        this.defineExprs = defineExprs;
     }
 
     public long getBaseTabletId() {
@@ -93,6 +114,19 @@ public TAlterTabletReqV2 toThrift() {
         TAlterTabletReqV2 req = new TAlterTabletReqV2(baseTabletId, signature, baseSchemaHash, newSchemaHash);
         req.setAlter_version(version);
         req.setAlter_version_hash(versionHash);
+        if (defineExprs != null) {
+            for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
+                List<SlotRef> slots = Lists.newArrayList();
+                entry.getValue().collect(SlotRef.class, slots);
+                TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
+                // origin_column_name is optional in thrift; leave it unset when the expr has no SlotRef
+                if (!slots.isEmpty()) {
+                    mvParam.setOrigin_column_name(slots.get(0).getColumnName());
+                }
+                mvParam.setMv_expr(entry.getValue().treeToThrift());
+                req.addToMaterialized_view_params(mvParam);
+            }
+        }
         return req;
     }
 }
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index ace308c033380a..63d8c28ec104ce 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -104,6 +104,13 @@ struct TAlterTabletReqV2 {
     // version of data which this alter task should transform
     5: optional Types.TVersion alter_version
     6: optional Types.TVersionHash alter_version_hash // Deprecated
+    7: optional list<TAlterMaterializedViewParam> materialized_view_params
+}
+
+struct TAlterMaterializedViewParam {
+    1: required string column_name
+    2: optional string origin_column_name
+    3: optional Exprs.TExpr mv_expr
 }
 
 struct TClusterInfo {