From 44fa596ee70e382155c69558126b823eb3090c03 Mon Sep 17 00:00:00 2001 From: hezheyu Date: Tue, 5 Jul 2022 22:34:42 +0800 Subject: [PATCH] Pushdown bitwise aggregation functions to tiflash. Signed-off-by: hezheyu --- .../Coprocessor/DAGExpressionAnalyzer.cpp | 39 ++++++++++++++++++- .../Flash/Coprocessor/DAGExpressionAnalyzer.h | 7 ++++ dbms/src/Flash/Coprocessor/DAGUtils.cpp | 2 +- 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index aa269469cdb..cf9dd467639 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -158,6 +158,34 @@ void DAGExpressionAnalyzer::fillArgumentDetail( arg_collators.push_back(removeNullable(arg_types.back())->isString() ? getCollatorFromExpr(arg) : nullptr); } +void DAGExpressionAnalyzer::fillArgumentDetailForAggFuncBitwise( + const ExpressionActionsPtr & actions, + const tipb::Expr & arg, + Names & arg_names, + DataTypes & arg_types, + TiDB::TiDBCollators & arg_collators) +{ + DataTypePtr uint64_type = std::make_shared(); + const Block & sample_block = actions->getSampleBlock(); + String name = getActions(arg, actions); + DataTypePtr orig_type = sample_block.getByName(name).type; + if (!removeNullable(orig_type)->equals(*uint64_type)) + { + if (orig_type->isNullable()) + { + name = appendCast(makeNullable(uint64_type), actions, name); + } + else + { + name = appendCast(uint64_type, actions, name); + } + } + + arg_names.push_back(name); + arg_types.push_back(orig_type->isNullable() ? makeNullable(uint64_type) : uint64_type); + arg_collators.push_back(removeNullable(arg_types.back())->isString() ? getCollatorFromExpr(arg) : nullptr); +} + void DAGExpressionAnalyzer::buildGroupConcat( const tipb::Expr & expr, const ExpressionActionsPtr & actions, @@ -277,13 +305,22 @@ void DAGExpressionAnalyzer::buildCommonAggFunc( NamesAndTypes & aggregated_columns, bool empty_input_as_null) { + tipb::ExprType tp = expr.tp(); + bool is_bitwise = tp == tipb::ExprType::Agg_BitAnd || tp == tipb::ExprType::Agg_BitOr || tp == tipb::ExprType::Agg_BitXor; auto child_size = expr.children_size(); Names arg_names; DataTypes arg_types; TiDB::TiDBCollators arg_collators; for (Int32 i = 0; i < child_size; ++i) { - fillArgumentDetail(actions, expr.children(i), arg_names, arg_types, arg_collators); + if (is_bitwise) + { + fillArgumentDetailForAggFuncBitwise(actions, expr.children(i), arg_names, arg_types, arg_collators); + } + else + { + fillArgumentDetail(actions, expr.children(i), arg_names, arg_types, arg_collators); + } } appendAggDescription(arg_names, arg_types, arg_collators, agg_func_name, aggregate_descriptions, aggregated_columns, empty_input_as_null); diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index 046088ab2b2..d09a1b0140f 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -210,6 +210,13 @@ class DAGExpressionAnalyzer : private boost::noncopyable DataTypes & arg_types, TiDB::TiDBCollators & arg_collators); + void fillArgumentDetailForAggFuncBitwise( + const ExpressionActionsPtr & actions, + const tipb::Expr & arg, + Names & arg_names, + DataTypes & arg_types, + TiDB::TiDBCollators & arg_collators); + void makeExplicitSet( const tipb::Expr & expr, const Block & sample_block, diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index 9ffa29cd14d..ffd2bce71b5 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -51,7 +51,7 @@ const std::unordered_map agg_func_map({ {tipb::ExprType::GroupConcat, "groupArray"}, //{tipb::ExprType::Avg, ""}, //{tipb::ExprType::Agg_BitAnd, ""}, - //{tipb::ExprType::Agg_BitOr, ""}, + {tipb::ExprType::Agg_BitOr, "groupBitOr"}, //{tipb::ExprType::Agg_BitXor, ""}, //{tipb::ExprType::Std, ""}, //{tipb::ExprType::Stddev, ""},