diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index caf2f1cf300..3afacefabe4 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -61,7 +61,8 @@ namespace DB F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"}), F(type_exchange_sender, {"type", "exchange_sender"}), \ F(type_exchange_receiver, {"type", "exchange_receiver"}), F(type_projection, {"type", "projection"}), \ F(type_partition_ts, {"type", "partition_table_scan"}), \ - F(type_window, {"type", "window"}), F(type_window_sort, {"type", "window_sort"})) \ + F(type_window, {"type", "window"}), F(type_window_sort, {"type", "window_sort"}), \ + F(type_expand, {"type", "expand"})) \ M(tiflash_coprocessor_request_duration_seconds, "Bucketed histogram of request duration", Histogram, \ F(type_cop, {{"type", "cop"}}, ExpBuckets{0.001, 2, 20}), \ F(type_batch, {{"type", "batch"}}, ExpBuckets{0.001, 2, 20}), \ @@ -276,7 +277,8 @@ namespace DB M(tiflash_compute_request_unit, "Request Unit used by tiflash compute", Counter, \ F(type_mpp, {{"type", "mpp"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \ F(type_cop, {{"type", "cop"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \ - F(type_batch, {{"type", "batch"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()})) \ + F(type_batch, {{"type", "batch"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()})) + // clang-format on /// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)] diff --git a/dbms/src/DataStreams/ExpressionBlockInputStream.cpp b/dbms/src/DataStreams/ExpressionBlockInputStream.cpp index b288155c142..5bc7a4685e4 100644 --- a/dbms/src/DataStreams/ExpressionBlockInputStream.cpp +++ b/dbms/src/DataStreams/ExpressionBlockInputStream.cpp @@ -53,5 +53,4 @@ Block ExpressionBlockInputStream::readImpl() expression->execute(res); return res; } - } // namespace DB diff --git a/dbms/src/Debug/MockExecutor/ExpandBinder.cpp b/dbms/src/Debug/MockExecutor/ExpandBinder.cpp new file mode 100644 index 00000000000..edc124104c4 --- /dev/null +++ b/dbms/src/Debug/MockExecutor/ExpandBinder.cpp @@ -0,0 +1,68 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include
+#include
+
+namespace DB::mock
+{
+
+bool ExpandBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
+{
+    tipb_executor->set_tp(tipb::ExecType::TypeExpand);
+    tipb_executor->set_executor_id(name);
+    tipb::Expand * expand = tipb_executor->mutable_expand();
+    for (const auto & grouping_set : grouping_sets_columns)
+    {
+        auto * gss = expand->add_grouping_sets();
+        for (const auto & grouping_exprs : grouping_set)
+        {
+            auto * ges = gss->add_grouping_exprs();
+            for (const auto & grouping_col : grouping_exprs)
+            {
+                tipb::Expr * add_column = ges->add_grouping_expr();
+                astToPB(children[0]->output_schema, grouping_col, add_column, collator_id, context); // convert the AST column ref into a tipb::Expr column ref
+            }
+        }
+    }
+    auto * children_executor = expand->mutable_child();
+    return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context);
+}
+
+ExecutorBinderPtr compileExpand(ExecutorBinderPtr input, size_t & executor_index, MockVVecGroupingNameVec grouping_set_columns, std::set<String> in_set)
+{
+    DAGSchema output_schema;
+    for (const auto & field : input->output_schema)
+    {
+        // if the column is in the grouping sets, make it nullable.
+        if (in_set.find(field.first) != in_set.end() && field.second.hasNotNullFlag())
+            output_schema.push_back(toNullableDAGColumnInfo(field));
+        else
+            output_schema.push_back(field);
+    }
+    {
+        tipb::FieldType field_type{};
+        field_type.set_tp(TiDB::TypeLongLong);
+        field_type.set_charset("binary");
+        field_type.set_collate(TiDB::ITiDBCollator::BINARY);
+        field_type.set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull); // must carry the NOT NULL flag
+        field_type.set_flen(-1);
+        field_type.set_decimal(-1);
+        output_schema.push_back(std::make_pair("groupingID", TiDB::fieldTypeToColumnInfo(field_type)));
+    }
+    ExecutorBinderPtr expand = std::make_shared<ExpandBinder>(executor_index, output_schema, std::move(grouping_set_columns));
+    expand->children.push_back(input);
+    return expand;
+}
+} // namespace DB::mock
diff --git a/dbms/src/Debug/MockExecutor/ExpandBinder.h b/dbms/src/Debug/MockExecutor/ExpandBinder.h
new file mode 100644
index 00000000000..405b0b6e610
--- /dev/null
+++ b/dbms/src/Debug/MockExecutor/ExpandBinder.h
@@ -0,0 +1,43 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
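// A usage sketch of the mock builder declared below (a minimal illustration; the
// table and column names are assumed): two single-column grouping sets <s1> and <s2>.
//
//   auto request = context
//                      .scan("test_db", "test_table")
//                      .expand(MockVVecGroupingNameVec{
//                          MockVecGroupingNameVec{MockGroupingNameVec{"s1"}},
//                          MockVecGroupingNameVec{MockGroupingNameVec{"s2"}}})
//                      .build(context);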
+
+#pragma once
+
+#include
+
+namespace DB::mock
+{
+using MockGroupingNameVec = std::vector<String>;
+using MockVecGroupingNameVec = std::vector<MockGroupingNameVec>;
+using MockVVecGroupingNameVec = std::vector<MockVecGroupingNameVec>;
+
+class ExpandBinder : public ExecutorBinder
+{
+public:
+    ExpandBinder(size_t & index_, const DAGSchema & output_schema_, MockVVecGroupingNameVec gss)
+        : ExecutorBinder(index_, "expand_" + std::to_string(index_), output_schema_)
+        , grouping_sets_columns(gss)
+    {}
+
+    bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
+
+    void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
+
+private:
+    // for now, every grouping set is a list of base columns; add one more level of nesting once grouping set merging is enabled.
+    MockVVecGroupingNameVec grouping_sets_columns;
+};
+
+ExecutorBinderPtr compileExpand(ExecutorBinderPtr input, size_t & executor_index, MockVVecGroupingNameVec grouping_set_columns, std::set<String> set);
+} // namespace DB::mock
diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
index ec0728bbf58..197d493fe4a 100644
--- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
@@ -32,6 +32,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -804,6 +805,65 @@ NamesAndTypes DAGExpressionAnalyzer::buildOrderColumns(
     return order_columns;
 }
 
+GroupingSets DAGExpressionAnalyzer::buildExpandGroupingColumns(
+    const tipb::Expand & expand,
+    const ExpressionActionsPtr & actions)
+{
+    GroupingSets group_sets_columns;
+    std::map<String, bool> map_grouping_col;
+    group_sets_columns.reserve(expand.grouping_sets().size());
+    for (const auto & group_set : expand.grouping_sets())
+    {
+        GroupingSet group_set_columns;
+        group_set_columns.reserve(group_set.grouping_exprs().size());
+        for (const auto & group_exprs : group_set.grouping_exprs())
+        {
+            GroupingColumnNames group_exprs_columns;
+            group_exprs_columns.reserve(group_exprs.grouping_expr().size());
+            for (const auto & group_expr : group_exprs.grouping_expr())
+            {
+                String cp_name = getActions(group_expr, actions);
+                // TiDB expressions reference columns by index offset into the child's chunk schema; convert them to ClickHouse block column names here.
+                group_exprs_columns.emplace_back(cp_name);
+                map_grouping_col.insert(std::pair(cp_name, true));
+            }
+            // move is fine here, since the strings were copied from the input columns.
+            group_set_columns.emplace_back(std::move(group_exprs_columns));
+        }
+        group_sets_columns.emplace_back(std::move(group_set_columns));
+    }
+    // change the original source columns to be nullable, and add a new column for groupingID.
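    // For example (column names assumed): for
    //     select count(distinct a), count(distinct b) from t
    // TiDB sends two grouping sets, and group_sets_columns ends up as
    //     GroupingSets{GroupingSet{GroupingColumnNames{"a"}},
    //                  GroupingSet{GroupingColumnNames{"b"}}};
    // so both a and b become nullable below, and groupingID is appended to the schema.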
+ for (auto & mutable_one : source_columns) + { + if (map_grouping_col[mutable_one.name]) + mutable_one.type = makeNullable(mutable_one.type); + } + source_columns.emplace_back(Expand::grouping_identifier_column_name, Expand::grouping_identifier_column_type); + return group_sets_columns; +} + +ExpressionActionsPtr DAGExpressionAnalyzer::appendExpand( + const tipb::Expand & expand, + ExpressionActionsChain & chain) +{ + auto & last_step = initAndGetLastStep(chain); + for (const auto & origin_col : last_step.actions->getSampleBlock().getNamesAndTypesList()) + { + last_step.required_output.push_back(origin_col.name); + } + auto grouping_sets = buildExpandGroupingColumns(expand, last_step.actions); + last_step.actions->add(ExpressionAction::expandSource(grouping_sets)); + + auto before_expand = chain.getLastActions(); + chain.finalize(); + chain.clear(); + + auto & after_expand_step = initAndGetLastStep(chain); + for (const auto & column : getCurrentInputColumns()) + after_expand_step.required_output.push_back(column.name); + return before_expand; +} + std::vector DAGExpressionAnalyzer::appendOrderBy( ExpressionActionsChain & chain, const tipb::TopN & topN) diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index 79b9880ae1a..4cec8ec0358 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -71,6 +71,10 @@ class DAGExpressionAnalyzer : private boost::noncopyable ExpressionActionsChain & chain, const std::vector & conditions); + GroupingSets buildExpandGroupingColumns(const tipb::Expand & expand, const ExpressionActionsPtr & actions); + + ExpressionActionsPtr appendExpand(const tipb::Expand & expand, ExpressionActionsChain & chain); + NamesAndTypes buildWindowOrderColumns(const tipb::Sort & window_sort) const; std::vector appendOrderBy( diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp index 206b59f38e1..a2a8f6b90f4 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp @@ -46,6 +46,7 @@ bool isSourceNode(const tipb::Executor * root) const static String SOURCE_NAME("source"); const static String SEL_NAME("selection"); const static String AGG_NAME("aggregation"); +const static String EXPAND_NAME("expand"); const static String WINDOW_NAME("window"); const static String WINDOW_SORT_NAME("window_sort"); const static String HAVING_NAME("having"); @@ -96,6 +97,12 @@ DAGQueryBlock::DAGQueryBlock(const tipb::Executor & root_, QueryBlockIDGenerator } current = ¤t->selection().child(); break; + case tipb::ExecType::TypeExpand: + GET_METRIC(tiflash_coprocessor_executor_count, type_expand).Increment(); + assignOrThrowException(&expand, current, EXPAND_NAME); + expand_name = current->executor_id(); + current = ¤t->expand().child(); + break; case tipb::ExecType::TypeStreamAgg: RUNTIME_CHECK_MSG(current->aggregation().group_by_size() == 0, STREAM_AGG_ERROR); case tipb::ExecType::TypeAggregation: diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h index 297a679d4e9..91dc6c2f439 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h @@ -60,6 +60,8 @@ class DAGQueryBlock String having_name; const tipb::Executor * limit_or_topn = nullptr; String limit_or_topn_name; + const tipb::Executor * expand = nullptr; + String expand_name; const tipb::Executor * exchange_sender = 
nullptr; String exchange_sender_name; UInt32 id; diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 31ee24f585a..ddbd57e202d 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -73,7 +75,9 @@ struct AnalysisResult ExpressionActionsPtr before_where; ExpressionActionsPtr before_aggregation; ExpressionActionsPtr before_having; - ExpressionActionsPtr before_order_and_select; + ExpressionActionsPtr before_order; + ExpressionActionsPtr before_expand; + ExpressionActionsPtr before_select; ExpressionActionsPtr final_projection; String filter_column_name; @@ -131,6 +135,14 @@ AnalysisResult analyzeExpressions( if (query_block.limit_or_topn && query_block.limit_or_topn->tp() == tipb::ExecType::TypeTopN) { res.order_columns = analyzer.appendOrderBy(chain, query_block.limit_or_topn->topn()); + res.before_order = chain.getLastActions(); + chain.addStep(); + } + + if (query_block.expand) + { + res.before_expand = analyzer.appendExpand(query_block.expand->expand(), chain); + chain.addStep(); } const auto & dag_context = *context.getDAGContext(); @@ -146,7 +158,7 @@ AnalysisResult analyzeExpressions( chain, query_block.qb_column_prefix); - res.before_order_and_select = chain.getLastActions(); + res.before_select = chain.getLastActions(); chain.finalize(); chain.clear(); @@ -570,12 +582,12 @@ void DAGQueryBlockInterpreter::handleWindowOrder(DAGPipeline & pipeline, const t } // To execute a query block, you have to: -// 1. generate the date stream and push it to pipeline. +// 1. generate the data stream and push it to pipeline. // 2. assign the analyzer // 3. construct a final projection, even if it's not necessary. just construct it. // Talking about projection, it has the following rules. // 1. if the query block does not contain agg, then the final project is the same as the source Executor -// 2. if the query block contains agg, then the final project is the same as agg Executor +// 2. if the query block contains agg/expand, then the final project is the same as agg/expand Executor // 3. 
if the cop task may contains more then 1 query block, and the current query block is not the root
 // query block, then the project should add an alias for each column that needs to be projected, something
 // like final_project.emplace_back(col.name, query_block.qb_column_prefix + col.name);
@@ -664,9 +676,9 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
         executeWhere(pipeline, res.before_having, res.having_column_name, "execute having");
         recordProfileStreams(pipeline, query_block.having_name);
     }
-    if (res.before_order_and_select)
+    if (res.before_order)
     {
-        executeExpression(pipeline, res.before_order_and_select, log, "before order and select");
+        executeExpression(pipeline, res.before_order, log, "before order");
     }
 
     if (!res.order_columns.empty())
@@ -676,14 +688,30 @@
         recordProfileStreams(pipeline, query_block.limit_or_topn_name);
     }
 
-    // execute final project action
-    executeProject(pipeline, final_project, "final projection");
     // execute limit
     if (query_block.limit_or_topn && query_block.limit_or_topn->tp() == tipb::TypeLimit)
    {
         executeLimit(pipeline);
         recordProfileStreams(pipeline, query_block.limit_or_topn_name);
     }
+
+    // Execute the expand OP after all filters/limits and so on: since expand replicates rows,
+    // placing it after limit avoids some unnecessary work. Also place it before the final
+    // projection, because base columns must still be recognizable as grouping set columns
+    // before their aliases are changed.
+    if (res.before_expand)
+    {
+        executeExpand(pipeline, res.before_expand);
+        recordProfileStreams(pipeline, query_block.expand_name);
+    }
+
+    if (res.before_select)
+    {
+        executeExpression(pipeline, res.before_select, log, "before select");
+    }
+
+    // execute final project action
+    executeProject(pipeline, final_project, "final projection");
+
     restorePipelineConcurrency(pipeline);
 
     // execute exchange_sender
@@ -724,6 +752,15 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline)
     }
 }
 
+void DAGQueryBlockInterpreter::executeExpand(DAGPipeline & pipeline, const ExpressionActionsPtr & expr)
+{
+    String expand_extra_info = fmt::format("expand: grouping set {}", expr->getActions().back().expand->getGroupingSetsDes());
+    pipeline.transform([&](auto & stream) {
+        stream = std::make_shared<ExpressionBlockInputStream>(stream, expr, log->identifier());
+        stream->setExtraInfo(expand_extra_info);
+    });
+}
+
 void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
 {
     RUNTIME_ASSERT(dagContext().isMPPTask() && dagContext().tunnel_set != nullptr, log, "exchange_sender only run in MPP");
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
index d2657b5c67a..c3cd27beacf 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
@@ -69,6 +69,7 @@ class DAGQueryBlockInterpreter
     void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc, bool enable_fine_grained_shuffle);
     void executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns);
     void executeLimit(DAGPipeline & pipeline);
+    void executeExpand(DAGPipeline & pipeline, const ExpressionActionsPtr & expr);
     void executeWindow(
         DAGPipeline & pipeline,
         WindowDescription & window_description,
@@ -91,7 +92,10 @@ class DAGQueryBlockInterpreter
 
     void restorePipelineConcurrency(DAGPipeline & pipeline);
 
-    DAGContext & dagContext() const { return *context.getDAGContext(); }
+    DAGContext & dagContext() const
+    {
+        return *context.getDAGContext();
+    }
 
     Context & context;
     std::vector input_streams_vec;
diff --git a/dbms/src/Flash/Coprocessor/collectOutputFieldTypes.cpp b/dbms/src/Flash/Coprocessor/collectOutputFieldTypes.cpp
index 86a5edc7406..8813c36f24e 100644
--- a/dbms/src/Flash/Coprocessor/collectOutputFieldTypes.cpp
+++ b/dbms/src/Flash/Coprocessor/collectOutputFieldTypes.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include
 #include
 #include
 #include
@@ -99,6 +100,44 @@ bool collectForTableScan(std::vector & output_field_types, cons
     return false;
 }
 
+bool collectForExpand(std::vector<tipb::FieldType> & out_field_types, const tipb::Executor & executor)
+{
+    auto & out_child_fields = out_field_types;
+    // collect output_field_types of children
+    getChildren(executor).forEach([&out_child_fields](const tipb::Executor & child) {
+        traverseExecutorTree(child, [&out_child_fields](const tipb::Executor & e) { return collectForExecutor(out_child_fields, e); });
+    });
+
+    // make the columns from grouping sets nullable.
+    for (const auto & grouping_set : executor.expand().grouping_sets())
+    {
+        for (const auto & grouping_exprs : grouping_set.grouping_exprs())
+        {
+            for (const auto & grouping_col : grouping_exprs.grouping_expr())
+            {
+                // grouping_col must be a column ref; this is guaranteed by TiDB.
+                auto column_index = decodeDAGInt64(grouping_col.val());
+                RUNTIME_CHECK_MSG(column_index >= 0 && column_index < static_cast<Int64>(out_child_fields.size()), "Column index out of bound");
+                out_child_fields[column_index].set_flag(out_child_fields[column_index].flag() & (~TiDB::ColumnFlagNotNull));
+            }
+        }
+    }
+
+    {
+        // for the additional groupingID column.
+        tipb::FieldType field_type{};
+        field_type.set_tp(TiDB::TypeLongLong);
+        field_type.set_charset("binary");
+        field_type.set_collate(TiDB::ITiDBCollator::BINARY);
+        // the groupingID column should be UInt64 and NOT NULL.
+ field_type.set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull); + field_type.set_flen(-1); + field_type.set_decimal(-1); + out_field_types.push_back(field_type); + } + return false; +} + bool collectForJoin(std::vector & output_field_types, const tipb::Executor & executor) { // collect output_field_types of children @@ -190,6 +229,8 @@ bool collectForExecutor(std::vector & output_field_types, const return collectForTableScan(output_field_types, executor.partition_table_scan()); case tipb::ExecType::TypeJoin: return collectForJoin(output_field_types, executor); + case tipb::ExecType::TypeExpand: + return collectForExpand(output_field_types, executor); default: return true; } diff --git a/dbms/src/Flash/Pipeline/Pipeline.cpp b/dbms/src/Flash/Pipeline/Pipeline.cpp index 67ba2569ae6..0421c1cd4c0 100644 --- a/dbms/src/Flash/Pipeline/Pipeline.cpp +++ b/dbms/src/Flash/Pipeline/Pipeline.cpp @@ -144,6 +144,7 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request) case tipb::ExecType::TypeTableScan: case tipb::ExecType::TypeExchangeSender: case tipb::ExecType::TypeExchangeReceiver: + case tipb::ExecType::TypeExpand: return true; default: is_supported = false; diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index 8aabeef3885..059a681b867 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -197,6 +198,12 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec pushBack(PhysicalJoin::build(context, executor_id, log, executor->join(), FineGrainedShuffle(executor), left, right)); break; } + case tipb::ExecType::TypeExpand: + { + GET_METRIC(tiflash_coprocessor_executor_count, type_expand).Increment(); + pushBack(PhysicalExpand::build(context, executor_id, log, executor->expand(), popBack())); + break; + } default: throw TiFlashException(fmt::format("{} executor is not supported", executor->tp()), Errors::Planner::Unimplemented); } diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h index 8f347716b2d..c3c31ce3c81 100644 --- a/dbms/src/Flash/Planner/PlanType.h +++ b/dbms/src/Flash/Planner/PlanType.h @@ -37,6 +37,7 @@ struct PlanType MockTableScan = 12, Join = 13, GetResult = 14, + Expand = 15, }; PlanTypeEnum enum_value; diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp b/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp new file mode 100644 index 00000000000..a710ac0fc40 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalExpand.cpp @@ -0,0 +1,114 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
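// Schema sketch for the plan node built below (a minimal illustration; the
// concrete types are taken from the tests in this diff): with grouping sets
// {<s1>, <s2>} over a child schema
//     s1 Nullable(String), s2 Nullable(String)
// the Expand output schema becomes
//     s1 Nullable(String), s2 Nullable(String), groupingID UInt64
// i.e. every column referenced by a grouping set is made nullable, and a
// non-null UInt64 groupingID column is appended.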
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +PhysicalPlanNodePtr PhysicalExpand::build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Expand & expand, + const PhysicalPlanNodePtr & child) +{ + assert(child); + + if (unlikely(expand.grouping_sets().empty())) + { + //should not reach here + throw TiFlashException("Expand executor without grouping sets", Errors::Planner::BadRequest); + } + + DAGExpressionAnalyzer analyzer{child->getSchema(), context}; + ExpressionActionsPtr before_expand_actions = PhysicalPlanHelper::newActions(child->getSampleBlock()); + + auto grouping_sets = analyzer.buildExpandGroupingColumns(expand, before_expand_actions); + auto expand_action = ExpressionAction::expandSource(grouping_sets); + // include expand action itself. + before_expand_actions->add(expand_action); + + NamesAndTypes expand_output_columns; + auto child_header = child->getSchema(); + for (const auto & one : child_header) + { + expand_output_columns.emplace_back(one.name, expand_action.expand->isInGroupSetColumn(one.name) ? makeNullable(one.type) : one.type); + } + expand_output_columns.emplace_back(Expand::grouping_identifier_column_name, Expand::grouping_identifier_column_type); + + auto physical_expand = std::make_shared( + executor_id, + expand_output_columns, + log->identifier(), + child, + expand_action.expand, + before_expand_actions); + + return physical_expand; +} + + +void PhysicalExpand::expandTransform(DAGPipeline & child_pipeline) +{ + String expand_extra_info = fmt::format("expand, expand_executor_id = {}: grouping set {}", execId(), shared_expand->getGroupingSetsDes()); + child_pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, expand_actions, log->identifier()); + stream->setExtraInfo(expand_extra_info); + }); +} + +void PhysicalExpand::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context &, size_t) +{ + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique(group_builder.exec_status, expand_actions, log->identifier())); + }); +} + +void PhysicalExpand::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + child->buildBlockInputStream(pipeline, context, max_streams); + expandTransform(pipeline); +} + +void PhysicalExpand::finalize(const Names & parent_require) +{ + FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); + Names required_output = parent_require; + required_output.emplace_back(Expand::grouping_identifier_column_name); + expand_actions->finalize(required_output); + + child->finalize(expand_actions->getRequiredColumns()); + FinalizeHelper::prependProjectInputIfNeed(expand_actions, child->getSampleBlock().columns()); + FinalizeHelper::checkSampleBlockContainsParentRequire(getSampleBlock(), parent_require); +} + +const Block & PhysicalExpand::getSampleBlock() const +{ + return expand_actions->getSampleBlock(); +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/Plans/PhysicalExpand.h b/dbms/src/Flash/Planner/Plans/PhysicalExpand.h new file mode 100644 index 00000000000..fa668b64114 --- /dev/null +++ b/dbms/src/Flash/Planner/Plans/PhysicalExpand.h @@ -0,0 +1,59 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +class PhysicalExpand : public PhysicalUnary +{ +public: + static PhysicalPlanNodePtr build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Expand & expand, + const PhysicalPlanNodePtr & child); + + PhysicalExpand( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & child_, + const std::shared_ptr & shared_expand, + const ExpressionActionsPtr & expand_actions) + : PhysicalUnary(executor_id_, PlanType::Expand, schema_, req_id, child_) + , shared_expand(shared_expand) + , expand_actions(expand_actions) + {} + + void finalize(const Names & parent_require) override; + + void expandTransform(DAGPipeline & child_pipeline); + + const Block & getSampleBlock() const override; + + void buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/) override; + +private: + void buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + std::shared_ptr shared_expand; + ExpressionActionsPtr expand_actions; +}; +} // namespace DB diff --git a/dbms/src/Flash/Statistics/CommonExecutorImpl.h b/dbms/src/Flash/Statistics/CommonExecutorImpl.h index 404fd1acbd6..42afeab9971 100644 --- a/dbms/src/Flash/Statistics/CommonExecutorImpl.h +++ b/dbms/src/Flash/Statistics/CommonExecutorImpl.h @@ -58,6 +58,19 @@ struct SortImpl }; using SortStatistics = ExecutorStatistics; +struct ExpandImpl +{ + static constexpr bool has_extra_info = false; + + static constexpr auto type = "Expand"; + + static bool isMatch(const tipb::Executor * executor) + { + return executor->has_expand(); + } +}; +using ExpandStatistics = ExecutorStatistics; + struct FilterImpl { static constexpr bool has_extra_info = false; diff --git a/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp b/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp index 44a72e11381..321599d9050 100644 --- a/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp +++ b/dbms/src/Flash/Statistics/ExecutorStatisticsCollector.cpp @@ -64,7 +64,8 @@ void ExecutorStatisticsCollector::initialize(DAGContext * dag_context_) SortStatistics, TableScanStatistics, TopNStatistics, - WindowStatistics>(executor_id, &executor)) + WindowStatistics, + ExpandStatistics>(executor_id, &executor)) { throw TiFlashException( fmt::format("Unknown executor type, executor_id: {}", executor_id), diff --git a/dbms/src/Flash/Statistics/traverseExecutors.cpp b/dbms/src/Flash/Statistics/traverseExecutors.cpp index dd720920dcd..94abeef3b01 100644 --- a/dbms/src/Flash/Statistics/traverseExecutors.cpp +++ b/dbms/src/Flash/Statistics/traverseExecutors.cpp @@ -41,6 +41,8 @@ Children getChildren(const tipb::Executor & executor) return Children{&executor.topn().child()}; case tipb::ExecType::TypeLimit: return Children{&executor.limit().child()}; + case tipb::ExecType::TypeExpand: + return Children{&executor.expand().child()}; case tipb::ExecType::TypeProjection: return 
Children{&executor.projection().child()}; case tipb::ExecType::TypeExchangeSender: diff --git a/dbms/src/Flash/tests/gtest_expand_executor.cpp b/dbms/src/Flash/tests/gtest_expand_executor.cpp new file mode 100644 index 00000000000..451a9430e98 --- /dev/null +++ b/dbms/src/Flash/tests/gtest_expand_executor.cpp @@ -0,0 +1,413 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +namespace DB +{ +namespace tests +{ +class ExpandExecutorTestRunner : public DB::tests::ExecutorTest +{ +public: + void initializeContext() override + { + ExecutorTest::initializeContext(); + context.addMockTable({"test_db", "test_table"}, + {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", {}, "banana"}), + toNullableVec("s2", {"apple", {}, "banana"})}); + context.addExchangeReceiver("exchange1", + {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", {}, "banana"}), + toNullableVec("s2", {"apple", {}, "banana"})}); + } +}; + +TEST_F(ExpandExecutorTestRunner, ExpandLogical) +try +{ + /// case 1 + auto request = context + .scan("test_db", "test_table") + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"s1"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .build(context); + /// data flow: + /// + /// s1 s2 + /// "banana" "apple" + /// NULL NULL + /// "banana" "banana" + /// | + /// v + /// s1 s2 groupingID + /// "banana" NULL 1 + /// NULL "apple" 2 + /// NULL NULL 1 + /// NULL NULL 2 + /// "banana" NULL 1 + /// NULL "banana" 2 + /// + executeAndAssertColumnsEqual( + request, + {toNullableVec({"banana", {}, {}, {}, "banana", {}}), + toNullableVec({{}, "apple", {}, {}, {}, "banana"}), + toVec({1, 2, 1, 2, 1, 2})}); + + /// case 2 + request = context + .scan("test_db", "test_table") + .filter(eq(col("s1"), col("s2"))) + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"s1"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .build(context); + /// data flow: + /// + /// s1 s2 + /// "banana" "apple" + /// NULL NULL + /// "banana" "banana" + /// | + /// v + /// s1 s2 + /// "banana" "banana" + /// | + /// v + /// s1 s2 groupingID + /// "banana" NULL 1 + /// NULL "banana" 2 + /// + executeAndAssertColumnsEqual( + request, + {toNullableVec({"banana", {}}), + toNullableVec({{}, "banana"}), + toVec({1, 2})}); + + /// case 3: this case is only for non-planner mode. + /// request = context + /// .scan("test_db", "test_table") + /// .expand(MockVVecColumnNameVec{MockVecColumnNameVec{MockColumnNameVec{"s1"},}, MockVecColumnNameVec{MockColumnNameVec{"s2"},},}) + /// .filter(eq(col("s1"), col("s2"))) + /// .build(context); + /// data flow: TiFlash isn't aware of the operation sequence, this filter here will be run before expand does just like the second test case above. + /// since this case is only succeed under planner-disabled mode, just comment and assert the result here for a note. 
+ /// + /// executeAndAssertColumnsEqual( + /// request, + /// {toNullableVec({"banana", {}}), + /// toNullableVec({{}, "banana"}), + /// toVec({1,2})}); + + /// case 4 + auto const_false = lit(Field(static_cast(0))); + request = context + .scan("test_db", "test_table") + .filter(const_false) // refuse all rows + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"s1"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .build(context); + executeAndAssertColumnsEqual( + request, + {}); + + /// case 5 (test integrated with aggregation) + request = context + .scan("test_db", "test_table") + .aggregation({Count(col("s1"))}, {col("s2")}) + .build(context); + executeAndAssertColumnsEqual( + request, + { + toVec({1, 0, 1}), + toNullableVec({"apple", {}, "banana"}), + }); + + request = context + .scan("test_db", "test_table") + .aggregation({Count(col("s1"))}, {col("s2")}) + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"count(s1)"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .build(context); + /// data flow: + /// + /// s1 s2 + /// "banana" "apple" + /// NULL NULL + /// "banana" "banana" + /// | + /// v + /// count(s1) s2 + /// 1 "apple" + /// 0 NULL + /// 1 "banana" + /// | + /// v + /// count(s1) s2 groupingID + /// 1 NULL 1 + /// NULL "apple" 2 + /// 0 NULL 1 + /// NULL NULL 2 + /// 1 NULL 1 + /// NULL "banana" 2 + /// + executeAndAssertColumnsEqual( + request, + {toNullableVec({1, {}, 0, {}, 1, {}}), + toNullableVec({{}, "apple", {}, {}, {}, "banana"}), + toVec({1, 2, 1, 2, 1, 2})}); + + /// case 5 (test integrated with aggregation and projection) + request = context + .scan("test_db", "test_table") + .aggregation({Count(col("s1"))}, {col("s2")}) + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"count(s1)"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .project({"count(s1)"}) + .build(context); + executeAndAssertColumnsEqual( + request, + {toNullableVec({1, {}, 0, {}, 1, {}})}); + + /// case 6 (test integrated with aggregation and projection and limit) 1 + /// note: by now, limit is executed before expand does to reduce unnecessary row expand work. + /// request = context + /// .scan("test_db", "test_table") + /// .aggregation({Count(col("s1"))}, {col("s2")}) + /// .expand(MockVVecColumnNameVec{MockVecColumnNameVec{MockColumnNameVec{"count(s1)"},}, MockVecColumnNameVec{MockColumnNameVec{"s2"},},}) + /// .limit(2) + /// .project({"count(s1)"}) + /// .build(context); + /// data flow: + /// + /// s1 s2 + /// "banana" "apple" + /// NULL NULL + /// "banana" "banana" + /// | + /// v + /// count(s1) s2 + /// 1 "apple" + /// 0 NULL + /// 1 "banana" + /// | + /// v + /// count(s1) s2 // limit precede the expand OP since they are in the same DAG query block. + /// 1 "apple" + /// 0 NULL + /// | + /// v + /// count(s1) s2 groupingID // expand is always arranged executed after limit to avoid unnecessary replication in the same DAG query block. + /// 1 NULL 1 + /// NULL "apple" 2 + /// 0 NULL 1 + /// NULL NULL 2 + /// 1 NULL 1 + /// NULL "banana" 2 + /// | + /// v + /// count(s1) + /// 1 + /// NULL + /// 0 + /// NULL + /// + /// since this case is only succeed under planner-disabled mode, just comment and assert the result here for a note. 
+ /// + /// executeAndAssertColumnsEqual( + /// request, + /// {toNullableVec({1, {}, 0, {}})}); + + /// case 7 (test integrated with aggregation and projection and limit) 2 + request = context + .scan("test_db", "test_table") + .aggregation({Count(col("s1"))}, {col("s2")}) + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"count(s1)"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .project({"count(s1)"}) + .topN({{"count(s1)", true}}, 2) + .build(context); + /// data flow: + /// + /// s1 s2 ---------------+ + /// "banana" "apple" | + /// NULL NULL // table scan | + /// "banana" "banana" | + /// | | + /// v | + /// count(s1) s2 | + /// 1 "apple" // aggregate | + /// 0 NULL | + /// 1 "banana" | + /// | +-------------> Child DAG Query Block + /// v | + /// count(s1) s2 groupingID // expand | + /// 1 NULL 1 | + /// NULL "apple" 2 | + /// 0 NULL 1 | + /// NULL NULL 2 | + /// 1 NULL 1 | + /// NULL "banana" 2 | + /// | --------------+ + /// v --------------+ + /// count(s1) | + /// 1 | + /// NULL // projection | + /// 0 | + /// NULL | + /// 1 +-------------> parent DAG Query Block + /// NULL | + /// | | + /// v | + /// count(s1) // sort (desc) | + /// 1 | + /// 1 | + /// 0 | + /// NULL | + /// NULL | + /// NULL | + /// | | + /// v | + /// count(s1) // limit 2 | + /// 1 | + /// 1 | + /// ---------------+ + /// + /// Note: you can see some difference from this plan and the last one above, since projection between expand and topN is a SOURCE node, + /// it will isolate whole DAG into two independent DAG query blocks, limit and expand OP take a place in each one of them. So we + /// couldn't guarantee that letting expand OP run after limit does, which can't reduce unnecessary replication work. DAG query block + /// division should be blamed here. 
+ /// + executeAndAssertColumnsEqual( + request, + {toNullableVec({1, 1})}); + + /// case 8 (test integrated with receiver and join) + request = context + .receive("exchange1") + .join(context.scan("test_db", "test_table").project({"s2"}), tipb::JoinType::TypeInnerJoin, {col("s2")}) + .build(context); + executeAndAssertColumnsEqual( + request, + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"apple", "banana"})}); + + request = context + .receive("exchange1") + .aggregation({Count(col("s1"))}, {col("s2")}) + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"count(s1)"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .join(context.scan("test_db", "test_table").project({"s2"}), tipb::JoinType::TypeInnerJoin, {col("s2")}) + .project({"count(s1)", "groupingID"}) + .topN({{"groupingID", true}}, 2) + .build(context); + /// data flow: + /// + /// s1 s2 ---------------+ + /// "banana" "apple" | + /// NULL NULL // table scan | + /// "banana" "banana" | + /// | | + /// v | + /// count(s1) s2 | + /// 1 "apple" // aggregate | + /// 0 NULL | + /// 1 "banana" | + /// | +-------------> Child of Child DAG Query Block + /// v | + /// count(s1) s2 groupingID // expand | + /// 1 NULL 1 | + /// NULL "apple" 2 | + /// 0 NULL 1 | + /// NULL NULL 2 | + /// 1 NULL 1 | + /// NULL "banana" 2 | + /// | --------------+ + /// v --------------+ + /// count(s1) s2 groupingID * s2 | + /// NULL "apple" 2 "apple" // join | + /// NULL "banana" 2 NULL | + /// "banana" +-------------> Child DAG Query Block + /// | + /// NULL "apple" 2 "apple" | + /// NULL "banana" 2 "banana" | + /// | ---------------+ + /// v | + /// count(s1) groupingID // projection | + /// NULL 2 | + /// NULL 2 | + /// | +-------------> Parent DAG Query Block + /// v | + /// count(s1) groupingID // topN | + /// NULL 2 | + /// NULL 2 | + /// ---------------+ + /// + executeAndAssertColumnsEqual( + request, + { + toNullableVec({{}, {}}), + toVec({2, 2}), + }); +} +CATCH + +/// TODO: more OP combination tests. 
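/// A possible shape for such a test (sketch only; the expected rows are an
/// assumption, not a verified result):
///
///   request = context
///                 .scan("test_db", "test_table")
///                 .expand(MockVVecColumnNameVec{
///                     MockVecColumnNameVec{MockColumnNameVec{"s1"}},
///                     MockVecColumnNameVec{MockColumnNameVec{"s2"}}})
///                 .aggregation({Count(col("groupingID"))}, {col("groupingID")})
///                 .build(context);
///
/// each of the 3 source rows would be replicated once per grouping set, so both
/// groupingID groups (1 and 2) should count 3 rows each.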
+ +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/tests/gtest_filter_executor.cpp b/dbms/src/Flash/tests/gtest_filter_executor.cpp index 3a3b5b16b2c..72cc171d1c7 100644 --- a/dbms/src/Flash/tests/gtest_filter_executor.cpp +++ b/dbms/src/Flash/tests/gtest_filter_executor.cpp @@ -211,7 +211,7 @@ try } CATCH -TEST_F(FilterExecutorTestRunner, convertBool) +TEST_F(FilterExecutorTestRunner, convert_bool) try { { diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index e129c5587a5..634d175a9b0 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -394,5 +394,30 @@ try } } CATCH + +TEST_F(InterpreterExecuteTest, ExpandPlan) +try +{ + { + auto request = context + .receive("sender_1") + .aggregation({Count(col("s1"))}, {col("s2")}) + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"count(s1)"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .join(context.scan("test_db", "test_table").project({"s2"}), tipb::JoinType::TypeInnerJoin, {col("s2")}) + .project({"count(s1)", "groupingID"}) + .topN({{"groupingID", true}}, 2) + .build(context); + runAndAssert(request, 10); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_interpreter.out b/dbms/src/Flash/tests/gtest_interpreter.out index 89e323ef5e8..58e48ec0db4 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.out +++ b/dbms/src/Flash/tests/gtest_interpreter.out @@ -7,7 +7,7 @@ Union: MergeSorting, limit = 10 Union: PartialSorting x 10: limit = 10 - Expression: + Expression: Filter: SharedQuery: ParallelAggregating, max_threads: 10, final: true @@ -20,11 +20,11 @@ Union: ~result: Union: SharedQuery x 10: - Limit, limit = 10 - Union: - Limit x 10, limit = 10 - Expression: - Expression: + Expression: + Expression: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 Filter: SharedQuery: ParallelAggregating, max_threads: 10, final: true @@ -84,13 +84,13 @@ Union: ~result: Union: SharedQuery x 10: - Limit, limit = 10 - Union: - Limit x 10, limit = 10 - Expression: + Expression: + Limit, limit = 10 + Union: + Limit x 10, limit = 10 Expression: Expression: - Expression: + Expression: Filter: Expression: Expression: @@ -138,7 +138,7 @@ Union: ~result: Union: Expression x 10: - Expression: + Expression: SharedQuery: Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} Expression: @@ -153,7 +153,7 @@ Union: ~result: Union: Expression x 10: - Expression: + Expression: Expression: Expression: SharedQuery: @@ -170,7 +170,7 @@ Union: ~result: Union: Expression x 10: - Expression: + Expression: Expression: Expression: SharedQuery: @@ -191,7 +191,7 @@ Union: ~result: Union: Expression x 8: - Expression: + Expression: Window: , function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} Expression: MergeSorting: , limit = 0 @@ -215,7 +215,7 @@ Union: ~result: Union: Expression x 10: - Expression: + Expression: SharedQuery: Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} Expression: @@ -423,11 +423,11 @@ CreatingSets Union: MockExchangeSender x 20 SharedQuery: - Limit, limit = 10 - Union: - Limit x 20, limit = 10 - Expression: - Expression: + Expression: + Expression: + Limit, limit = 10 + Union: + Limit x 20, limit = 10 SharedQuery: ParallelAggregating, max_threads: 20, final: true Expression x 20: @@ -439,8 +439,8 @@ 
CreatingSets ~test_suite_name: ListBase ~result_index: 0 ~result: -Limit, limit = 10 - Expression: +Expression: + Limit, limit = 10 Aggregating Expression: Filter: @@ -461,3 +461,30 @@ Union: Filter: MockTableScan @ +~test_suite_name: ExpandPlan +~result_index: 0 +~result: +CreatingSets + Union: + HashJoinBuild x 10: , join_kind = Inner + Expression: + Expression: + Expression: + Expression: + MockTableScan + Union: + SharedQuery x 10: + Expression: + MergeSorting, limit = 2 + Union: + PartialSorting x 10: limit = 2 + Expression: + Expression: + Expression: + HashJoinProbe: + Expression: + Expression: }{}]> + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + MockExchangeReceiver x 10 +@ diff --git a/dbms/src/Flash/tests/gtest_pipeline_interpreter.cpp b/dbms/src/Flash/tests/gtest_pipeline_interpreter.cpp index 1a5478cbdf1..828e14e36ff 100644 --- a/dbms/src/Flash/tests/gtest_pipeline_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_pipeline_interpreter.cpp @@ -521,5 +521,29 @@ try } CATCH +TEST_F(PipelineInterpreterExecuteTest, ExpandPlan) +try +{ + { + auto request = context + .receive("sender_1") + .aggregation({Count(col("s1"))}, {col("s2")}) + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"count(s1)"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .join(context.scan("test_db", "test_table").project({"s2"}), tipb::JoinType::TypeInnerJoin, {col("s2")}) + .project({"count(s1)", "groupingID"}) + .topN({{"groupingID", true}}, 2) + .build(context); + runAndAssert(request, 10); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out index 11a0cddabda..a59672f7809 100644 --- a/dbms/src/Flash/tests/gtest_pipeline_interpreter.out +++ b/dbms/src/Flash/tests/gtest_pipeline_interpreter.out @@ -604,3 +604,30 @@ Union: Filter MockTableScan @ +~test_suite_name: ExpandPlan +~result_index: 0 +~result: +CreatingSets + Union: + HashJoinBuild x 10: , join_kind = Inner + Expression: + Expression: + Expression: + MockTableScan + Union: + Expression x 10: + SharedQuery: + MergeSorting, limit = 2 + Union: + PartialSorting x 10: limit = 2 + Expression: + Expression: + HashJoinProbe: + Expression: + Expression: }{}]> + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + MockExchangeReceiver +@ diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp index f4fd9be7613..5b9bea7b28f 100644 --- a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -499,5 +499,30 @@ try } CATCH + +TEST_F(PlannerInterpreterExecuteTest, ExpandPlan) +try +{ + { + auto request = context + .receive("sender_1") + .aggregation({Count(col("s1"))}, {col("s2")}) + .expand(MockVVecColumnNameVec{ + MockVecColumnNameVec{ + MockColumnNameVec{"count(s1)"}, + }, + MockVecColumnNameVec{ + MockColumnNameVec{"s2"}, + }, + }) + .join(context.scan("test_db", "test_table").project({"s2"}), tipb::JoinType::TypeInnerJoin, {col("s2")}) + .project({"count(s1)", "groupingID"}) + .topN({{"groupingID", true}}, 2) + .build(context); + runAndAssert(request, 10); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.out b/dbms/src/Flash/tests/gtest_planner_interpreter.out index 8c46e98de28..549cdc35503 100644 --- 
a/dbms/src/Flash/tests/gtest_planner_interpreter.out
+++ b/dbms/src/Flash/tests/gtest_planner_interpreter.out
@@ -709,3 +709,30 @@ Union:
    Filter
     MockTableScan
 @
+~test_suite_name: ExpandPlan
+~result_index: 0
+~result:
+CreatingSets
+ Union:
+  HashJoinBuild x 10: , join_kind = Inner
+   Expression:
+    Expression:
+     Expression:
+      MockTableScan
+ Union:
+  Expression x 10:
+   SharedQuery:
+    MergeSorting, limit = 2
+     Union:
+      PartialSorting x 10: limit = 2
+       Expression:
+        Expression:
+         HashJoinProbe:
+          Expression:
+           Expression: }{}]>
+            SharedQuery:
+             ParallelAggregating, max_threads: 10, final: true
+              Expression x 10:
+               MockExchangeReceiver
+@
diff --git a/dbms/src/Interpreters/Expand.cpp b/dbms/src/Interpreters/Expand.cpp
new file mode 100644
index 00000000000..3910efec76a
--- /dev/null
+++ b/dbms/src/Interpreters/Expand.cpp
@@ -0,0 +1,225 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace
+{
+void convertColumnToNullable(ColumnWithTypeAndName & column)
+{
+    column.type = makeNullable(column.type);
+    if (column.column)
+        column.column = makeNullable(column.column);
+}
+} // namespace
+
+Expand::Expand(const DB::GroupingSets & gss)
+    : group_sets_names(gss)
+{
+    collectNameSet();
+}
+
+String Expand::getGroupingSetsDes() const
+{
+    FmtBuffer buffer;
+    buffer.append("[");
+    for (const auto & grouping_set : group_sets_names)
+    {
+        buffer.append("{");
+        for (const auto & grouping_exprs : grouping_set)
+        {
+            buffer.append("<");
+            buffer.joinStr(grouping_exprs.begin(), grouping_exprs.end());
+            buffer.append(">");
+        }
+        buffer.append("}");
+    }
+    buffer.append("]");
+    return buffer.toString();
+}
+
+/// for cases like: select count(distinct a), count(distinct b) from t;
+/// it will generate 2 grouping sets with <a> and <b>, over which we should
+/// expand one more replica of the source rows from the input block and
+/// identify it with the grouping id in the appended new column.
+///
+/// eg: source block   ==>   replicated block (a new groupingID column is appended)
+///      a    b                     a     b      groupingID
+///      1    1    target a -+-->   1     null       1
+///      2    2              +-->   2     null       1
+///                target b -+-->   null  1          2
+///                          +-->   null  2          2
+///
+/// when targeting a specified grouping set, the other grouping set columns should be
+/// filled with null values, so that the group by(a,b) operator achieves the equivalent
+/// effect of group by(a) and group by(b), since the other grouping set columns have
+/// been filled with null values.
+///
+/// \param block the source block; it is replicated and null-filled in place.
+
+void Expand::replicateAndFillNull(Block & block) const
+{
+    size_t origin_rows = block.rows();
+    // make a replicate slice, using it to replicate the origin rows.
+    std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
+    offsets_to_replicate = std::make_unique<IColumn::Offsets>(origin_rows);
+
+    // the replicate offset is fixed as the grouping set count.
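    // e.g. with origin_rows = 3 and two grouping sets, the cumulative offsets built
    // below are [2, 4, 6] (each source row is replicated twice), and the groupingID
    // data becomes [1, 2, 1, 2, 1, 2].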
+    IColumn::Offset current_offset = 0;
+    const IColumn::Offset replicate_times_for_one_row = getGroupSetNum();
+
+    // create a column for the grouping id.
+    auto grouping_id_column = ColumnUInt64::create();
+    auto & grouping_id_column_data = grouping_id_column->getData();
+    // reserve N times the current block's row count.
+    grouping_id_column_data.resize(origin_rows * replicate_times_for_one_row);
+
+    size_t grouping_id_column_index = 0;
+    for (size_t i = 0; i < origin_rows; ++i)
+    {
+        current_offset += replicate_times_for_one_row;
+        (*offsets_to_replicate)[i] = current_offset;
+
+        // fill the grouping id in the same loop.
+        for (UInt64 j = 0; j < replicate_times_for_one_row; ++j)
+        {
+            // start from 1.
+            grouping_id_column_data[grouping_id_column_index++] = j + 1;
+        }
+    }
+
+    // replicate the original block rows.
+    size_t existing_columns = block.columns();
+
+    if (offsets_to_replicate)
+    {
+        for (size_t i = 0; i < existing_columns; ++i)
+        {
+            // materialize the origin const column, since it may be filled with null values when expanding.
+            if (block.safeGetByPosition(i).column->isColumnConst())
+                block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->convertToFullColumnIfConst();
+
+            // for every existing column, if the column is a grouping set column, make it nullable.
+            auto & column = block.safeGetByPosition(i);
+            if (isInGroupSetColumn(column.name) && !column.column->isColumnNullable())
+                convertColumnToNullable(block.getByPosition(i));
+
+            if (!offsets_to_replicate->empty())
+                column.column = column.column->replicate(*offsets_to_replicate);
+        }
+    }
+
+
+    // replication just copies every row N times; we still need to fill the corresponding fields with null values.
+    for (size_t grouping_offset = 0; grouping_offset < replicate_times_for_one_row; ++grouping_offset)
+    {
+        auto grouping_columns = getGroupSetColumnNamesByOffset(grouping_offset);
+        // for every grouping col, build a mutable clone of it.
+        for (const auto & grouping_col : grouping_columns)
+        {
+            assert(block.getByName(grouping_col).column->isColumnNullable());
+
+            const auto * nullable_column = typeid_cast<const ColumnNullable *>(block.getByName(grouping_col).column.get());
+            auto cloned = ColumnNullable::create(nullable_column->getNestedColumnPtr(), nullable_column->getNullMapColumnPtr());
+            auto * cloned_one = typeid_cast<ColumnNullable *>(cloned->assumeMutable().get());
+
+            /// traverse all rows, and set null values for the current grouping set column.
+            /// basically it looks like this:
+            /// eg: after the replication above, every source row forms one small replicate group:
+            ///      a   b   gid
+            ///     --------------+
+            ///      1   1   1    + replicate group 1 (from source row 1): for a (grouping_offset 0) we keep
+            ///      1   1   2    + rows[0].a and set rows[1].a = null; for b (grouping_offset 1) we keep
+            ///     --------------+ rows[1].b and set rows[0].b = null.
+            ///      2   2   1    + replicate group 2 (from source row 2): the same pattern; keep the value
+            ///      2   2   2    + only in the replica whose in-group offset equals the grouping_offset.
+            ///     --------------+
+            for (size_t i = 0; i < origin_rows; ++i)
+            {
+                // every origin row is mapped to N rows; fill the corresponding grouping set column with nulls according to the offset.
+                // only when the offset within the replicate group differs from the current grouping_offset is the data set to null.
+                // eg: for the case above, with grouping_offset = 0, we keep offset 0 of each small
+                // replicate group and set this column to null at every other offset.
+                for (UInt64 j = 0; j < replicate_times_for_one_row; ++j)
+                {
+                    if (j == grouping_offset)
+                    {
+                        // keep this column's value only for the targeted replica.
+                        continue;
+                    }
+                    // set this column to null for all the other replicas.
+                    // todo: the null map takes effect first when a nullable column is read, but should we also clear the stale data at the same position in the nested column?
+                    auto computed_offset = i * replicate_times_for_one_row + j;
+                    cloned_one->getNullMapData().data()[computed_offset] = 1;
+                }
+            }
+            block.getByName(grouping_col).column = std::move(cloned);
+        }
+        // finished adjusting the columns of one grouping set (for now, one column per grouping set).
+    }
+    block.insert(ColumnWithTypeAndName(std::move(grouping_id_column), grouping_identifier_column_type, grouping_identifier_column_name));
+    // the result is returned in place through `block`.
+}
+
+bool Expand::isInGroupSetColumn(const String & name) const
+{
+    return name_set.find(name) != name_set.end();
+}
+
+const GroupingColumnNames & Expand::getGroupSetColumnNamesByOffset(size_t offset) const
+{
+    /// currently there can only be one groupingExprs in a groupingSet, until the planner supports grouping set merging.
+    return group_sets_names[offset][0];
+}
+
+const std::set<String> & Expand::getAllGroupSetColumnNames() const
+{
+    return name_set;
+}
+
+void Expand::collectNameSet()
+{
+    for (const auto & it1 : group_sets_names)
+    {
+        // for every grouping set.
+        for (const auto & it2 : it1)
+        {
+            // for every grouping exprs.
+            for (const auto & it3 : it2)
+            {
+                name_set.insert(it3);
+            }
+        }
+    }
+}
+
+const std::string Expand::grouping_identifier_column_name = "groupingID";
+const DataTypePtr Expand::grouping_identifier_column_type = std::make_shared<DataTypeUInt64>();
+} // namespace DB
diff --git a/dbms/src/Interpreters/Expand.h b/dbms/src/Interpreters/Expand.h
new file mode 100644
index 00000000000..076abe7b83b
--- /dev/null
+++ b/dbms/src/Interpreters/Expand.h
@@ -0,0 +1,131 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+/// groupingSets are formed as { groupingSet, groupingSet... }
+/// a groupingSet is formed as { groupingExprs, groupingExprs... }
+/// groupingExprs are formed as a slice of expression/column names
+/// for now, simply for a case like: select count(distinct a), count(distinct b) from t;
+/// we get 2 grouping sets like: {[<a>], [<b>]}
+///
+/// soon, we can support grouping set merging, which could take a case
+/// like: select count(distinct a,b), count(distinct a), count(distinct c) from t, as
+/// we still get 2 grouping sets like: {[<a,b>, <a>], [<c>]}
+///
+/// in the second case, the group layouts sharing a common prefix have been merged
+/// into one unified set to reduce the underlying data replication/expand cost.
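/// Illustration of the nesting (column names assumed), matching the aliases below:
///     GroupingSets{{{"a"}}, {{"b"}}};              // {[<a>], [<b>]}        -- first case
///     GroupingSets{{{"a", "b"}, {"a"}}, {{"c"}}};  // {[<a,b>, <a>], [<c>]} -- merged case
/// the innermost braces are GroupingColumnNames, then GroupingSet, then GroupingSets.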
diff --git a/dbms/src/Interpreters/Expand.h b/dbms/src/Interpreters/Expand.h
new file mode 100644
index 00000000000..076abe7b83b
--- /dev/null
+++ b/dbms/src/Interpreters/Expand.h
@@ -0,0 +1,131 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+/// groupingSets are formed as { groupingSet, groupingSet... }
+/// a groupingSet is formed as { groupingExprs, groupingExprs... }
+/// groupingExprs are formed as a slice of expression/column names
+///
+/// for a simple case like: select count(distinct a), count(distinct b) from t;
+/// we get 2 grouping sets: {[<a>], [<b>]}
+///
+/// soon we can also support grouping set merging, which handles a case
+/// like: select count(distinct a, b), count(distinct a), count(distinct c) from t,
+/// for which we still get 2 grouping sets: {[<a, b>, <a>], [<c>]}
+///
+/// in the second case, group layouts sharing a common prefix have been merged into one
+/// unified set to reduce the underlying data replication/expand cost.
+///
+using GroupingColumnName = ::String;
+using GroupingColumnNames = std::vector<GroupingColumnName>;
+using GroupingSet = std::vector<GroupingColumnNames>;
+using GroupingSets = std::vector<GroupingSet>;
+
+
+/** Data structure for the implementation of Expand.
+  *
+  * Expand is an operator that replicates rows from the underlying datasource to feed different
+  * aggregate grouping-layout requirements (commonly known as grouping sets).
+  *
+  * In the current scenario, it is used to accelerate the computation of multiple distinct aggregates
+  * by scheming a 3-phase aggregation under MPP mode, utilizing the computing resources of multiple nodes.
+  *
+  * The GroupingSets description is all the Expand operator itself needs; the length of the GroupingSets
+  * is the required expand factor (in other words, one grouping set requires one replica of the source
+  * rows). Since the rows targeted at one grouping set column shouldn't be affected by the other grouping
+  * set columns (which also appear in the group by items) when the grouping is done, we isolate the
+  * different grouping set columns by filling them with null values when expanding rows.
+  *
+  * Here is an example:
+  * Say we get a query like this: select count(distinct a), count(distinct b) from t.
+  *
+  * The downward requirements formed by this query consist of two different grouping sets <a> and <b>,
+  * and both of these columns will be in the group by items, noted here as GROUP BY(a, b).
+  *
+  * Different group layouts cannot be fed with the same replica of data in shuffle mode, except by
+  * gathering all the data to a single node, which usually comes with a single-point bottleneck.
+  *
+  * That's why data expansion happens here. Say we get two tuples <a, b> as below:
+  *
+  *      <a, b>        ==> after expand we get
+  *      1   1         origin row    1   1
+  *      1   2         expand row    1   1
+  *                    origin row    1   2
+  *                    expand row    1   2
+  *
+  * Although we have expanded/doubled the origin rows, when grouping them with the GROUP BY(a, b)
+  * clause (resulting in 2 groups here: (1,1) and (1,2)), we find that we still cannot get the right
+  * answer for the count distinct agg on a.
+  *
+  * In theory, every origin/expanded row should target exactly one grouping requirement: rows <1> and
+  * <3> above should be used to feed count(distinct a), but since the value of b in row <3> differs
+  * from that in row <1>, the two rows end up in different groups.
+  *
+  * Coming back to the original goal of feeding count(distinct a), we don't even care what column b
+  * holds in rows <1> and <3>, because the current agg's argument is column a. Therefore, we fill every
+  * non-targeted grouping set column of an expanded row with a null value. After that we get:
+  *
+  *      <a, b>        ==> after expand we get
+  *      1   1         origin row    1     null   ---> targets grouping set a
+  *      1   2         expand row    null  1      ---> targets grouping set b
+  *                    origin row    1     null   ---> targets grouping set a
+  *                    expand row    null  2      ---> targets grouping set b
+  *
+  * Then, when grouping with the GROUP BY(a, b) clause, rows <1> and <3> fall into one group, while
+  * rows <2> and <4> each form a group of their own. With that, every distinct agg has its targeted
+  * data grouped correctly: GROUP BY(a, b) effectively acts as GROUP BY(a, null) for a-targeted rows
+  * and GROUP BY(null, b) for b-targeted rows.
+  *
+  * Over correctly grouped data, computing the result of the distinct aggs is then straightforward.
+  * By the way, if an origin row has columns that don't belong to any grouping set, they are simply
+  * copied as they are into the expanded rows.
+  *
+  */
+class Expand
+{
+public:
+    explicit Expand(const GroupingSets & gss);
+
+    // replicateAndFillNull is the basic functionality that the Expand operator provides. Briefly, it
+    // replicates the origin rows according to the local grouping sets description, and appends a new
+    // column named groupingID to indicate which group a row is targeted for.
+    void replicateAndFillNull(Block & block) const;
+
+    bool isInGroupSetColumn(String name) const;
+
+    const std::set<String> & getAllGroupSetColumnNames() const;
+
+    String getGroupingSetsDes() const;
+
+    static const String grouping_identifier_column_name;
+
+    static const DataTypePtr grouping_identifier_column_type;
+
+private:
+    void collectNameSet();
+
+    size_t getGroupSetNum() const { return group_sets_names.size(); }
+
+    const GroupingColumnNames & getGroupSetColumnNamesByOffset(size_t offset) const;
+
+    GroupingSets group_sets_names;
+    std::set<String> name_set;
+};
+} // namespace DB
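As a quick illustration of the interface above, driving Expand directly looks like the sketch below. It mirrors the unit test added later in this patch; assume block is a DB::Block that already holds source columns named "gender" and "country":

    // Two grouping sets <{gender}> and <{country}>.
    GroupingSet g_gender{GroupingColumnNames{"gender"}};
    GroupingSet g_country{GroupingColumnNames{"country"}};
    Expand expand{GroupingSets{g_gender, g_country}};

    // Each source row becomes two rows; "gender" and "country" become nullable and are set to
    // null in the replicas not targeted at their grouping set; a non-nullable UInt64 column
    // "groupingID" (counting from 1) is appended.
    expand.replicateAndFillNull(block);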
diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp
index 46547a4f686..826f86d1059 100644
--- a/dbms/src/Interpreters/ExpressionActions.cpp
+++ b/dbms/src/Interpreters/ExpressionActions.cpp
@@ -48,6 +48,12 @@ Names ExpressionAction::getNeededColumns() const
     for (const auto & column : projections)
         res.push_back(column.first);
 
+    if (expand)
+    {
+        for (const auto & column : expand->getAllGroupSetColumnNames())
+            res.push_back(column);
+    }
+
     if (!source_name.empty())
         res.push_back(source_name);
 
@@ -135,6 +141,13 @@ ExpressionAction ExpressionAction::ordinaryJoin(std::shared_ptr<Join> join
     return a;
 }
 
+ExpressionAction ExpressionAction::expandSource(GroupingSets grouping_sets_)
+{
+    ExpressionAction a;
+    a.type = EXPAND;
+    a.expand = std::make_shared<Expand>(grouping_sets_);
+    return a;
+}
 
 void ExpressionAction::prepare(Block & sample_block)
 {
@@ -228,6 +241,23 @@ void ExpressionAction::prepare(Block & sample_block)
         break;
     }
 
+    case EXPAND:
+    {
+        // sample_block is only used for schema checks against the following blocks; modify it if this action changes the schema.
+        auto name_set = expand->getAllGroupSetColumnNames();
+        // make the grouping set columns nullable.
+        for (const auto & col_name : name_set)
+        {
+            auto & column_with_name = sample_block.getByName(col_name);
+            column_with_name.type = makeNullable(column_with_name.type);
+            if (column_with_name.column != nullptr)
+                column_with_name.column = makeNullable(column_with_name.column);
+        }
+        // fill one more column: groupingID.
+        sample_block.insert({nullptr, expand->grouping_identifier_column_type, expand->grouping_identifier_column_name});
+        break;
+    }
+
     case PROJECT:
     {
         Block new_block;
@@ -313,6 +343,12 @@ void ExpressionAction::execute(Block & block) const
         break;
     }
 
+    case EXPAND:
+    {
+        expand->replicateAndFillNull(block);
+        break;
+    }
+
     case PROJECT:
     {
         Block new_block;
diff --git a/dbms/src/Interpreters/ExpressionActions.h b/dbms/src/Interpreters/ExpressionActions.h
index 68fb35f8048..83090f675ed 100644
--- a/dbms/src/Interpreters/ExpressionActions.h
+++ b/dbms/src/Interpreters/ExpressionActions.h
@@ -17,6 +17,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -34,6 +35,7 @@ using NameWithAlias = std::pair<String, String>;
 using NamesWithAliases = std::vector<NameWithAlias>;
 
 class Join;
+class Expand;
 
 class IFunctionBase;
 using FunctionBasePtr = std::shared_ptr<IFunctionBase>;
 
@@ -65,6 +67,8 @@ struct ExpressionAction
         /// Reorder and rename the columns, delete the extra ones. The same column names are allowed in the result.
         PROJECT,
+
+        EXPAND,
     };
 
     Type type;
@@ -90,6 +94,9 @@ struct ExpressionAction
     /// For PROJECT.
     NamesWithAliases projections;
 
+    /// For EXPAND.
+    std::shared_ptr<Expand> expand;
+
     /// If result_name_ == "", as name "function_name(arguments separated by commas) is used".
     static ExpressionAction applyFunction(
         const FunctionBuilderPtr & function_,
@@ -103,6 +110,7 @@ struct ExpressionAction
     static ExpressionAction project(const NamesWithAliases & projected_columns_);
     static ExpressionAction project(const Names & projected_columns_);
     static ExpressionAction ordinaryJoin(std::shared_ptr<Join> join_, const NamesAndTypesList & columns_added_by_join_);
+    static ExpressionAction expandSource(GroupingSets grouping_sets);
 
     /// Which columns necessary to perform this action.
     Names getNeededColumns() const;
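For orientation, this is how the new action is meant to be chained when building a pipeline. It is only a sketch: it assumes an ExpressionActionsPtr named actions built by the caller, a GroupingSets value gss decoded from the tipb::Expand executor, and the usual ExpressionActions::add entry point:

    actions->add(ExpressionAction::expandSource(std::move(gss)));
    // prepare() marks every grouping-set column as nullable in the sample block and inserts
    // the groupingID column; execute() then runs Expand::replicateAndFillNull on each block.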
diff --git a/dbms/src/Interpreters/tests/gtest_block_expand.cpp b/dbms/src/Interpreters/tests/gtest_block_expand.cpp
new file mode 100644
index 00000000000..c5756d3e94a
--- /dev/null
+++ b/dbms/src/Interpreters/tests/gtest_block_expand.cpp
@@ -0,0 +1,404 @@
+// Copyright 2022 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+namespace tests
+{
+
+class BlockExpand : public ::testing::Test
+{
+public:
+    using ColStringType = typename TypeTraits<String>::FieldType;
+    using ColInt64Type = typename TypeTraits<Int64>::FieldType;
+    using ColUInt64Type = typename TypeTraits<UInt64>::FieldType;
+    using ColumnWithString = std::vector<ColStringType>;
+    using ColumnWithInt64 = std::vector<ColInt64Type>;
+    using ColumnWithUInt64 = std::vector<ColUInt64Type>;
+
+    const String single_col_name{"single_col"};
+    const ColumnWithString col0_ori{"col0-1 ", "col0-7", "col0-0 ", "col0-3", "col0-4", "col0-6", "col0-2 ", "col0-5"};
+    const std::vector<String> col_name{"age", "gender", "country", "region", "zip"};
+};
+
+/// todo: when a column overlaps between different grouping sets, we should copy the overlapped column as a new
+/// column, and the upper layer OP's computation should be shifted to be based on the new column's id. This needs
+/// plan-side control; the tiflash side is ready to go.
+///
+/// just an overlapped case for grouping set 1: <{a}>, grouping set 2: <{a,b}>
+/// for count(distinct a) and count(distinct a, b), the planner will clone a new column from a for one of them:
+/// count(distinct a') and count(distinct a, b) = we need one more column a' here (maybe from a lower projection or something),
+/// then, according to the index offsets of a' and a,b in the PB, describe the grouping set definition.
+/// when targeting a' 's replicate group, fill a and b with null in the group.
+/// when targeting a and b 's replicate group, fill a' with null in the group.
+TEST_F(BlockExpand, ExpandLogic4Overlap)
+try
+{
+    {
+        // test the basic block expand operation. (two grouping sets: <{age}>, <{gender, country}>)
+        const ColumnsWithTypeAndName ori_col = {
+            toVec<Int64>(col_name[0], ColumnWithInt64{1, 0, -1}),
+            toVec<String>(col_name[1], ColumnWithString{"1 ", "1 ", "1 "}),
+            toVec<String>(col_name[2], ColumnWithString{"1", "2", "3"}),
+            toVec<UInt64>(col_name[3], ColumnWithUInt64{1, 1, 0}),
+        };
+        // group set <age>, group set <gender, country>
+        GroupingSet g_age = GroupingSet{GroupingColumnNames{col_name[0]}};
+        GroupingSet g_gender_country = GroupingSet{GroupingColumnNames{col_name[1], col_name[2]}};
+        GroupingSets group_sets = GroupingSets{g_age, g_gender_country};
+        Expand expand = Expand(group_sets);
+        Block block(ori_col);
+        auto origin_rows = block.rows();
+
+        expand.replicateAndFillNull(block);
+        // assert the column count grew by 1.
+        ASSERT_EQ(block.getColumns().size(), size_t(5));
+        // assert the new column groupingID is appended.
+        ASSERT_EQ(block.getColumnsWithTypeAndName()[4].name, "groupingID");
+        // assert the block row count equals origin rows * grouping set num.
+        auto expand_rows = block.rows();
+        auto grouping_set_num = 2;
+        ASSERT_EQ(origin_rows * grouping_set_num, expand_rows); // 6
+        // assert the grouping set columns are nullable.
+        ASSERT_EQ(block.getColumns()[0].get()->isColumnNullable(), true);
+        ASSERT_EQ(block.getColumns()[1].get()->isColumnNullable(), true);
+        ASSERT_EQ(block.getColumns()[2].get()->isColumnNullable(), true);
+        ASSERT_EQ(block.getColumns()[3].get()->isColumnNullable(), false);
+        ASSERT_EQ(block.getColumns()[4].get()->isColumnNullable(), false);
+
+        // assert the rows layout
+        //          "age", "gender", "country", "region", "groupingID"
+        // ori_col   1      null      null       1         1
+        // rpt_col   null   "1 "      "1"        1         2
+        //
+        // ori_col   0      null      null       1         1
+        // rpt_col   null   "1 "      "2"        1         2
+        //
+        // ori_col   -1     null      null       0         1
+        // rpt_col   null   "1 "      "3"        0         2
+        const auto num4_null = 100; // sentinel standing for null in the expected Int64 data.
+        const auto res0 = ColumnWithInt64{1, num4_null, 0, num4_null, -1, num4_null};
+        const auto * col_0 = typeid_cast<const ColumnNullable *>(block.getColumns()[0].get());
+        const auto * col_0_nest = &static_cast<const ColumnInt64 &>(col_0->getNestedColumn());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            if (res0[i] == num4_null)
+            {
+                ASSERT_EQ(col_0->isNullAt(i), true);
+            }
+            else
+            {
+                ASSERT_EQ(col_0_nest->getElement(i), res0[i]);
+            }
+        }
+
+        const auto res1 = ColumnWithString{"null", "1 ", "null", "1 ", "null", "1 "};
+        const auto * col_1 = typeid_cast<const ColumnNullable *>(block.getColumns()[1].get());
+        const auto * col_1_nest = &static_cast<const ColumnString &>(col_1->getNestedColumn());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            if (res1[i] == "null")
+            {
+                ASSERT_EQ(col_1->isNullAt(i), true);
+            }
+            else
+            {
+                ASSERT_EQ(col_1_nest->getDataAt(i), res1[i]);
+            }
+        }
+
+        const auto res2 = ColumnWithString{"null", "1", "null", "2", "null", "3"};
+        const auto * col_2 = typeid_cast<const ColumnNullable *>(block.getColumns()[2].get());
+        const auto * col_2_nest = &static_cast<const ColumnString &>(col_2->getNestedColumn());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            if (res2[i] == "null")
+            {
+                ASSERT_EQ(col_2->isNullAt(i), true);
+            }
+            else
+            {
+                ASSERT_EQ(col_2_nest->getDataAt(i), res2[i]);
+            }
+        }
+
+        const auto res3 = ColumnWithUInt64{1, 1, 1, 1, 0, 0};
+        const auto * col_3 = typeid_cast<const ColumnUInt64 *>(block.getColumns()[3].get());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            ASSERT_EQ(col_3->getElement(i), res3[i]);
+        }
+
+        const auto res4 = ColumnWithUInt64{1, 2, 1, 2, 1, 2};
+        const auto * col_4 = typeid_cast<const ColumnUInt64 *>(block.getColumns()[4].get());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            ASSERT_EQ(col_4->getElement(i), res4[i]);
+        }
+    }
+}
+CATCH
+TEST_F(BlockExpand, ExpandLogic)
+try
+{
+    {
+        // test the basic block expand operation. (two grouping sets: <{gender}>, <{country}>)
+        const ColumnsWithTypeAndName ori_col = {
+            toVec<Int64>(col_name[0], ColumnWithInt64{1, 0, -1}),
+            toVec<String>(col_name[1], ColumnWithString{"1 ", "1 ", "1 "}),
+            toVec<String>(col_name[2], ColumnWithString{"1", "2", "3"}),
+            toVec<UInt64>(col_name[3], ColumnWithUInt64{1, 1, 0}),
+        };
+        // group set <gender>, group set <country>
+        GroupingSet g_gender = GroupingSet{GroupingColumnNames{col_name[1]}};
+        GroupingSet g_country = GroupingSet{GroupingColumnNames{col_name[2]}};
+        GroupingSets group_sets = GroupingSets{g_gender, g_country};
+        Expand expand = Expand(group_sets);
+        Block block(ori_col);
+        auto origin_rows = block.rows();
+
+        expand.replicateAndFillNull(block);
+        // assert the column count grew by 1.
+        ASSERT_EQ(block.getColumns().size(), size_t(5));
+        // assert the new column groupingID is appended.
+        ASSERT_EQ(block.getColumnsWithTypeAndName()[4].name, "groupingID");
+        // assert the block row count equals origin rows * grouping set num.
+        auto expand_rows = block.rows();
+        auto grouping_set_num = 2;
+        ASSERT_EQ(origin_rows * grouping_set_num, expand_rows); // 6
+        // assert the grouping set columns are nullable.
+        ASSERT_EQ(block.getColumns()[0].get()->isColumnNullable(), false);
+        ASSERT_EQ(block.getColumns()[1].get()->isColumnNullable(), true);
+        ASSERT_EQ(block.getColumns()[2].get()->isColumnNullable(), true);
+        ASSERT_EQ(block.getColumns()[3].get()->isColumnNullable(), false);
+        ASSERT_EQ(block.getColumns()[4].get()->isColumnNullable(), false);
+
+        // assert the rows layout
+        //          "age", "gender", "country", "region", "groupingID"
+        // ori_col   1      "1 "      null       1         1
+        // rpt_col   1      null      "1"        1         2
+        //
+        // ori_col   0      "1 "      null       1         1
+        // rpt_col   0      null      "2"        1         2
+        //
+        // ori_col   -1     "1 "      null       0         1
+        // rpt_col   -1     null      "3"        0         2
+
+        const auto res0 = ColumnWithInt64{1, 1, 0, 0, -1, -1};
+        const auto * col_0 = typeid_cast<const ColumnInt64 *>(block.getColumns()[0].get());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            ASSERT_EQ(col_0->getElement(i), res0[i]);
+        }
+
+        const auto res1 = ColumnWithString{"1 ", "null", "1 ", "null", "1 ", "null"};
+        const auto * col_1 = typeid_cast<const ColumnNullable *>(block.getColumns()[1].get());
+        const auto * col_1_nest = &static_cast<const ColumnString &>(col_1->getNestedColumn());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            if (res1[i] == "null")
+            {
+                ASSERT_EQ(col_1->isNullAt(i), true);
+            }
+            else
+            {
+                ASSERT_EQ(col_1_nest->getDataAt(i), res1[i]);
+            }
+        }
+
+        const auto res2 = ColumnWithString{"null", "1", "null", "2", "null", "3"};
+        const auto * col_2 = typeid_cast<const ColumnNullable *>(block.getColumns()[2].get());
+        const auto * col_2_nest = &static_cast<const ColumnString &>(col_2->getNestedColumn());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            if (res2[i] == "null")
+            {
+                ASSERT_EQ(col_2->isNullAt(i), true);
+            }
+            else
+            {
+                ASSERT_EQ(col_2_nest->getDataAt(i), res2[i]);
+            }
+        }
+
+        const auto res3 = ColumnWithUInt64{1, 1, 1, 1, 0, 0};
+        const auto * col_3 = typeid_cast<const ColumnUInt64 *>(block.getColumns()[3].get());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            ASSERT_EQ(col_3->getElement(i), res3[i]);
+        }
+
+        const auto res4 = ColumnWithUInt64{1, 2, 1, 2, 1, 2};
+        const auto * col_4 = typeid_cast<const ColumnUInt64 *>(block.getColumns()[4].get());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            ASSERT_EQ(col_4->getElement(i), res4[i]);
+        }
+    }
+    {
+        // test the block expand operation for multiple grouping sets (triple here)
+        const ColumnsWithTypeAndName ori_col = {
+            toVec<Int64>(col_name[0], ColumnWithInt64{1, 0, -1}),
+            toVec<String>(col_name[1], ColumnWithString{"aaa", "bbb", "ccc"}),
+            toVec<String>(col_name[2], ColumnWithString{"1", "2", "3"}),
+            toVec<UInt64>(col_name[3], ColumnWithUInt64{1, 1, 0}),
+        };
+        // group set <gender>, group set <country>, group set <region>
+        GroupingSet g_gender = GroupingSet{GroupingColumnNames{col_name[1]}};
+        GroupingSet g_country = GroupingSet{GroupingColumnNames{col_name[2]}};
+        GroupingSet g_region = GroupingSet{GroupingColumnNames{col_name[3]}};
+        GroupingSets group_sets = GroupingSets{g_gender, g_country, g_region};
+        Expand expand = Expand(group_sets);
+        Block block(ori_col);
+        auto origin_rows = block.rows();
+
+        expand.replicateAndFillNull(block);
+        // assert the column count grew by 1.
+        ASSERT_EQ(block.getColumns().size(), size_t(5));
+        // assert the new column groupingID is appended.
+        ASSERT_EQ(block.getColumnsWithTypeAndName()[4].name, "groupingID");
+        // assert the block row count equals origin rows * grouping set num.
+        auto expand_rows = block.rows();
+        auto grouping_set_num = 3;
+        ASSERT_EQ(origin_rows * grouping_set_num, expand_rows); // 9
+        // assert the grouping set columns are nullable.
+        ASSERT_EQ(block.getColumns()[0].get()->isColumnNullable(), false);
+        ASSERT_EQ(block.getColumns()[1].get()->isColumnNullable(), true);
+        ASSERT_EQ(block.getColumns()[2].get()->isColumnNullable(), true);
+        ASSERT_EQ(block.getColumns()[3].get()->isColumnNullable(), true);
+        ASSERT_EQ(block.getColumns()[4].get()->isColumnNullable(), false);
+
+        // assert the rows layout
+        //          "age", "gender", "country", "region", "groupingID"
+        // ori_col   1      "aaa"     null       null      1
+        // rpt_col   1      null      "1"        null      2
+        // rpt_col   1      null      null       1         3
+        //
+        // ori_col   0      "bbb"     null       null      1
+        // rpt_col   0      null      "2"        null      2
+        // rpt_col   0      null      null       1         3
+        //
+        // ori_col   -1     "ccc"     null       null      1
+        // rpt_col   -1     null      "3"        null      2
+        // rpt_col   -1     null      null       0         3
+
+        const auto res0 = ColumnWithInt64{1, 1, 1, 0, 0, 0, -1, -1, -1};
+        const auto * col_0 = typeid_cast<const ColumnInt64 *>(block.getColumns()[0].get());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            ASSERT_EQ(col_0->getElement(i), res0[i]);
+        }
+
+        const auto res1 = ColumnWithString{"aaa", "null", "null", "bbb", "null", "null", "ccc", "null", "null"};
+        const auto * col_1 = typeid_cast<const ColumnNullable *>(block.getColumns()[1].get());
+        const auto * col_1_nest = &static_cast<const ColumnString &>(col_1->getNestedColumn());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            if (res1[i] == "null")
+            {
+                ASSERT_EQ(col_1->isNullAt(i), true);
+            }
+            else
+            {
+                ASSERT_EQ(col_1_nest->getDataAt(i), res1[i]);
+            }
+        }
+
+        const auto res2 = ColumnWithString{"null", "1", "null", "null", "2", "null", "null", "3", "null"};
+        const auto * col_2 = typeid_cast<const ColumnNullable *>(block.getColumns()[2].get());
+        const auto * col_2_nest = &static_cast<const ColumnString &>(col_2->getNestedColumn());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            if (res2[i] == "null")
+            {
+                ASSERT_EQ(col_2->isNullAt(i), true);
+            }
+            else
+            {
+                ASSERT_EQ(col_2_nest->getDataAt(i), res2[i]);
+            }
+        }
+
+        // use UInt64(-1) to represent null.
+        const auto res3 = ColumnWithUInt64{static_cast<UInt64>(-1), static_cast<UInt64>(-1), 1, static_cast<UInt64>(-1), static_cast<UInt64>(-1), 1, static_cast<UInt64>(-1), static_cast<UInt64>(-1), 0};
+        const auto * col_3 = typeid_cast<const ColumnNullable *>(block.getColumns()[3].get());
+        const auto * col_3_nest = &typeid_cast<const ColumnUInt64 &>(col_3->getNestedColumn());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            if (res3[i] == static_cast<UInt64>(-1))
+            {
+                ASSERT_EQ(col_3->isNullAt(i), true);
+            }
+            else
+            {
+                ASSERT_EQ(col_3_nest->getElement(i), res3[i]);
+            }
+        }
+
+        const auto res4 = ColumnWithUInt64{1, 2, 3, 1, 2, 3, 1, 2, 3};
+        const auto * col_4 = typeid_cast<const ColumnUInt64 *>(block.getColumns()[4].get());
+        for (int i = 0; i < static_cast<int>(expand_rows); ++i)
+        {
+            ASSERT_EQ(col_4->getElement(i), res4[i]);
+        }
+    }
+    {
+        /// test an empty block
+        const ColumnsWithTypeAndName ori_col = {
+            toVec<Int64>(col_name[0], ColumnWithInt64{}), // without data.
+            toVec<String>(col_name[1], ColumnWithString{}),
+            toVec<String>(col_name[2], ColumnWithString{}),
+            toVec<UInt64>(col_name[3], ColumnWithUInt64{}),
+        };
+        // group set <gender>, group set <country>, group set <region>
+        GroupingSet g_gender = GroupingSet{GroupingColumnNames{col_name[1]}};
+        GroupingSet g_country = GroupingSet{GroupingColumnNames{col_name[2]}};
+        GroupingSet g_region = GroupingSet{GroupingColumnNames{col_name[3]}};
+        GroupingSets group_sets = GroupingSets{g_gender, g_country, g_region};
+        Expand expand = Expand(group_sets);
+        Block block(ori_col);
+        auto origin_rows = block.rows();
+
+        expand.replicateAndFillNull(block);
+        // assert the column count grew by 1.
+        ASSERT_EQ(block.getColumns().size(), size_t(5));
+        // assert the new column groupingID is appended.
+        ASSERT_EQ(block.getColumnsWithTypeAndName()[4].name, "groupingID");
+        // assert the block row count equals origin rows * grouping set num.
+        auto expand_rows = block.rows();
+        auto grouping_set_num = 3;
+        ASSERT_EQ(origin_rows, 0);
+        ASSERT_EQ(origin_rows * grouping_set_num, expand_rows); // 0
+        // nothing more to check for an empty block.
+    }
+}
+CATCH
+
+} // namespace tests
+} // namespace DB
diff --git a/dbms/src/Storages/DeltaMerge/DeltaTree.h b/dbms/src/Storages/DeltaMerge/DeltaTree.h
index ad3fd32b3b1..b51575ba732 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaTree.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaTree.h
@@ -892,8 +892,14 @@ class DeltaTree
     }
 
 public:
-    DeltaTree() { init(std::make_shared<ValueSpace>()); }
-    explicit DeltaTree(const ValueSpacePtr & insert_value_space_) { init(insert_value_space_); }
+    DeltaTree()
+    {
+        init(std::make_shared<ValueSpace>());
+    }
+    explicit DeltaTree(const ValueSpacePtr & insert_value_space_)
+    {
+        init(insert_value_space_);
+    }
     DeltaTree(const Self & o);
 
     DeltaTree & operator=(const Self & o)
@@ -954,10 +960,19 @@ class DeltaTree
         check(root, true);
     }
 
-    size_t getBytes() { return bytes; }
+    size_t getBytes()
+    {
+        return bytes;
+    }
 
-    size_t getHeight() const { return height; }
-    EntryIterator begin() const { return EntryIterator(left_leaf, 0, 0); }
+    size_t getHeight() const
+    {
+        return height;
+    }
+    EntryIterator begin() const
+    {
+        return EntryIterator(left_leaf, 0, 0);
+    }
     EntryIterator end() const
     {
         Int64 delta = isLeaf(root) ? as(Leaf, root)->getDelta() : as(Intern, root)->getDelta();
@@ -971,11 +986,23 @@ class DeltaTree
         }
 
         return std::make_shared<DTEntriesCopy<CopyAllocator>>(left_leaf, num_entries, delta);
     }
 
-    CompactedEntriesPtr getCompactedEntries() { return std::make_shared<CompactedEntries>(begin(), end(), num_entries); }
+    CompactedEntriesPtr getCompactedEntries()
+    {
+        return std::make_shared<CompactedEntries>(begin(), end(), num_entries);
+    }
 
-    size_t numEntries() const { return num_entries; }
-    size_t numInserts() const { return num_inserts; }
-    size_t numDeletes() const { return num_deletes; }
+    size_t numEntries() const
+    {
+        return num_entries;
+    }
+    size_t numInserts() const
+    {
+        return num_inserts;
+    }
+    size_t numDeletes() const
+    {
+        return num_deletes;
+    }
 
     void addDelete(UInt64 rid);
     void addInsert(UInt64 rid, UInt64 tuple_id);
diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp
index 2810bb033dc..d0b72d10948 100644
--- a/dbms/src/TestUtils/ExecutorTestUtils.cpp
+++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp
@@ -16,7 +16,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
diff --git a/dbms/src/TestUtils/executorSerializer.cpp b/dbms/src/TestUtils/executorSerializer.cpp
index de0880c2e56..607bd27c68f 100644
--- a/dbms/src/TestUtils/executorSerializer.cpp
+++ b/dbms/src/TestUtils/executorSerializer.cpp
@@ -160,6 +160,31 @@ void serializeTopN(const String & executor_id, const tipb::TopN & top_n, FmtBuff
     buf.fmtAppend("}}, limit: {}\n", top_n.limit());
 }
 
+void serializeExpandSource(const String & executor_id, const tipb::Expand & expand, FmtBuffer & buf)
+{
+    buf.fmtAppend("{} | expanded_by: [", executor_id);
+    for (const auto & grouping_set : expand.grouping_sets())
+    {
+        buf.append("<");
+        for (const auto & grouping_exprs : grouping_set.grouping_exprs())
+        {
+            buf.append("{");
+            for (auto i = 0; i < grouping_exprs.grouping_expr().size(); ++i)
+            {
+                if (i != 0)
+                {
+                    buf.append(",");
+                }
+                auto expr = grouping_exprs.grouping_expr().Get(i);
+                serializeExpression(expr, buf);
+            }
+            buf.append("}");
+        }
+        buf.append(">");
+    }
+    buf.append("]\n");
+}
+
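For reference, with two single-column grouping sets over columns <0, String> and <1, String>, this serializer emits the line below; the same string is asserted by the mock-executor test at the end of this patch:

    expand_1 | expanded_by: [<{<0, String>}><{<1, String>}>]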
 void serializeJoin(const String & executor_id, const tipb::Join & join, FmtBuffer & buf)
 {
     buf.fmtAppend("{} | {}, {}. left_join_keys: {{", executor_id, getJoinTypeName(join.join_type()), getJoinExecTypeName(join.join_exec_type()));
@@ -282,6 +307,9 @@ void ExecutorSerializer::serializeListStruct(const tipb::DAGRequest * dag_reques
         case tipb::ExecType::TypeLimit:
             serializeLimit("Limit", executor.limit(), buf);
             break;
+        case tipb::ExecType::TypeExpand:
+            serializeExpandSource("Expand", executor.expand(), buf);
+            break;
         default:
             throw TiFlashException("Should not reach here", Errors::Coprocessor::Internal);
         }
@@ -339,6 +367,9 @@ void ExecutorSerializer::serializeTreeStruct(const tipb::Executor & root_executo
         case tipb::ExecType::TypeWindow:
             serializeWindow(executor.executor_id(), executor.window(), buf);
             break;
+        case tipb::ExecType::TypeExpand:
+            serializeExpandSource(executor.executor_id(), executor.expand(), buf);
+            break;
         default:
             throw TiFlashException("Should not reach here", Errors::Coprocessor::Internal);
         }
diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp
index 2a7c820ce55..d1be7e1c17f 100644
--- a/dbms/src/TestUtils/mockExecutor.cpp
+++ b/dbms/src/TestUtils/mockExecutor.cpp
@@ -17,6 +17,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -361,6 +362,31 @@ DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItemVec order_by_vec, boo
     return *this;
 }
 
+DAGRequestBuilder & DAGRequestBuilder::expand(MockVVecColumnNameVec grouping_set_columns)
+{
+    assert(root);
+    auto grouping_sets_ast = mock::MockVVecGroupingNameVec();
+    auto grouping_col_collection = std::set<String>();
+    for (const auto & grouping_set : grouping_set_columns)
+    {
+        auto grouping_set_ast = mock::MockVecGroupingNameVec();
+        for (const auto & grouping_exprs : grouping_set)
+        {
+            auto grouping_exprs_ast = mock::MockGroupingNameVec();
+            for (const auto & grouping_col : grouping_exprs)
+            {
+                auto ast_col_ptr = buildColumn(grouping_col); // a string identifier becomes an ast column ref
+                grouping_exprs_ast.emplace_back(std::move(ast_col_ptr));
+                grouping_col_collection.insert(grouping_col);
+            }
+            grouping_set_ast.emplace_back(std::move(grouping_exprs_ast));
+        }
+        grouping_sets_ast.emplace_back(std::move(grouping_set_ast));
+    }
+    root = compileExpand(root, getExecutorIndex(), grouping_sets_ast, grouping_col_collection);
+    return *this;
+}
+
 void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & mock_column_infos, size_t concurrency_hint)
 {
     auto columns = getColumnWithTypeAndName(genNamesAndTypes(mockColumnInfosToTiDBColumnInfos(mock_column_infos), "mock_table_scan"));
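The argument nesting of DAGRequestBuilder::expand mirrors GroupingSets: sets, then groupingExprs, then column names. For example, the two grouping sets <{s1}> and <{s2}> used by the test at the end of this patch are written as:

    MockVVecColumnNameVec grouping_sets{
        MockVecColumnNameVec{MockColumnNameVec{"s1"}}, // grouping set <{s1}>
        MockVecColumnNameVec{MockColumnNameVec{"s2"}}, // grouping set <{s2}>
    };
    auto request = context.scan("test_db", "test_table").expand(grouping_sets).build(context);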
diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h
index 307a034a9ac..248d303abb2 100644
--- a/dbms/src/TestUtils/mockExecutor.h
+++ b/dbms/src/TestUtils/mockExecutor.h
@@ -38,6 +38,8 @@ using MockOrderByItemVec = std::vector<MockOrderByItem>;
 using MockPartitionByItem = std::pair<String, bool>;
 using MockPartitionByItemVec = std::vector<MockPartitionByItem>;
 using MockColumnNameVec = std::vector<String>;
+using MockVecColumnNameVec = std::vector<MockColumnNameVec>; // for a grouping set (every groupingExpr element inside is a slice of column names)
+using MockVVecColumnNameVec = std::vector<MockVecColumnNameVec>; // for grouping sets
 using MockAstVec = std::vector<ASTPtr>;
 using MockWindowFrame = mock::MockWindowFrame;
 
@@ -145,6 +147,9 @@ class DAGRequestBuilder
     DAGRequestBuilder & sort(MockOrderByItem order_by, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0);
     DAGRequestBuilder & sort(MockOrderByItemVec order_by_vec, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0);
 
+    // expand
+    DAGRequestBuilder & expand(MockVVecColumnNameVec grouping_set_columns);
+
     void setCollation(Int32 collator_) { properties.collator = convertToTiDBCollation(collator_); }
     Int32 getCollation() const { return abs(properties.collator); }
 
diff --git a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp
index f1826226aeb..a000bf3f87b 100644
--- a/dbms/src/TestUtils/tests/gtest_mock_executors.cpp
+++ b/dbms/src/TestUtils/tests/gtest_mock_executors.cpp
@@ -319,6 +319,40 @@ try
 }
 CATCH
 
+TEST_F(MockDAGRequestTest, Expand)
+try
+{
+    auto request = context.scan("test_db", "test_table")
+                       .expand(MockVVecColumnNameVec{
+                           MockVecColumnNameVec{
+                               MockColumnNameVec{"s1"},
+                           },
+                           MockVecColumnNameVec{
+                               MockColumnNameVec{"s2"},
+                           },
+                       })
+                       .build(context);
+    {
+        String expected = "expand_1 | expanded_by: [<{<0, String>}><{<1, String>}>]\n"
+                          " table_scan_0 | {<0, String>, <1, String>}";
+        ASSERT_DAGREQUEST_EQAUL(expected, request);
+    }
+    request = context.receive("sender_1")
+                  .expand(MockVVecColumnNameVec{
+                      MockVecColumnNameVec{
+                          MockColumnNameVec{"s1"},
+                      },
+                      MockVecColumnNameVec{
+                          MockColumnNameVec{"s2"},
+                      },
+                  })
+                  .build(context);
+    {
+        String expected = "expand_1 | expanded_by: [<{<0, String>}><{<1, String>}>]\n"
+                          " exchange_receiver_0 | type:PassThrough, {<0, String>, <1, String>, <2, String>}";
+        ASSERT_DAGREQUEST_EQAUL(expected, request);
+    }
+}
+CATCH
+
 TEST_F(MockDAGRequestTest, MockWindow)
 try
 {