Skip to content

Commit

Permalink
variable, ddl: add session variable 'tidb_ddl_reorg_priority' to set …
Browse files Browse the repository at this point in the history
…operation priority of ddl reorg (pingcap#7116)
  • Loading branch information
winkyao authored Aug 8, 2018
1 parent af7fed9 commit 359df6e
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 9 deletions.
1 change: 1 addition & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1934,6 +1934,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, unique bool, ind
Type: model.ActionAddIndex,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{unique, indexName, idxColNames, indexOption},
Priority: ctx.GetSessionVars().DDLReorgPriority,
}

err = d.doDDLJob(ctx, job)
Expand Down
14 changes: 9 additions & 5 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ type addIndexWorker struct {
table table.Table
colFieldMap map[int64]*types.FieldType
closed bool
priority int

// The following attributes are used to reduce memory allocation.
defaultVals []types.Datum
Expand Down Expand Up @@ -507,6 +508,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
index: index,
table: t,
colFieldMap: colFieldMap,
priority: kv.PriorityLow,
defaultVals: make([]types.Datum, len(t.Cols())),
rowMap: make(map[int64]types.Datum, len(colFieldMap)),
}
Expand Down Expand Up @@ -588,7 +590,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle,
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.priority, w.table, txn.StartTS(), taskRange.startHandle,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0)
Expand Down Expand Up @@ -710,7 +712,8 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHan
errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error {
addedCount = 0
scanCount = 0
txn.SetOption(kv.Priority, kv.PriorityLow)
txn.SetOption(kv.Priority, w.priority)

var (
idxRecords []*indexRecord
err error
Expand Down Expand Up @@ -1058,6 +1061,7 @@ func (w *worker) addPhysicalTableIndex(t table.Table, indexInfo *model.IndexInfo
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, colFieldMap)
idxWorkers[i].priority = job.Priority
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
Expand Down Expand Up @@ -1110,7 +1114,7 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
return true, nil
}

start, end, err := getTableRange(reorg.d, t.GetPartition(pid), reorg.Job.SnapshotVer)
start, end, err := getTableRange(reorg.d, t.GetPartition(pid), reorg.Job.SnapshotVer, reorg.Job.Priority)
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -1155,11 +1159,11 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, priority int, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(kv.PriorityLow)
snap.SetPriority(priority)
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,11 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.Table) (maxRowID int
var gofailOnceGuard bool

// getTableRange gets the start and end handle of a table (or partition).
func getTableRange(d *ddlCtx, tbl table.Table, snapshotVer uint64) (startHandle, endHandle int64, err error) {
func getTableRange(d *ddlCtx, tbl table.Table, snapshotVer uint64, priority int) (startHandle, endHandle int64, err error) {
startHandle = math.MinInt64
endHandle = math.MaxInt64
// Get the start handle of this partition.
err = iterateSnapshotRows(d.store, tbl, snapshotVer, math.MinInt64,
err = iterateSnapshotRows(d.store, priority, tbl, snapshotVer, math.MinInt64,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
startHandle = h
return false, nil
Expand Down Expand Up @@ -351,7 +351,7 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table) (*re
pid = pi.Definitions[0].ID
tp = tbl.(table.PartitionedTable).GetPartition(pid)
}
start, end, err = getTableRange(d, tp, ver.Ver)
start, end, err = getTableRange(d, tp, ver.Ver, job.Priority)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
28 changes: 28 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2211,6 +2211,34 @@ func (s *testContextOptionSuite) TestAddIndexPriority(c *C) {
cli.mu.Lock()
cli.mu.checkFlags = checkRequestOff
cli.mu.Unlock()

tk.MustExec("alter table t1 drop index t1_index;")
tk.MustExec("SET SESSION tidb_ddl_reorg_priority = 'PRIORITY_NORMAL'")

cli.mu.Lock()
cli.mu.checkFlags = checkDDLAddIndexPriority
cli.mu.Unlock()

cli.priority = pb.CommandPri_Normal
tk.MustExec("alter table t1 add index t1_index (id);")

cli.mu.Lock()
cli.mu.checkFlags = checkRequestOff
cli.mu.Unlock()

tk.MustExec("alter table t1 drop index t1_index;")
tk.MustExec("SET SESSION tidb_ddl_reorg_priority = 'PRIORITY_HIGH'")

cli.mu.Lock()
cli.mu.checkFlags = checkDDLAddIndexPriority
cli.mu.Unlock()

cli.priority = pb.CommandPri_High
tk.MustExec("alter table t1 add index t1_index (id);")

cli.mu.Lock()
cli.mu.checkFlags = checkRequestOff
cli.mu.Unlock()
}

func (s *testContextOptionSuite) TestAlterTableComment(c *C) {
Expand Down
10 changes: 9 additions & 1 deletion meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,16 @@ func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {
return nil, errors.Trace(err)
}

job := &model.Job{}
job := &model.Job{
// For compability, if the job is enqueued by old version TiDB and Priority field is omitted,
// set the default priority to kv.PriorityLow.
Priority: kv.PriorityLow,
}
err = job.Decode(value)
// Check if the job.Priority is valid.
if job.Priority < kv.PriorityNormal || job.Priority > kv.PriorityHigh {
job.Priority = kv.PriorityLow
}
return job, errors.Trace(err)
}

Expand Down
3 changes: 3 additions & 0 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ type Job struct {
// ReorgMeta is meta info of ddl reorganization.
// This field is depreciated.
ReorgMeta *DDLReorgMeta `json:"reorg_meta"`

// Priority is only used to set the operation priority of adding indices.
Priority int `json:"priority"`
}

// FinishTableJob is called when a job is finished.
Expand Down
21 changes: 21 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package variable

import (
"crypto/tls"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -284,6 +285,9 @@ type SessionVars struct {
// EnableTablePartition enables table partition feature.
EnableTablePartition bool

// DDLReorgPriority is the operation priority of adding indices.
DDLReorgPriority int

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
EnableStreaming bool
Expand All @@ -309,6 +313,7 @@ func NewSessionVars() *SessionVars {
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
RetryLimit: DefTiDBRetryLimit,
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: kv.PriorityLow,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -453,6 +458,20 @@ func (s *SessionVars) deleteSystemVar(name string) error {
return nil
}

func (s *SessionVars) setDDLReorgPriority(val string) {
val = strings.ToLower(val)
switch val {
case "priority_low":
s.DDLReorgPriority = kv.PriorityLow
case "priority_normal":
s.DDLReorgPriority = kv.PriorityNormal
case "priority_high":
s.DDLReorgPriority = kv.PriorityHigh
default:
s.DDLReorgPriority = kv.PriorityLow
}
}

// SetSystemVar sets the value of a system variable.
func (s *SessionVars) SetSystemVar(name string, val string) error {
switch name {
Expand Down Expand Up @@ -554,6 +573,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.EnableTablePartition = TiDBOptOn(val)
case TiDBDDLReorgWorkerCount:
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgPriority:
s.setDDLReorgPriority(val)
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBGeneralLog, strconv.Itoa(DefTiDBGeneralLog)},
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ const (

// tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers.
TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt"

// tidb_ddl_reorg_priority defines the operations priority of adding indices.
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"
)

// Default TiDB system variable values.
Expand Down

0 comments on commit 359df6e

Please sign in to comment.