planner: enforce push mpp down #24849

Merged
2 changes: 1 addition & 1 deletion executor/mpp_gather.go
@@ -30,7 +30,7 @@ import (
)

func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
if !ctx.GetSessionVars().AllowMPPExecution {
if !ctx.GetSessionVars().IsMPPAllowed() {
return false
}
_, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender)
10 changes: 5 additions & 5 deletions planner/core/exhaust_physical_plans.go
@@ -1667,7 +1667,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
}
joins := make([]PhysicalPlan, 0, 8)
canPushToTiFlash := p.canPushToCop(kv.TiFlash)
if p.ctx.GetSessionVars().AllowMPPExecution && canPushToTiFlash {
if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash {
if p.shouldUseMPPBCJ() {
mppJoins := p.tryToGetMppHashJoin(prop, true)
if (p.preferJoinType & preferBCJoin) > 0 {
@@ -1980,7 +1980,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl
if !lt.limitHints.preferLimitToCop {
allTaskTypes = append(allTaskTypes, property.RootTaskType)
}
if lt.ctx.GetSessionVars().AllowMPPExecution {
if lt.ctx.GetSessionVars().IsMPPAllowed() {
allTaskTypes = append(allTaskTypes, property.MppTaskType)
}
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
@@ -2370,7 +2370,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash
canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
@@ -2480,7 +2480,7 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
if !p.limitHints.preferLimitToCop {
allTaskTypes = append(allTaskTypes, property.RootTaskType)
}
if p.canPushToCop(kv.TiFlash) && p.ctx.GetSessionVars().AllowMPPExecution {
if p.canPushToCop(kv.TiFlash) && p.ctx.GetSessionVars().IsMPPAllowed() {
allTaskTypes = append(allTaskTypes, property.MppTaskType)
}
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
@@ -2517,7 +2517,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)
if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType {
return nil, true
}
canUseMpp := p.ctx.GetSessionVars().AllowMPPExecution && p.canPushToCop(kv.TiFlash)
canUseMpp := p.ctx.GetSessionVars().IsMPPAllowed() && p.canPushToCop(kv.TiFlash)
chReqProps := make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
if canUseMpp && prop.TaskTp == property.MppTaskType {
126 changes: 126 additions & 0 deletions planner/core/integration_test.go
@@ -3124,3 +3124,129 @@ func (s *testIntegrationSuite) TestIssue23846(c *C) {
tk.MustQuery("select count(*) from t where a=0x00A4EEF4FA55D6706ED5").Check(testkit.Rows("1"))
tk.MustQuery("select * from t where a=0x00A4EEF4FA55D6706ED5").Check(testkit.Rows("\x00\xa4\xee\xf4\xfaU\xd6pn\xd5")) // not empty
}

func (s *testIntegrationSuite) TestEnforceMPP(c *C) {
tk := testkit.NewTestKit(c, s.store)

// test value limit of tidb_opt_tiflash_concurrency_factor
err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`)

tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 1")
tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("1"))
tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 24")
tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("24"))

// test set tidb_allow_mpp
tk.MustExec("set @@session.tidb_allow_mpp = 0")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
tk.MustExec("set @@session.tidb_allow_mpp = 2")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

tk.MustExec("set @@session.tidb_allow_mpp = off")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@session.tidb_allow_mpp = oN")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
tk.MustExec("set @@session.tidb_allow_mpp = enForcE")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

tk.MustExec("set @@global.tidb_allow_mpp = faLsE")
tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@global.tidb_allow_mpp = True")
tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON"))

err = tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 'enforceWithTypo'`)

// test query
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("create index idx on t(a)")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

// disable mpp
tk.MustExec("set @@session.tidb_allow_mpp = 0")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))

// read from tiflash, batch cop.
tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"StreamAgg_20 1.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_21 1.00 root data:StreamAgg_9",
" └─StreamAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_19 10.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_18 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// enable mpp
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))

// should still choose the tikv index read
tk.MustQuery("explain select count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_30 1.00 root funcs:count(Column#6)->Column#3",
"└─IndexReader_31 1.00 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#6",
" └─IndexRangeScan_29 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tikv, indexRead
tk.MustQuery("explain select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_18 1.00 root funcs:count(Column#5)->Column#3",
"└─IndexReader_19 1.00 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#5",
" └─IndexRangeScan_17 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tiflash, mpp with large cost
tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// enforce mpp
tk.MustExec("set @@session.tidb_allow_mpp = 2")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

// should use mpp
tk.MustQuery("explain select count(*) from t where a=1;").Check(testkit.Rows(
"HashAgg_24 1.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_26 1.00 root data:ExchangeSender_25",
" └─ExchangeSender_25 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_23 10.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_22 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// read from tikv, indexRead
tk.MustQuery("explain select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_18 1.00 root funcs:count(Column#5)->Column#3",
"└─IndexReader_19 1.00 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#5",
" └─IndexRangeScan_17 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tiflash
tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))
}
10 changes: 8 additions & 2 deletions planner/core/task.go
@@ -2007,10 +2007,16 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
StoreType: kv.TiFlash,
}.Init(ctx, t.p.SelectBlockOffset())
p.stats = t.p.statsInfo()
return &rootTask{

cst := t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
if p.ctx.GetSessionVars().IsMPPEnforced() {
cst = 0
}
rt := &rootTask{
p: p,
cst: t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor,
cst: cst,
}
return rt
}

func (t *mppTask) needEnforce(prop *property.PhysicalProperty) bool {
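Zeroing the converted cost when MPP is enforced is what lets the enforced MPP plan win the later cost comparison against TiKV alternatives. A rough standalone sketch of that selection logic, with illustrative names and numbers (pickPlan, mppCost, tikvCost and the factor value are not the planner's real fields):

package main

import "fmt"

// pickPlan sketches how the converted MPP root task competes with a TiKV plan on cost.
// When MPP is enforced, the converted cost is forced to 0 (mirroring convertToRootTaskImpl above),
// so the MPP plan always wins the comparison.
func pickPlan(mppCost, tikvCost, tiflashConcurrencyFactor float64, mppEnforced bool) string {
	converted := mppCost / tiflashConcurrencyFactor
	if mppEnforced {
		converted = 0
	}
	if converted <= tikvCost {
		return "mpp"
	}
	return "tikv"
}

func main() {
	// Without enforcement, a cheap TiKV index read can still beat a TiFlash full scan.
	fmt.Println(pickPlan(5000, 40, 24, false)) // tikv
	// With tidb_allow_mpp = ENFORCE the converted cost is zeroed and the MPP plan is chosen.
	fmt.Println(pickPlan(5000, 40, 24, true)) // mpp
}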
21 changes: 16 additions & 5 deletions sessionctx/variable/session.go
@@ -496,11 +496,12 @@ type SessionVars struct {
AllowWriteRowID bool

// AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
// If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
// Value set to 2 means to force sending batch cop for any query. Value set to 0 means never use batch cop.
AllowBatchCop int

// AllowMPPExecution will prefer using mpp way to execute a query.
AllowMPPExecution bool
// allowMPPExecution indicates whether we should use the mpp way to execute queries. The default value is "ON", which leaves the choice to the optimizer.
// Value set to "ENFORCE" means to use mpp whenever possible. Value set to "OFF" means never use mpp.
allowMPPExecution string

// TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed.
AllowAutoRandExplicitInsert bool
@@ -842,6 +843,16 @@ func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 {
return 1
}

// IsMPPAllowed returns whether mpp execution is allowed.
func (s *SessionVars) IsMPPAllowed() bool {
return s.allowMPPExecution != "OFF"
}

// IsMPPEnforced returns whether mpp execution is enforced.
func (s *SessionVars) IsMPPEnforced() bool {
return s.allowMPPExecution == "ENFORCE"
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
func (s *SessionVars) CheckAndGetTxnScope() string {
if s.InRestrictedSQL {
@@ -1068,7 +1079,7 @@ func NewSessionVars() *SessionVars {
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

vars.AllowBatchCop = DefTiDBAllowBatchCop
vars.AllowMPPExecution = DefTiDBAllowMPPExecution
vars.allowMPPExecution = DefTiDBAllowMPPExecution

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
@@ -1490,7 +1501,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case TiDBAllowBatchCop:
s.AllowBatchCop = int(tidbOptInt64(val, DefTiDBAllowBatchCop))
case TiDBAllowMPPExecution:
s.AllowMPPExecution = TiDBOptOn(val)
s.allowMPPExecution = val
case TiDBIndexLookupSize:
s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize)
case TiDBHashJoinConcurrency:
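Only the two new predicates interpret the stored string, so the rest of the planner never touches the raw enum. A minimal standalone sketch of the intended mapping, assuming the sysvar layer has already normalized the value to "OFF"/"ON"/"ENFORCE" (the type and helper names below are illustrative):

package main

import "fmt"

// mppMode mirrors the normalized values stored in allowMPPExecution.
type mppMode string

const (
	mppOff     mppMode = "OFF"
	mppOn      mppMode = "ON"
	mppEnforce mppMode = "ENFORCE"
)

// isMPPAllowed reports whether the optimizer may consider MPP plans at all.
func isMPPAllowed(m mppMode) bool { return m != mppOff }

// isMPPEnforced reports whether MPP plans should be preferred whenever possible.
func isMPPEnforced(m mppMode) bool { return m == mppEnforce }

func main() {
	for _, m := range []mppMode{mppOff, mppOn, mppEnforce} {
		fmt.Printf("%-7s allowed=%v enforced=%v\n", m, isMPPAllowed(m), isMPPEnforced(m))
	}
}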
4 changes: 2 additions & 2 deletions sessionctx/variable/sysvar.go
@@ -580,7 +580,7 @@ var defaultSysVars = []*SysVar{
return oracle.LocalTxnScope
}()},
/* TiDB specific variables */
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdSize, Value: strconv.Itoa(DefBroadcastJoinThresholdSize), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64},
{Scope: ScopeSession, Name: TiDBSnapshot, Value: ""},
@@ -606,7 +606,7 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCorrelationThreshold, Value: strconv.FormatFloat(DefOptCorrelationThreshold, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: 1},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCorrelationExpFactor, Value: strconv.Itoa(DefOptCorrelationExpFactor), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCPUFactor, Value: strconv.FormatFloat(DefOptCPUFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 1, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCopCPUFactor, Value: strconv.FormatFloat(DefOptCopCPUFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptNetworkFactor, Value: strconv.FormatFloat(DefOptNetworkFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptScanFactor, Value: strconv.FormatFloat(DefOptScanFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
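With tidb_allow_mpp switched to TypeEnum, the accepted spellings are OFF/ON/ENFORCE plus the usual boolean and numeric aliases, and tidb_opt_tiflash_concurrency_factor now rejects values below 1. A short testkit-style sketch of how that validation might be exercised (it mirrors the integration test above; the suite wiring and test name are assumptions):

func (s *testIntegrationSuite) TestAllowMPPEnumSketch(c *C) {
	tk := testkit.NewTestKit(c, s.store)

	// Numeric and mixed-case spellings normalize to the canonical enum values.
	tk.MustExec("set @@session.tidb_allow_mpp = 2")
	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))
	tk.MustExec("set @@session.tidb_allow_mpp = oFf")
	tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))

	// Values outside the enum are rejected, as is a concurrency factor below the new minimum of 1.
	c.Assert(tk.ExecToErr("set @@session.tidb_allow_mpp = someTypo"), NotNil)
	c.Assert(tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0"), NotNil)
}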
4 changes: 3 additions & 1 deletion sessionctx/variable/tidb_vars.go
@@ -294,6 +294,8 @@ const (
// The default value is 0
TiDBAllowBatchCop = "tidb_allow_batch_cop"

// TiDBAllowMPPExecution indicates whether we should use the mpp way to execute queries. The default value is "ON" (or 1), which leaves the choice to the optimizer.
// Value set to "ENFORCE" (or 2) means to use mpp whenever possible. Value set to "OFF" (or 0) means never use mpp.
TiDBAllowMPPExecution = "tidb_allow_mpp"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
@@ -613,7 +615,7 @@ const (
DefBroadcastJoinThresholdCount = 10 * 1024
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBAllowBatchCop = 1
DefTiDBAllowMPPExecution = true
DefTiDBAllowMPPExecution = "ON"
DefTiDBTxnMode = ""
DefTiDBRowFormatV1 = 1
DefTiDBRowFormatV2 = 2
2 changes: 1 addition & 1 deletion store/copr/batch_coprocessor.go
@@ -269,7 +269,7 @@ func buildBatchCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tik
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []tikv.RegionInfo{{task.region, rpcCtx.Meta, task.ranges, allStores}},
regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}