Skip to content

Commit

Permalink
planner: support set tidb_allow_mpp to 2 or ENFORCE to enforce us…
Browse files Browse the repository at this point in the history
…e mpp mode. (pingcap#24516)
  • Loading branch information
LittleFall committed Jul 20, 2021
1 parent ed5bb84 commit c255971
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 45 deletions.
2 changes: 1 addition & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
if !ctx.GetSessionVars().AllowMPPExecution {
if !ctx.GetSessionVars().IsMPPAllowed() {
return false
}
_, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender)
Expand Down
6 changes: 3 additions & 3 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
}
joins := make([]PhysicalPlan, 0, 8)
canPushToTiFlash := p.canPushToCop(kv.TiFlash)
if p.ctx.GetSessionVars().AllowMPPExecution && canPushToTiFlash {
if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash {
if p.shouldUseMPPBCJ() {
mppJoins := p.tryToGetMppHashJoin(prop, true)
if (p.preferJoinType & preferBCJoin) > 0 {
Expand Down Expand Up @@ -2028,7 +2028,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl
if !lt.limitHints.preferLimitToCop {
allTaskTypes = append(allTaskTypes, property.RootTaskType)
}
if lt.ctx.GetSessionVars().AllowMPPExecution {
if lt.ctx.GetSessionVars().IsMPPAllowed() {
allTaskTypes = append(allTaskTypes, property.MppTaskType)
}
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
Expand Down Expand Up @@ -2437,7 +2437,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash
canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
Expand Down
126 changes: 126 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3601,3 +3601,129 @@ func (s *testIntegrationSuite) TestIssue23839(c *C) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=2000001")
tk.Exec("explain SELECT OUTR . col2 AS X FROM (SELECT INNR . col1 as col1, SUM( INNR . col2 ) as col2 FROM (SELECT INNR . `col_int_not_null` + 1 as col1, INNR . `pk` as col2 FROM BB AS INNR) AS INNR GROUP BY col1) AS OUTR2 INNER JOIN (SELECT INNR . col1 as col1, MAX( INNR . col2 ) as col2 FROM (SELECT INNR . `col_int_not_null` + 1 as col1, INNR . `pk` as col2 FROM BB AS INNR) AS INNR GROUP BY col1) AS OUTR ON OUTR2.col1 = OUTR.col1 GROUP BY OUTR . col1, OUTR2 . col1 HAVING X <> 'b'")
}

func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) {
tk := testkit.NewTestKit(c, s.store)

// test value limit of tidb_opt_tiflash_concurrency_factor
err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`)

tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 1")
tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("1"))
tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 24")
tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("24"))

// test set tidb_allow_mpp
tk.MustExec("set @@session.tidb_allow_mpp = 0")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
tk.MustExec("set @@session.tidb_allow_mpp = 2")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

tk.MustExec("set @@session.tidb_allow_mpp = off")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@session.tidb_allow_mpp = oN")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
tk.MustExec("set @@session.tidb_allow_mpp = enForcE")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

tk.MustExec("set @@global.tidb_allow_mpp = faLsE")
tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@global.tidb_allow_mpp = True")
tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON"))

err = tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 'enforceWithTypo'`)

// test query
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("create index idx on t(a)")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

// ban mpp
tk.MustExec("set @@session.tidb_allow_mpp = 0")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))

// read from tiflash, batch cop.
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"StreamAgg_20 1.00 285050.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_21 1.00 19003.88 root data:StreamAgg_9",
" └─StreamAgg_9 1.00 19006.88 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_19 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_18 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// open mpp
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))

// should use tikv to index read
tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_30 1.00 485.00 root funcs:count(Column#6)->Column#3",
"└─IndexReader_31 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#6",
" └─IndexRangeScan_29 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tikv, indexRead
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3",
"└─IndexReader_19 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5",
" └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tiflash, mpp with large cost
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 11910.68 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 11877.08 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// enforce mpp
tk.MustExec("set @@session.tidb_allow_mpp = 2")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

// should use mpp
tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows(
"HashAgg_24 1.00 33.60 root funcs:count(Column#5)->Column#3",
"└─TableReader_26 1.00 0.00 root data:ExchangeSender_25",
" └─ExchangeSender_25 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_23 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_22 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// read from tikv, indexRead
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3",
"└─IndexReader_19 1.00 32.88 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5",
" └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tiflash, mpp with little cost
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 33.60 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 0.00 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))
}
8 changes: 6 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2053,11 +2053,15 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
StoreType: kv.TiFlash,
}.Init(ctx, t.p.SelectBlockOffset())
p.stats = t.p.statsInfo()

p.cost = t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
if p.ctx.GetSessionVars().IsMPPEnforced() {
p.cost = 0
}
rt := &rootTask{
p: p,
cst: t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor,
cst: p.cost,
}
p.cost = rt.cost()
return rt
}

Expand Down
19 changes: 15 additions & 4 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,12 @@ type SessionVars struct {
AllowWriteRowID bool

// AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
// If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
// Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop.
AllowBatchCop int

// AllowMPPExecution will prefer using mpp way to execute a query.
AllowMPPExecution bool
// AllowMPPExecution means if we should use mpp way to execute query. Default value is "ON", means to be determined by the optimizer.
// Value set to "ENFORCE" means to use mpp whenever possible. Value set to means never use mpp.
allowMPPExecution string

// TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed.
AllowAutoRandExplicitInsert bool
Expand Down Expand Up @@ -850,6 +851,16 @@ func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 {
return 1
}

// IsMPPAllowed returns whether mpp execution is allowed.
func (s *SessionVars) IsMPPAllowed() bool {
return s.allowMPPExecution != "OFF"
}

// IsMPPEnforced returns whether mpp execution is enforced.
func (s *SessionVars) IsMPPEnforced() bool {
return s.allowMPPExecution == "ENFORCE"
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
func (s *SessionVars) CheckAndGetTxnScope() string {
if s.InRestrictedSQL {
Expand Down Expand Up @@ -1077,7 +1088,7 @@ func NewSessionVars() *SessionVars {
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

vars.AllowBatchCop = DefTiDBAllowBatchCop
vars.AllowMPPExecution = DefTiDBAllowMPPExecution
vars.allowMPPExecution = DefTiDBAllowMPPExecution

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
Expand Down
Loading

0 comments on commit c255971

Please sign in to comment.