Skip to content

Commit

Permalink
expression: support different expr push down for TiKV and TiFlash (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Mar 11, 2020
1 parent 9f0736e commit a999ef6
Show file tree
Hide file tree
Showing 11 changed files with 415 additions and 321 deletions.
22 changes: 11 additions & 11 deletions cmd/explaintest/r/tpch.result
Original file line number Diff line number Diff line change
Expand Up @@ -1288,14 +1288,14 @@ cntrycode
order by
cntrycode;
id estRows task operator info
Sort_32 1.00 root Column#27:asc
└─Projection_34 1.00 root Column#27, Column#28, Column#29
└─HashAgg_37 1.00 root group by:Column#27, funcs:count(1)->Column#28, funcs:sum(tpch.customer.c_acctbal)->Column#29, funcs:firstrow(Column#27)->Column#27
└─Projection_38 0.00 root substring(tpch.customer.c_phone, 1, 2)->Column#27, tpch.customer.c_acctbal
└─HashLeftJoin_39 0.00 root anti semi join, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
├─TableReader_45(Build) 75000000.00 root data:TableFullScan_44
│ └─TableFullScan_44 75000000.00 cop[tikv] table:orders, keep order:false
└─Selection_40(Probe) 0.00 root in(substring(tpch.customer.c_phone, 1, 2), "20", "40", "22", "30", "39", "42", "21")
└─TableReader_43 0.00 root data:Selection_42
└─Selection_42 0.00 cop[tikv] gt(tpch.customer.c_acctbal, NULL)
└─TableFullScan_41 7500000.00 cop[tikv] table:customer, keep order:false
Sort_39 1.00 root Column#27:asc
└─Projection_41 1.00 root Column#27, Column#28, Column#29
└─HashAgg_44 1.00 root group by:Column#27, funcs:count(1)->Column#28, funcs:sum(tpch.customer.c_acctbal)->Column#29, funcs:firstrow(Column#27)->Column#27
└─Projection_45 0.00 root substring(tpch.customer.c_phone, 1, 2)->Column#27, tpch.customer.c_acctbal
└─HashLeftJoin_46 0.00 root anti semi join, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
├─TableReader_52(Build) 75000000.00 root data:TableFullScan_51
│ └─TableFullScan_51 75000000.00 cop[tikv] table:orders, keep order:false
└─Selection_50(Probe) 0.00 root in(substring(tpch.customer.c_phone, 1, 2), "20", "40", "22", "30", "39", "42", "21")
└─TableReader_49 0.00 root data:Selection_48
└─Selection_48 0.00 cop[tikv] gt(tpch.customer.c_acctbal, NULL)
└─TableFullScan_47 7500000.00 cop[tikv] table:customer, keep order:false
2 changes: 1 addition & 1 deletion executor/reload_expr_pushdown_blacklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func LoadExprPushdownBlacklist(ctx sessionctx.Context) (err error) {
}
newBlacklist[name] = struct{}{}
}
expression.DefaultExprPushdownBlacklist.Store(newBlacklist)
expression.DefaultExprPushDownBlacklist.Store(newBlacklist)
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions expression/aggregation/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -187,6 +188,16 @@ func IsAllFirstRow(aggFuncs []*AggFuncDesc) bool {
return true
}

// CheckAggPushDown checks whether an agg function can be pushed to storage.
func CheckAggPushDown(aggFunc *AggFuncDesc, storeType kv.StoreType) bool {
switch storeType {
case kv.TiFlash:
return CheckAggPushFlash(aggFunc)
default:
return true
}
}

// CheckAggPushFlash checks whether an agg function can be pushed to flash storage.
func CheckAggPushFlash(aggFunc *AggFuncDesc) bool {
switch aggFunc.Name {
Expand Down
229 changes: 1 addition & 228 deletions expression/expr_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@
package expression

import (
"strings"
"sync/atomic"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand All @@ -34,42 +29,6 @@ import (
"go.uber.org/zap"
)

// ExpressionsToPB converts expression to tipb.Expr.
func ExpressionsToPB(sc *stmtctx.StatementContext, exprs []Expression, client kv.Client) (pbCNF *tipb.Expr, pushed []Expression, remained []Expression) {
pc := PbConverter{client: client, sc: sc}
retTypeOfAnd := &types.FieldType{
Tp: mysql.TypeLonglong,
Flen: 1,
Decimal: 0,
Flag: mysql.BinaryFlag,
Charset: charset.CharsetBin,
Collate: charset.CollationBin,
}

for _, expr := range exprs {
pbExpr := pc.ExprToPB(expr)
if pbExpr == nil {
remained = append(remained, expr)
continue
}

pushed = append(pushed, expr)
if pbCNF == nil {
pbCNF = pbExpr
continue
}

// Merge multiple converted pb expression into a CNF.
pbCNF = &tipb.Expr{
Tp: tipb.ExprType_ScalarFunc,
Sig: tipb.ScalarFuncSig_LogicalAnd,
Children: []*tipb.Expr{pbCNF, pbExpr},
FieldType: ToPBFieldType(retTypeOfAnd),
}
}
return
}

// ExpressionsToPBList converts expressions to tipb.Expr list for new plan.
func ExpressionsToPBList(sc *stmtctx.StatementContext, exprs []Expression, client kv.Client) (pbExpr []*tipb.Expr) {
pc := PbConverter{client: client, sc: sc}
Expand Down Expand Up @@ -275,7 +234,7 @@ func (pc PbConverter) scalarFuncToPBExpr(expr *ScalarFunction) *tipb.Expr {
}

// Check whether this function can be pushed.
if !pc.canFuncBePushed(expr) {
if !canFuncBePushed(expr, kv.UnSpecified) {
return nil
}

Expand Down Expand Up @@ -334,189 +293,3 @@ func SortByItemToPB(sc *stmtctx.StatementContext, client kv.Client, expr Express
}
return &tipb.ByItem{Expr: e, Desc: desc}
}

func (pc PbConverter) canFuncBePushed(sf *ScalarFunction) bool {
// Use the failpoint to control whether to push down an expression in the integration test.
// Push down all expression if the `failpoint expression` is `all`, otherwise, check
// whether scalar function's name is contained in the enabled expression list (e.g.`ne,eq,lt`).
failpoint.Inject("PushDownTestSwitcher", func(val failpoint.Value) bool {
enabled := val.(string)
if enabled == "all" {
return true
}
exprs := strings.Split(enabled, ",")
for _, expr := range exprs {
if strings.ToLower(strings.TrimSpace(expr)) == sf.FuncName.L {
return true
}
}
return false
})

switch sf.FuncName.L {
case
// op functions.
ast.LogicAnd,
ast.LogicOr,
ast.LogicXor,
ast.UnaryNot,
ast.And,
ast.Or,
ast.Xor,
ast.BitNeg,
ast.LeftShift,
ast.RightShift,
ast.UnaryMinus,

// compare functions.
ast.LT,
ast.LE,
ast.EQ,
ast.NE,
ast.GE,
ast.GT,
ast.NullEQ,
ast.In,
ast.IsNull,
ast.Like,
ast.IsTruth,
ast.IsFalsity,

// arithmetical functions.
ast.Plus,
ast.Minus,
ast.Mul,
ast.Div,
ast.Abs,

// math functions.
ast.Ceil,
ast.Ceiling,
ast.Floor,
ast.Sqrt,
ast.Sign,
ast.Ln,
ast.Log,
ast.Log2,
ast.Log10,
ast.Exp,
ast.Pow,
// Rust use the llvm math functions, which have different precision with Golang/MySQL(cmath)
// open the following switchers if we implement them in coprocessor via `cmath`
// ast.Sin,
// ast.Asin,
// ast.Cos,
// ast.Acos,
// ast.Tan,
// ast.Atan,
// ast.Atan2,
// ast.Cot,
ast.Radians,
ast.Degrees,
ast.Conv,
ast.CRC32,

// control flow functions.
ast.Case,
ast.If,
ast.Ifnull,
ast.Coalesce,

// string functions.
ast.Length,
ast.BitLength,
ast.Concat,
ast.ConcatWS,
// ast.Locate,
ast.Replace,
ast.ASCII,
ast.Hex,
ast.Reverse,
ast.LTrim,
ast.RTrim,
// ast.Left,
ast.Strcmp,
ast.Space,
ast.Elt,
ast.Field,

// json functions.
ast.JSONType,
ast.JSONExtract,
// FIXME: JSONUnquote is incompatible with Coprocessor
// ast.JSONUnquote,
ast.JSONObject,
ast.JSONArray,
ast.JSONMerge,
ast.JSONSet,
ast.JSONInsert,
// ast.JSONReplace,
ast.JSONRemove,
ast.JSONLength,

// date functions.
ast.DateFormat,
ast.FromDays,
// ast.ToDays,
ast.DayOfYear,
ast.DayOfMonth,
ast.Year,
ast.Month,
// FIXME: the coprocessor cannot keep the same behavior with TiDB in current compute framework
// ast.Hour,
// ast.Minute,
// ast.Second,
// ast.MicroSecond,
ast.PeriodAdd,
ast.PeriodDiff,
ast.DayName,

// encryption functions.
ast.MD5,
ast.SHA1,
ast.UncompressedLength,

ast.Cast,

// misc functions.
ast.InetNtoa,
ast.InetAton,
ast.Inet6Ntoa,
ast.Inet6Aton,
ast.IsIPv4,
ast.IsIPv4Compat,
ast.IsIPv4Mapped,
ast.IsIPv6:
return isPushdownEnabled(sf.FuncName.L)

// A special case: Only push down Round by signature
case ast.Round:
switch sf.Function.PbCode() {
case
tipb.ScalarFuncSig_RoundReal,
tipb.ScalarFuncSig_RoundInt,
tipb.ScalarFuncSig_RoundDec:
return isPushdownEnabled(sf.FuncName.L)
}
case ast.Rand:
switch sf.Function.PbCode() {
case
tipb.ScalarFuncSig_RandWithSeedFirstGen:
return isPushdownEnabled(sf.FuncName.L)
}
}
return false
}

func isPushdownEnabled(name string) bool {
_, disallowPushdown := DefaultExprPushdownBlacklist.Load().(map[string]struct{})[name]
return !disallowPushdown
}

// DefaultExprPushdownBlacklist indicates the expressions which can not be pushed down to TiKV.
var DefaultExprPushdownBlacklist *atomic.Value

func init() {
DefaultExprPushdownBlacklist = new(atomic.Value)
DefaultExprPushdownBlacklist.Store(make(map[string]struct{}))
}
Loading

0 comments on commit a999ef6

Please sign in to comment.