Skip to content

Commit

Permalink
Merge branch 'release-3.0' into idx-len-3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored Jul 15, 2019
2 parents d157290 + 3ad5ff6 commit 4b42508
Show file tree
Hide file tree
Showing 48 changed files with 1,019 additions and 302 deletions.
29 changes: 17 additions & 12 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
field_types "github.com/pingcap/parser/types"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -1300,25 +1301,29 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e

err = d.doDDLJob(ctx, job)
if err == nil {
var preSplitAndScatter func()
// do pre-split and scatter.
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
preSplitAndScatter = func() {
preSplitTableShardRowIDBitsRegion(d.store, tbInfo, ctx.GetSessionVars().WaitSplitRegionFinish)
sp, ok := d.store.(kv.SplitableStore)
if ok && atomic.LoadUint32(&EnableSplitTableRegion) != 0 {
var (
preSplit func()
scatterRegion bool
)
val, err := variable.GetGlobalSystemVar(ctx.GetSessionVars(), variable.TiDBScatterRegion)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] won't scatter region", zap.Error(err))
} else {
scatterRegion = variable.TiDBOptOn(val)
}
} else if atomic.LoadUint32(&EnableSplitTableRegion) != 0 {
pi := tbInfo.GetPartitionInfo()
if pi != nil {
preSplitAndScatter = func() { splitPartitionTableRegion(d.store, pi) }
preSplit = func() { splitPartitionTableRegion(sp, pi, scatterRegion) }
} else {
preSplitAndScatter = func() { splitTableRegion(d.store, tbInfo.ID) }
preSplit = func() { splitTableRegion(sp, tbInfo, scatterRegion) }
}
}
if preSplitAndScatter != nil {
if ctx.GetSessionVars().WaitSplitRegionFinish {
preSplitAndScatter()
if scatterRegion {
preSplit()
} else {
go preSplitAndScatter()
go preSplit()
}
}

Expand Down
129 changes: 129 additions & 0 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

func splitPartitionTableRegion(store kv.SplitableStore, pi *model.PartitionInfo, scatter bool) {
// Max partition count is 4096, should we sample and just choose some of the partition to split?
regionIDs := make([]uint64, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
regionIDs = append(regionIDs, splitRecordRegion(store, def.ID, scatter))
}
if scatter {
waitScatterRegionFinish(store, regionIDs...)
}
}

func splitTableRegion(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
splitPreSplitedTable(store, tbInfo, scatter)
} else {
regionID := splitRecordRegion(store, tbInfo.ID, scatter)
if scatter {
waitScatterRegionFinish(store, regionID)
}
}
}

func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scatter bool) {
// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions-1)+len(tbInfo.Indices))
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
// The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number.
// So we only need to split the region for the positive number.
max := int64(1 << (tbInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID),
zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
waitScatterRegionFinish(store, regionIDs...)
}
}

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionID, err := store.SplitRegion(tableStartKey, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.Logger(context.Background()).Warn("[ddl] split table region failed", zap.Error(err))
}
return regionID
}

func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
regionIDs := make([]uint64, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := store.SplitRegion(indexPrefix, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name),
zap.Stringer("index", idx.Name),
zap.Error(err))
}
regionIDs = append(regionIDs, regionID)
}
return regionIDs
}

func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) {
for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
}
95 changes: 0 additions & 95 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand Down Expand Up @@ -340,99 +338,6 @@ func checkSafePoint(w *worker, snapshotTS uint64) error {
return gcutil.ValidateSnapshot(ctx, snapshotTS)
}

type splitableStore interface {
SplitRegion(splitKey kv.Key) error
SplitRegionAndScatter(splitKey kv.Key) (uint64, error)
WaitScatterRegionFinish(regionID uint64) error
}

func splitPartitionTableRegion(store kv.Storage, pi *model.PartitionInfo) {
// Max partition count is 4096, should we sample and just choose some of the partition to split?
for _, def := range pi.Definitions {
splitTableRegion(store, def.ID)
}
}

func splitTableRegion(store kv.Storage, tableID int64) {
s, ok := store.(splitableStore)
if !ok {
return
}
tableStartKey := tablecodec.GenTablePrefix(tableID)
if err := s.SplitRegion(tableStartKey); err != nil {
// It will be automatically split by TiKV later.
logutil.Logger(ddlLogCtx).Warn("[ddl] split table region failed", zap.Error(err))
}
}

func preSplitTableShardRowIDBitsRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
s, ok := store.(splitableStore)
if !ok {
return
}
regionIDs := make([]uint64, 0, 1<<(tblInfo.PreSplitRegions-1)+len(tblInfo.Indices))

// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions))
// The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number.
// So we only need to split the region for the positive number.
max := int64(1 << (tblInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tblInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tblInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := s.SplitRegionAndScatter(key)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}

// Split index region.
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := s.SplitRegionAndScatter(indexPrefix)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] pre split table index region failed", zap.String("index", idx.Name.L), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}
if !waitTableSplitFinish {
return
}
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
}

func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) {
alloc := autoid.NewAllocator(store, tblInfo.GetDBID(schemaID), tblInfo.IsAutoIncColUnsigned())
tbl, err := table.TableFromMeta(alloc, tblInfo)
Expand Down
19 changes: 17 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (

// processinfoSetter is the interface use to set current running process info.
type processinfoSetter interface {
SetProcessInfo(string, time.Time, byte)
SetProcessInfo(string, time.Time, byte, uint64)
}

// recordSet wraps an executor, implements sqlexec.RecordSet interface
Expand Down Expand Up @@ -245,8 +245,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
sql = ss.SecureText()
}
}
maxExecutionTime := getMaxExecutionTime(sctx, a.StmtNode)
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql, time.Now(), cmd)
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

Expand Down Expand Up @@ -285,6 +286,20 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}, nil
}

// getMaxExecutionTime get the max execution timeout value.
func getMaxExecutionTime(sctx sessionctx.Context, stmtNode ast.StmtNode) uint64 {
ret := sctx.GetSessionVars().MaxExecutionTime
if sel, ok := stmtNode.(*ast.SelectStmt); ok {
for _, hint := range sel.TableHints {
if hint.HintName.L == variable.MaxExecutionTime {
ret = hint.MaxExecutionTime
break
}
}
}
return ret
}

type chunkRowRecordSet struct {
rows []chunk.Row
idx int
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func (b *executorBuilder) buildShow(v *plannercore.Show) Executor {
DBName: model.NewCIStr(v.DBName),
Table: v.Table,
Column: v.Column,
IndexName: v.IndexName,
User: v.User,
Roles: v.Roles,
IfNotExists: v.IfNotExists,
Expand Down
Loading

0 comments on commit 4b42508

Please sign in to comment.