Skip to content

Commit

Permalink
plan, executor: implement Expand operator for grouping sets (#6545)
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Feb 22, 2023
1 parent e944f66 commit b2a445d
Show file tree
Hide file tree
Showing 38 changed files with 2,006 additions and 47 deletions.
6 changes: 4 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ namespace DB
F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"}), F(type_exchange_sender, {"type", "exchange_sender"}), \
F(type_exchange_receiver, {"type", "exchange_receiver"}), F(type_projection, {"type", "projection"}), \
F(type_partition_ts, {"type", "partition_table_scan"}), \
F(type_window, {"type", "window"}), F(type_window_sort, {"type", "window_sort"})) \
F(type_window, {"type", "window"}), F(type_window_sort, {"type", "window_sort"}), \
F(type_expand, {"type", "expand"})) \
M(tiflash_coprocessor_request_duration_seconds, "Bucketed histogram of request duration", Histogram, \
F(type_cop, {{"type", "cop"}}, ExpBuckets{0.001, 2, 20}), \
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.001, 2, 20}), \
Expand Down Expand Up @@ -276,7 +277,8 @@ namespace DB
M(tiflash_compute_request_unit, "Request Unit used by tiflash compute", Counter, \
F(type_mpp, {{"type", "mpp"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop, {{"type", "cop"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_batch, {{"type", "batch"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()})) \
F(type_batch, {{"type", "batch"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}))

// clang-format on

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/ExpressionBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,4 @@ Block ExpressionBlockInputStream::readImpl()
expression->execute(res);
return res;
}

} // namespace DB
68 changes: 68 additions & 0 deletions dbms/src/Debug/MockExecutor/ExpandBinder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Debug/MockExecutor/AstToPBUtils.h>
#include <Debug/MockExecutor/ExpandBinder.h>

namespace DB::mock
{

// Serialize this mock Expand executor (and its child subtree) into a tipb::Executor protobuf.
bool ExpandBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
    tipb_executor->set_tp(tipb::ExecType::TypeExpand);
    tipb_executor->set_executor_id(name);
    auto * expand_pb = tipb_executor->mutable_expand();
    // Translate every AST column reference inside the grouping sets into a tipb::Expr,
    // resolved against the child's output schema.
    for (const auto & one_grouping_set : grouping_sets_columns)
    {
        auto * set_pb = expand_pb->add_grouping_sets();
        for (const auto & one_grouping_exprs : one_grouping_set)
        {
            auto * exprs_pb = set_pb->add_grouping_exprs();
            for (const auto & col_ast : one_grouping_exprs)
            {
                // AST column ref becomes a tipb::Expr column ref.
                tipb::Expr * expr_pb = exprs_pb->add_grouping_expr();
                astToPB(children[0]->output_schema, col_ast, expr_pb, collator_id, context);
            }
        }
    }
    // Recursively serialize the child executor under this Expand node.
    return children[0]->toTiPBExecutor(expand_pb->mutable_child(), collator_id, mpp_info, context);
}

// Build a mock Expand executor binder on top of `input`.
// Columns named in `in_set` (grouping-set columns) lose their NOT NULL flag in the output
// schema, and an extra unsigned BIGINT NOT NULL "groupingID" column is appended.
ExecutorBinderPtr compileExpand(ExecutorBinderPtr input, size_t & executor_index, MockVVecGroupingNameVec grouping_set_columns, std::set<String> in_set)
{
    DAGSchema output_schema;
    for (const auto & field : input->output_schema)
    {
        // Grouping-set columns may be filled with NULL by Expand, so make them nullable.
        if (field.second.hasNotNullFlag() && in_set.count(field.first) > 0)
            output_schema.push_back(toNullableDAGColumnInfo(field));
        else
            output_schema.push_back(field);
    }
    // Append the generated groupingID column: unsigned 64-bit integer, never NULL.
    tipb::FieldType grouping_id_type{};
    grouping_id_type.set_tp(TiDB::TypeLongLong);
    grouping_id_type.set_charset("binary");
    grouping_id_type.set_collate(TiDB::ITiDBCollator::BINARY);
    grouping_id_type.set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull); // must keep the NOT NULL flag
    grouping_id_type.set_flen(-1);
    grouping_id_type.set_decimal(-1);
    output_schema.emplace_back("groupingID", TiDB::fieldTypeToColumnInfo(grouping_id_type));

    ExecutorBinderPtr expand = std::make_shared<ExpandBinder>(executor_index, output_schema, std::move(grouping_set_columns));
    expand->children.push_back(input);
    return expand;
}
} // namespace DB::mock
43 changes: 43 additions & 0 deletions dbms/src/Debug/MockExecutor/ExpandBinder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Debug/MockExecutor/ExecutorBinder.h>

namespace DB::mock
{
using MockGroupingNameVec = std::vector<ASTPtr>;
using MockVecGroupingNameVec = std::vector<MockGroupingNameVec>;
using MockVVecGroupingNameVec = std::vector<MockVecGroupingNameVec>;

// Mock executor binder for the Expand operator (grouping sets).
class ExpandBinder : public ExecutorBinder
{
public:
    // `gss` is taken by value and moved into the member: pass with std::move to avoid a
    // deep copy of the nested grouping-set vectors (compileExpand already does).
    ExpandBinder(size_t & index_, const DAGSchema & output_schema_, MockVVecGroupingNameVec gss)
        : ExecutorBinder(index_, "expand_" + std::to_string(index_), output_schema_)
        , grouping_sets_columns(std::move(gss))
    {}

    // Serialize this executor (and its child subtree) into a tipb::Executor protobuf.
    bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;

    // Column pruning is not supported for the mock Expand executor.
    void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }

private:
    // for now, every grouping set is base columns list, modify structure to be one more nested if grouping set merge is enabled.
    MockVVecGroupingNameVec grouping_sets_columns;
};

ExecutorBinderPtr compileExpand(ExecutorBinderPtr input, size_t & executor_index, MockVVecGroupingNameVec grouping_set_columns, std::set<String> set);
} // namespace DB::mock
60 changes: 60 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionsTiDBConversion.h>
#include <Interpreters/Context.h>
#include <Interpreters/Expand.h>
#include <Interpreters/Set.h>
#include <Interpreters/Settings.h>
#include <Interpreters/convertFieldToType.h>
Expand Down Expand Up @@ -804,6 +805,65 @@ NamesAndTypes DAGExpressionAnalyzer::buildOrderColumns(
return order_columns;
}

// Translate the tipb::Expand grouping sets into internal GroupingSets (ClickHouse block
// column names), mark every grouping-set column in source_columns as nullable, and append
// the generated groupingID column to source_columns.
GroupingSets DAGExpressionAnalyzer::buildExpandGroupingColumns(
    const tipb::Expand & expand,
    const ExpressionActionsPtr & actions)
{
    GroupingSets group_sets_columns;
    // Acts as a set of all grouping-set column names; the mapped bool is always true.
    std::map<String, bool> map_grouping_col;
    group_sets_columns.reserve(expand.grouping_sets().size());
    for (const auto & group_set : expand.grouping_sets())
    {
        GroupingSet group_set_columns;
        group_set_columns.reserve(group_set.grouping_exprs().size());
        for (const auto & group_exprs : group_set.grouping_exprs())
        {
            GroupingColumnNames group_exprs_columns;
            group_exprs_columns.reserve(group_exprs.grouping_expr().size());
            for (const auto & group_expr : group_exprs.grouping_expr())
            {
                // tidb expression computation is based on column index offset into the child's
                // chunk schema; getActions resolves it to a ck block column name here.
                String cp_name = getActions(group_expr, actions);
                group_exprs_columns.emplace_back(cp_name);
                map_grouping_col.emplace(cp_name, true);
            }
            // move here, cause basic string is copied from input cols.
            group_set_columns.emplace_back(std::move(group_exprs_columns));
        }
        group_sets_columns.emplace_back(std::move(group_set_columns));
    }
    // change the original source column to be nullable, and add a new column for groupingID.
    // Use count() rather than operator[]: the latter would default-insert a spurious `false`
    // entry for every non-grouping column while we scan source_columns.
    for (auto & mutable_one : source_columns)
    {
        if (map_grouping_col.count(mutable_one.name) > 0)
            mutable_one.type = makeNullable(mutable_one.type);
    }
    source_columns.emplace_back(Expand::grouping_identifier_column_name, Expand::grouping_identifier_column_type);
    return group_sets_columns;
}

// Append the Expand operator to the expression chain and return the actions (ending with
// the expand action) that must run before it. A fresh chain step is opened afterwards whose
// required output is the post-expand schema.
ExpressionActionsPtr DAGExpressionAnalyzer::appendExpand(
    const tipb::Expand & expand,
    ExpressionActionsChain & chain)
{
    auto & step = initAndGetLastStep(chain);
    // Every column of the current sample block must survive into the expand action.
    for (const auto & col : step.actions->getSampleBlock().getNamesAndTypesList())
        step.required_output.push_back(col.name);

    // Note: buildExpandGroupingColumns also updates source_columns (nullability + groupingID).
    auto grouping_sets = buildExpandGroupingColumns(expand, step.actions);
    step.actions->add(ExpressionAction::expandSource(grouping_sets));

    auto actions_with_expand = chain.getLastActions();
    chain.finalize();
    chain.clear();

    // Open a new step over the post-expand schema and require all of its columns.
    auto & next_step = initAndGetLastStep(chain);
    for (const auto & col : getCurrentInputColumns())
        next_step.required_output.push_back(col.name);
    return actions_with_expand;
}

std::vector<NameAndTypePair> DAGExpressionAnalyzer::appendOrderBy(
ExpressionActionsChain & chain,
const tipb::TopN & topN)
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class DAGExpressionAnalyzer : private boost::noncopyable
ExpressionActionsChain & chain,
const std::vector<const tipb::Expr *> & conditions);

GroupingSets buildExpandGroupingColumns(const tipb::Expand & expand, const ExpressionActionsPtr & actions);

ExpressionActionsPtr appendExpand(const tipb::Expand & expand, ExpressionActionsChain & chain);

NamesAndTypes buildWindowOrderColumns(const tipb::Sort & window_sort) const;

std::vector<NameAndTypePair> appendOrderBy(
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ bool isSourceNode(const tipb::Executor * root)
const static String SOURCE_NAME("source");
const static String SEL_NAME("selection");
const static String AGG_NAME("aggregation");
const static String EXPAND_NAME("expand");
const static String WINDOW_NAME("window");
const static String WINDOW_SORT_NAME("window_sort");
const static String HAVING_NAME("having");
Expand Down Expand Up @@ -96,6 +97,12 @@ DAGQueryBlock::DAGQueryBlock(const tipb::Executor & root_, QueryBlockIDGenerator
}
current = &current->selection().child();
break;
case tipb::ExecType::TypeExpand:
GET_METRIC(tiflash_coprocessor_executor_count, type_expand).Increment();
assignOrThrowException(&expand, current, EXPAND_NAME);
expand_name = current->executor_id();
current = &current->expand().child();
break;
case tipb::ExecType::TypeStreamAgg:
RUNTIME_CHECK_MSG(current->aggregation().group_by_size() == 0, STREAM_AGG_ERROR);
case tipb::ExecType::TypeAggregation:
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class DAGQueryBlock
String having_name;
const tipb::Executor * limit_or_topn = nullptr;
String limit_or_topn_name;
const tipb::Executor * expand = nullptr;
String expand_name;
const tipb::Executor * exchange_sender = nullptr;
String exchange_sender_name;
UInt32 id;
Expand Down
53 changes: 45 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Core/NamesAndTypes.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/ExchangeSenderBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/HashJoinBuildBlockInputStream.h>
#include <DataStreams/HashJoinProbeBlockInputStream.h>
Expand All @@ -43,6 +44,7 @@
#include <Flash/Coprocessor/StorageDisaggregatedInterpreter.h>
#include <Flash/Mpp/newMPPExchangeWriter.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Expand.h>
#include <Interpreters/Join.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/Transaction/TMTContext.h>
Expand Down Expand Up @@ -73,7 +75,9 @@ struct AnalysisResult
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
ExpressionActionsPtr before_order_and_select;
ExpressionActionsPtr before_order;
ExpressionActionsPtr before_expand;
ExpressionActionsPtr before_select;
ExpressionActionsPtr final_projection;

String filter_column_name;
Expand Down Expand Up @@ -131,6 +135,14 @@ AnalysisResult analyzeExpressions(
if (query_block.limit_or_topn && query_block.limit_or_topn->tp() == tipb::ExecType::TypeTopN)
{
res.order_columns = analyzer.appendOrderBy(chain, query_block.limit_or_topn->topn());
res.before_order = chain.getLastActions();
chain.addStep();
}

if (query_block.expand)
{
res.before_expand = analyzer.appendExpand(query_block.expand->expand(), chain);
chain.addStep();
}

const auto & dag_context = *context.getDAGContext();
Expand All @@ -146,7 +158,7 @@ AnalysisResult analyzeExpressions(
chain,
query_block.qb_column_prefix);

res.before_order_and_select = chain.getLastActions();
res.before_select = chain.getLastActions();

chain.finalize();
chain.clear();
Expand Down Expand Up @@ -570,12 +582,12 @@ void DAGQueryBlockInterpreter::handleWindowOrder(DAGPipeline & pipeline, const t
}

// To execute a query block, you have to:
// 1. generate the date stream and push it to pipeline.
// 1. generate the data stream and push it to pipeline.
// 2. assign the analyzer
// 3. construct a final projection, even if it's not necessary. just construct it.
// Talking about projection, it has the following rules.
// 1. if the query block does not contain agg, then the final project is the same as the source Executor
// 2. if the query block contains agg, then the final project is the same as agg Executor
// 2. if the query block contains agg/expand, then the final project is the same as agg/expand Executor
// 3. if the cop task may contains more then 1 query block, and the current query block is not the root
// query block, then the project should add an alias for each column that needs to be projected, something
// like final_project.emplace_back(col.name, query_block.qb_column_prefix + col.name);
Expand Down Expand Up @@ -664,9 +676,9 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
executeWhere(pipeline, res.before_having, res.having_column_name, "execute having");
recordProfileStreams(pipeline, query_block.having_name);
}
if (res.before_order_and_select)
if (res.before_order)
{
executeExpression(pipeline, res.before_order_and_select, log, "before order and select");
executeExpression(pipeline, res.before_order, log, "before order");
}

if (!res.order_columns.empty())
Expand All @@ -676,14 +688,30 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
recordProfileStreams(pipeline, query_block.limit_or_topn_name);
}

// execute final project action
executeProject(pipeline, final_project, "final projection");
// execute limit
if (query_block.limit_or_topn && query_block.limit_or_topn->tp() == tipb::TypeLimit)
{
executeLimit(pipeline);
recordProfileStreams(pipeline, query_block.limit_or_topn_name);
}

// execute the expand OP after all filter/limits and so on.
// since expand OP has some row replication work to do, place it after limit can reduce some unnecessary burden.
// and put it before the final projection, because we should recognize some base col as grouping set col before change their alias.
if (res.before_expand)
{
executeExpand(pipeline, res.before_expand);
recordProfileStreams(pipeline, query_block.expand_name);
}

if (res.before_select)
{
executeExpression(pipeline, res.before_select, log, "before select");
}

// execute final project action
executeProject(pipeline, final_project, "final projection");

restorePipelineConcurrency(pipeline);

// execute exchange_sender
Expand Down Expand Up @@ -724,6 +752,15 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline)
}
}

// Wrap every stream of the pipeline in an ExpressionBlockInputStream carrying the expand
// actions, tagging each stream with a description of the grouping sets for EXPLAIN output.
void DAGQueryBlockInterpreter::executeExpand(DAGPipeline & pipeline, const ExpressionActionsPtr & expr)
{
    // The expand action is the last one added to `expr`.
    const auto & expand_action = expr->getActions().back();
    String extra_info = fmt::format("expand: grouping set {}", expand_action.expand->getGroupingSetsDes());
    pipeline.transform([&](auto & stream) {
        auto wrapped = std::make_shared<ExpressionBlockInputStream>(stream, expr, log->identifier());
        wrapped->setExtraInfo(extra_info);
        stream = wrapped;
    });
}

void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
{
RUNTIME_ASSERT(dagContext().isMPPTask() && dagContext().tunnel_set != nullptr, log, "exchange_sender only run in MPP");
Expand Down
Loading

0 comments on commit b2a445d

Please sign in to comment.