Skip to content

Commit

Permalink
config: add initChunkSize config item, make chunk start with 32 (#8480)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and jackysp committed Jan 7, 2019
1 parent 243d611 commit 084aa5a
Show file tree
Hide file tree
Showing 18 changed files with 82 additions and 29 deletions.
4 changes: 2 additions & 2 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ func (s *testSuite1) TestAggregation(c *C) {
tk.MustQuery("select 10 from idx_agg group by b").Check(testkit.Rows("10", "10"))
tk.MustQuery("select 11 from idx_agg group by a").Check(testkit.Rows("11", "11"))

tk.MustExec("set @@tidb_max_chunk_size=1;")
tk.MustExec("set @@tidb_init_chunk_size=1;")
tk.MustQuery("select group_concat(b) from idx_agg group by b;").Check(testkit.Rows("1", "2,2"))
tk.MustExec("set @@tidb_max_chunk_size=2;")
tk.MustExec("set @@tidb_init_chunk_size=2;")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int(11), b decimal(15,2))")
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
ctx: ctx,
id: id,
schema: schema,
initCap: ctx.GetSessionVars().MaxChunkSize,
initCap: ctx.GetSessionVars().InitChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
Expand Down
8 changes: 4 additions & 4 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ func (s *testSuite) TestUnion(c *C) {
tk.MustExec(`insert into t1 select * from t1;`)
tk.MustExec(`insert into t1 select * from t1;`)
tk.MustExec(`insert into t2 values(1, 1);`)
tk.MustExec(`set @@tidb_max_chunk_size=2;`)
tk.MustExec(`set @@tidb_init_chunk_size=2;`)
tk.MustQuery(`select count(*) from (select t1.a, t1.b from t1 left join t2 on t1.a=t2.a union all select t1.a, t1.a from t1 left join t2 on t1.a=t2.a) tmp;`).Check(testkit.Rows("128"))
tk.MustQuery(`select tmp.a, count(*) from (select t1.a, t1.b from t1 left join t2 on t1.a=t2.a union all select t1.a, t1.a from t1 left join t2 on t1.a=t2.a) tmp;`).Check(testkit.Rows("1 128"))

Expand All @@ -1096,7 +1096,7 @@ func (s *testSuite) TestUnion(c *C) {
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(a int, b int)")
tk.MustExec("insert into t1 value(1,2),(1,1),(2,2),(2,2),(3,2),(3,2)")
tk.MustExec("set @@tidb_max_chunk_size=2;")
tk.MustExec("set @@tidb_init_chunk_size=2;")
tk.MustQuery("select count(*) from (select a as c, a as d from t1 union all select a, b from t1) t;").Check(testkit.Rows("12"))

// #issue 8189 and #issue 8199
Expand Down Expand Up @@ -2975,7 +2975,7 @@ func (s *testSuite) TestLimit(c *C) {
"4 4",
"5 5",
))
tk.MustExec(`set @@tidb_max_chunk_size=2;`)
tk.MustExec(`set @@tidb_init_chunk_size=2;`)
tk.MustQuery(`select * from t order by a limit 2, 1;`).Check(testkit.Rows(
"3 3",
))
Expand Down Expand Up @@ -3233,7 +3233,7 @@ func (s *testSuite3) TestMaxOneRow(c *C) {
tk.MustExec(`create table t2(a double, b double);`)
tk.MustExec(`insert into t1 values(1, 1), (2, 2), (3, 3);`)
tk.MustExec(`insert into t2 values(0, 0);`)
tk.MustExec(`set @@tidb_max_chunk_size=1;`)
tk.MustExec(`set @@tidb_init_chunk_size=1;`)
rs, err := tk.Exec(`select (select t1.a from t1 where t1.a > t2.a) as a from t2;`)
c.Assert(err, IsNil)

Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *testSuite1) TestBatchIndexJoinUnionScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t1(id int primary key, a int)")
tk.MustExec("create table t2(id int primary key, a int, key idx_a(a))")
tk.MustExec("set @@session.tidb_max_chunk_size=1")
tk.MustExec("set @@session.tidb_init_chunk_size=1")
tk.MustExec("set @@session.tidb_index_join_batch_size=1")
tk.MustExec("set @@session.tidb_index_lookup_join_concurrency=4")
tk.MustExec("begin")
Expand Down
15 changes: 8 additions & 7 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ func (s *testSuite2) TestJoinCast(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a varchar(10), index idx(a))")
tk.MustExec("insert into t values('1'), ('2'), ('3')")
tk.MustExec("set @@tidb_max_chunk_size=1")
tk.MustExec("set @@tidb_init_chunk_size=1")
result = tk.MustQuery("select a from (select /*+ TIDB_INLJ(t1, t2) */ t1.a from t t1 join t t2 on t1.a=t2.a) t group by a")
result.Sort().Check(testkit.Rows("1", "2", "3"))
tk.MustExec("set @@tidb_max_chunk_size=1024")
tk.MustExec("set @@tidb_init_chunk_size=32")
}

func (s *testSuite2) TestUsing(c *C) {
Expand Down Expand Up @@ -819,7 +819,7 @@ func (s *testSuite2) TestIssue5278(c *C) {
func (s *testSuite2) TestIndexLookupJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_max_chunk_size=2")
tk.MustExec("set @@tidb_init_chunk_size=2")
tk.MustExec("DROP TABLE IF EXISTS t")
tk.MustExec("CREATE TABLE `t` (`a` int, pk integer auto_increment,`b` char (20),primary key (pk))")
tk.MustExec("CREATE INDEX idx_t_a ON t(`a`)")
Expand Down Expand Up @@ -849,7 +849,7 @@ func (s *testSuite2) TestIndexLookupJoin(c *C) {
tk.MustExec(`drop table if exists t;`)
tk.MustExec(`create table t(a bigint, b bigint, unique key idx1(a, b));`)
tk.MustExec(`insert into t values(1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6);`)
tk.MustExec(`set @@tidb_max_chunk_size = 2;`)
tk.MustExec(`set @@tidb_init_chunk_size = 2;`)
tk.MustQuery(`select /*+ TIDB_INLJ(t2) */ * from t t1 left join t t2 on t1.a = t2.a and t1.b = t2.b + 4;`).Check(testkit.Rows(
`1 1 <nil> <nil>`,
`1 2 <nil> <nil>`,
Expand Down Expand Up @@ -894,7 +894,7 @@ func (s *testSuite2) TestMergejoinOrder(c *C) {
" └─TableScan_12 6666.67 cop table:t2, range:[-inf,3), (3,+inf], keep order:true, stats:pseudo",
))

tk.MustExec("set @@tidb_max_chunk_size=1")
tk.MustExec("set @@tidb_init_chunk_size=1")
tk.MustQuery("select /*+ TIDB_SMJ(t2) */ * from t1 left outer join t2 on t1.a=t2.a and t1.a!=3 order by t1.a;").Check(testkit.Rows(
"1 100 <nil> <nil>",
"2 100 <nil> <nil>",
Expand Down Expand Up @@ -942,7 +942,7 @@ func (s *testSuite2) TestHashJoin(c *C) {
tk.MustExec("insert into t1 values(1,1),(2,2),(3,3),(4,4),(5,5);")
tk.MustQuery("select count(*) from t1").Check(testkit.Rows("5"))
tk.MustQuery("select count(*) from t2").Check(testkit.Rows("0"))
tk.MustExec("set @@tidb_max_chunk_size=1;")
tk.MustExec("set @@tidb_init_chunk_size=1;")
result := tk.MustQuery("explain analyze select /*+ TIDB_HJ(t1, t2) */ * from t1 where exists (select a from t2 where t1.a = t2.a);")
// id count task operator info execution info
// HashLeftJoin_9 8000.00 root semi join, inner:TableReader_13, equal:[eq(test.t1.a, test.t2.a)] time:1.036712ms, loops:1, rows:0
Expand All @@ -953,7 +953,8 @@ func (s *testSuite2) TestHashJoin(c *C) {
row := result.Rows()
c.Assert(len(row), Equals, 5)
outerExecInfo := row[1][4].(string)
c.Assert(outerExecInfo[len(outerExecInfo)-1:], Equals, "1")
// FIXME: revert this result to 1 after TableReaderExecutor can handle initChunkSize.
c.Assert(outerExecInfo[len(outerExecInfo)-1:], Equals, "5")
innerExecInfo := row[3][4].(string)
c.Assert(innerExecInfo[len(innerExecInfo)-1:], Equals, "0")

Expand Down
1 change: 1 addition & 0 deletions executor/pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (s *pkgTestSuite) TestRadixPartition(c *C) {
originL2CacheSize, originEnableRadixJoin, originMaxChunkSize := sv.L2CacheSize, sv.EnableRadixJoin, sv.MaxChunkSize
sv.L2CacheSize = 100
sv.EnableRadixJoin = true
// FIXME: use initChunkSize when join support initChunkSize.
sv.MaxChunkSize = 100
defer func() {
sv.L2CacheSize, sv.EnableRadixJoin, sv.MaxChunkSize = originL2CacheSize, originEnableRadixJoin, originMaxChunkSize
Expand Down
2 changes: 1 addition & 1 deletion executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ commit;`
create table m (id int primary key auto_increment, code int unique);
insert tmp (code) values (1);
insert tmp (code) values (1);
set tidb_max_chunk_size=1;
set tidb_init_chunk_size=1;
insert m (code) select code from tmp on duplicate key update code = values(code);`
tk.MustExec(testSQL)
testSQL = `select * from m;`
Expand Down
1 change: 1 addition & 0 deletions expression/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (h *benchHelper) init() {

h.ctx = mock.NewContext()
h.ctx.GetSessionVars().StmtCtx.TimeZone = time.Local
h.ctx.GetSessionVars().InitChunkSize = 32
h.ctx.GetSessionVars().MaxChunkSize = numRows

h.inputTypes = make([]*types.FieldType, 0, 10)
Expand Down
6 changes: 5 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,9 +825,11 @@ func (s *session) SetGlobalSysVar(name, value string) error {
if err != nil {
return errors.Trace(err)
}

sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`,
mysql.SystemDB, mysql.GlobalVariablesTable, strings.ToLower(name), sVal)
_, _, err = s.ExecRestrictedSQL(s, sql)

return errors.Trace(err)
}

Expand Down Expand Up @@ -1202,7 +1204,8 @@ func CreateSession4Test(store kv.Storage) (Session, error) {
s, err := CreateSession(store)
if err == nil {
// initialize session variables for test.
s.GetSessionVars().MaxChunkSize = 2
s.GetSessionVars().InitChunkSize = 2
s.GetSessionVars().MaxChunkSize = 32
}
return s, errors.Trace(err)
}
Expand Down Expand Up @@ -1451,6 +1454,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
variable.TiDBDDLReorgBatchSize + quoteCommaQuote +
variable.TiDBOptInSubqToJoinAndAgg + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBInitChunkSize + quoteCommaQuote +
variable.TiDBMaxChunkSize + quoteCommaQuote +
variable.TiDBEnableCascadesPlanner + quoteCommaQuote +
variable.TiDBRetryLimit + quoteCommaQuote +
Expand Down
7 changes: 4 additions & 3 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1871,9 +1871,9 @@ func (s *testSchemaSuite) TestTableReaderChunk(c *C) {
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 10)

tk.Se.GetSessionVars().DistSQLScanConcurrency = 1
tk.MustExec("set tidb_max_chunk_size = 2")
tk.MustExec("set tidb_init_chunk_size = 2")
defer func() {
tk.MustExec(fmt.Sprintf("set tidb_max_chunk_size = %d", variable.DefMaxChunkSize))
tk.MustExec(fmt.Sprintf("set tidb_init_chunk_size = %d", variable.DefInitChunkSize))
}()
rs, err := tk.Exec("select * from chk")
c.Assert(err, IsNil)
Expand All @@ -1894,7 +1894,8 @@ func (s *testSchemaSuite) TestTableReaderChunk(c *C) {
numChunks++
}
c.Assert(count, Equals, 100)
c.Assert(numChunks, Equals, 50)
// FIXME: revert this result to new group value after distsql can handle initChunkSize.
c.Assert(numChunks, Equals, 1)
rs.Close()
}

Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func NewSessionVars() *SessionVars {
vars.BatchSize = BatchSize{
IndexJoinBatchSize: DefIndexJoinBatchSize,
IndexLookupSize: DefIndexLookupSize,
InitChunkSize: DefInitChunkSize,
MaxChunkSize: DefMaxChunkSize,
DMLBatchSize: DefDMLBatchSize,
}
Expand Down Expand Up @@ -644,6 +645,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
return ErrReadOnly
case TiDBMaxChunkSize:
s.MaxChunkSize = tidbOptPositiveInt32(val, DefMaxChunkSize)
case TiDBInitChunkSize:
s.InitChunkSize = tidbOptPositiveInt32(val, DefInitChunkSize)
case TIDBMemQuotaQuery:
s.MemQuotaQuery = tidbOptInt64(val, config.GetGlobalConfig().MemQuotaQuery)
case TIDBMemQuotaHashJoin:
Expand Down Expand Up @@ -784,6 +787,9 @@ type BatchSize struct {
// IndexLookupSize is the number of handles for an index lookup task in index double read executor.
IndexLookupSize int

// InitChunkSize defines init row count of a Chunk during query execution.
InitChunkSize int

// MaxChunkSize defines max row count of a Chunk during query execution.
MaxChunkSize int
}
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
{ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)},
{ScopeGlobal | ScopeSession, TiDBInitChunkSize, strconv.Itoa(DefInitChunkSize)},
{ScopeGlobal | ScopeSession, TiDBEnableCascadesPlanner, "0"},
{ScopeSession, TIDBMemQuotaQuery, strconv.FormatInt(config.GetGlobalConfig().MemQuotaQuery, 10)},
{ScopeSession, TIDBMemQuotaHashJoin, strconv.FormatInt(DefTiDBMemQuotaHashJoin, 10)},
Expand Down
8 changes: 6 additions & 2 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,12 @@ const (
// when we need to keep the data output order the same as the order of index data.
TiDBIndexSerialScanConcurrency = "tidb_index_serial_scan_concurrency"

// tidb_max_chunk_capacity is used to control the max chunk size during query execution.
// TiDBMaxChunkSize is used to control the max chunk size during query execution.
TiDBMaxChunkSize = "tidb_max_chunk_size"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
TiDBInitChunkSize = "tidb_init_chunk_size"

// tidb_enable_cascades_planner is used to control whether to enable the cascades planner.
TiDBEnableCascadesPlanner = "tidb_enable_cascades_planner"

Expand Down Expand Up @@ -249,7 +252,8 @@ const (
DefBatchDelete = false
DefBatchCommit = false
DefCurretTS = 0
DefMaxChunkSize = 32
DefInitChunkSize = 32
DefMaxChunkSize = 1024
DefDMLBatchSize = 20000
DefMaxPreparedStmtCount = -1
DefWaitTimeout = 28800
Expand Down
30 changes: 29 additions & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ func checkInt64SystemVar(name, value string, min, max int64, vars *SessionVars)
return value, nil
}

const (
// initChunkSizeUpperBound indicates upper bound value of tidb_init_chunk_size.
initChunkSizeUpperBound = 32
// maxChunkSizeLowerBound indicates lower bound value of tidb_max_chunk_size.
maxChunkSizeLowerBound = 32
)

// ValidateSetSystemVar checks if system variable satisfies specific restriction.
func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, error) {
if strings.EqualFold(value, "DEFAULT") {
Expand Down Expand Up @@ -356,7 +363,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBHashAggFinalConcurrency,
TiDBDistSQLScanConcurrency,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
TiDBBackoffLockFast, TiDBMaxChunkSize,
TiDBBackoffLockFast,
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
v, err := strconv.Atoi(value)
if err != nil {
Expand Down Expand Up @@ -400,6 +407,27 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return "", ErrUnsupportedValueForVar.GenWithStackByArgs(name, value)
}
return upVal, nil
case TiDBInitChunkSize:
v, err := strconv.Atoi(value)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
}
if v <= 0 {
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
if v > initChunkSizeUpperBound {
return value, errors.Errorf("tidb_init_chunk_size(%d) cannot be bigger than %d", v, initChunkSizeUpperBound)
}
return value, nil
case TiDBMaxChunkSize:
v, err := strconv.Atoi(value)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
}
if v < maxChunkSizeLowerBound {
return value, errors.Errorf("tidb_max_chunk_size(%d) cannot be smaller than %d", v, maxChunkSizeLowerBound)
}
return value, nil
}
return value, nil
}
Expand Down
9 changes: 6 additions & 3 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
SetSessionSystemVar(v, TiDBBatchInsert, types.NewStringDatum("1"))
c.Assert(v.BatchInsert, IsTrue)

c.Assert(v.MaxChunkSize, Equals, 32)
SetSessionSystemVar(v, TiDBMaxChunkSize, types.NewStringDatum("2"))
c.Assert(v.MaxChunkSize, Equals, 2)
c.Assert(v.InitChunkSize, Equals, 32)
c.Assert(v.MaxChunkSize, Equals, 1024)
err = SetSessionSystemVar(v, TiDBMaxChunkSize, types.NewStringDatum("2"))
c.Assert(err, NotNil)
err = SetSessionSystemVar(v, TiDBInitChunkSize, types.NewStringDatum("1024"))
c.Assert(err, NotNil)

// Test case for TiDBConfig session variable.
err = SetSessionSystemVar(v, TiDBConfig, types.NewStringDatum("abc"))
Expand Down
3 changes: 2 additions & 1 deletion statistics/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func (h *Handle) Clear() {
<-h.ddlEventCh
}
h.feedback = h.feedback[:0]
h.mu.ctx.GetSessionVars().MaxChunkSize = 1
h.mu.ctx.GetSessionVars().InitChunkSize = 1
h.mu.ctx.GetSessionVars().MaxChunkSize = 32
h.listHead = &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}
h.globalMap = make(tableDeltaMap)
h.mu.rateMap = make(errorRateDeltaMap)
Expand Down
3 changes: 2 additions & 1 deletion table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,8 @@ func newCtxForPartitionExpr() sessionctx.Context {
sctx := &ctxForPartitionExpr{
sessionVars: variable.NewSessionVars(),
}
sctx.sessionVars.MaxChunkSize = 2
sctx.sessionVars.InitChunkSize = 2
sctx.sessionVars.MaxChunkSize = 32
sctx.sessionVars.StmtCtx.TimeZone = time.UTC
return sctx
}
Expand Down
3 changes: 2 additions & 1 deletion util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ func NewContext() *Context {
ctx: ctx,
cancel: cancel,
}
sctx.sessionVars.MaxChunkSize = 2
sctx.sessionVars.InitChunkSize = 2
sctx.sessionVars.MaxChunkSize = 32
sctx.sessionVars.StmtCtx.TimeZone = time.UTC
sctx.sessionVars.GlobalVarsAccessor = variable.NewMockGlobalAccessor()
return sctx
Expand Down

0 comments on commit 084aa5a

Please sign in to comment.