ddl: make concurrent ingest job queue instead of fallback (pingcap#48645)
ti-chi-bot authored Dec 18, 2023
1 parent b14df20 commit 3ac58a9
Showing 22 changed files with 170 additions and 102 deletions.
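
At a high level, this commit replaces the old fallback behavior, in which a concurrent add-index job fell back to a non-ingest backfill when the ingest worker was busy, with a single-slot queue: MarkJobProcessing claims the ingest worker and MarkJobFinish releases it (both added in pkg/ddl/ingest/backend_mgr.go below), so a competing job waits and retries instead. The following standalone sketch illustrates the pattern; the slot type and the main driver are illustrative only and not part of this commit.

package main

import (
	"fmt"
	"sync"
)

type slot struct {
	mu           sync.Mutex
	processingID int64
}

// markJobProcessing mirrors litBackendCtxMgr.MarkJobProcessing below:
// it succeeds for the current owner or when the slot is free.
func (s *slot) markJobProcessing(jobID int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.processingID == 0 || s.processingID == jobID {
		s.processingID = jobID
		return true
	}
	return false
}

// markJobFinish mirrors litBackendCtxMgr.MarkJobFinish below: it frees the slot.
func (s *slot) markJobFinish() {
	s.mu.Lock()
	s.processingID = 0
	s.mu.Unlock()
}

func main() {
	var s slot
	fmt.Println(s.markJobProcessing(1)) // true: job 1 takes the slot
	fmt.Println(s.markJobProcessing(2)) // false: job 2 waits and retries later
	s.markJobFinish()
	fmt.Println(s.markJobProcessing(2)) // true: the slot was released
}
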
2 changes: 1 addition & 1 deletion errors.toml
@@ -1338,7 +1338,7 @@ Unsupported clustered primary key type FLOAT/DOUBLE for TTL

["ddl:8200"]
error = '''
Unsupported shard_row_id_bits for table with primary key as row id
Unsupported tidb_enable_dist_task setting. To utilize distributed task execution, please enable tidb_ddl_enable_fast_reorg first.
'''

["ddl:8201"]
12 changes: 4 additions & 8 deletions pkg/ddl/backfilling_test.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/stretchr/testify/require"
)

@@ -52,39 +51,36 @@ func TestDoneTaskKeeper(t *testing.T) {
func TestPickBackfillType(t *testing.T) {
originMgr := ingest.LitBackCtxMgr
originInit := ingest.LitInitialized
originFastReorg := variable.EnableFastReorg.Load()
defer func() {
ingest.LitBackCtxMgr = originMgr
ingest.LitInitialized = originInit
variable.EnableFastReorg.Store(originFastReorg)
}()
mockMgr := ingest.NewMockBackendCtxMgr(
func() sessionctx.Context {
return nil
})
ingest.LitBackCtxMgr = mockMgr
mockCtx := context.Background()
const uk = false
mockJob := &model.Job{
ID: 1,
ReorgMeta: &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeTxn,
},
}
variable.EnableFastReorg.Store(true)
tp, err := pickBackfillType(mockCtx, mockJob, uk, nil)
mockJob.ReorgMeta.IsFastReorg = true
tp, err := pickBackfillType(mockCtx, mockJob)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeTxn)

mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
ingest.LitInitialized = false
tp, err = pickBackfillType(mockCtx, mockJob, uk, nil)
tp, err = pickBackfillType(mockCtx, mockJob)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeTxnMerge)

mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
ingest.LitInitialized = true
tp, err = pickBackfillType(mockCtx, mockJob, uk, nil)
tp, err = pickBackfillType(mockCtx, mockJob)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeLitMerge)
}
17 changes: 0 additions & 17 deletions pkg/ddl/ddl.go
@@ -499,23 +499,6 @@ func (dc *ddlCtx) jobContext(jobID int64, reorgMeta *model.DDLReorgMeta) *JobCon
return ctx
}

func (dc *ddlCtx) removeBackfillCtxJobCtx(jobID int64) {
dc.backfillCtx.Lock()
delete(dc.backfillCtx.jobCtxMap, jobID)
dc.backfillCtx.Unlock()
}

func (dc *ddlCtx) backfillCtxJobIDs() []int64 {
dc.backfillCtx.Lock()
defer dc.backfillCtx.Unlock()

runningJobIDs := make([]int64, 0, len(dc.backfillCtx.jobCtxMap))
for id := range dc.backfillCtx.jobCtxMap {
runningJobIDs = append(runningJobIDs, id)
}
return runningJobIDs
}

type reorgContexts struct {
sync.RWMutex
// reorgCtxMap maps job ID to reorg context.
32 changes: 30 additions & 2 deletions pkg/ddl/ddl_api.go
@@ -7077,10 +7077,15 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
TableName: t.Meta().Name.L,
Type: model.ActionAddPrimaryKey,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
ReorgMeta: nil,
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, sqlMode, nil, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
}
reorgMeta, err := newReorgMetaFromVariables(d, job, ctx)
if err != nil {
return err
}
job.ReorgMeta = reorgMeta

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
@@ -7327,12 +7332,17 @@ func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
TableName: t.Meta().Name.L,
Type: model.ActionAddIndex,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
ReorgMeta: nil,
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: chs,
Collate: coll,
}
reorgMeta, err := newReorgMetaFromVariables(d, job, ctx)
if err != nil {
return err
}
job.ReorgMeta = reorgMeta

err = d.DoDDLJob(ctx, job)
// key exists, but if_not_exists flags is true, so we ignore this error.
@@ -7344,6 +7354,24 @@ func (d *ddl) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
return errors.Trace(err)
}

func newReorgMetaFromVariables(d *ddl, job *model.Job, sctx sessionctx.Context) (*model.DDLReorgMeta, error) {
reorgMeta := NewDDLReorgMeta(sctx)
reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
return nil, dbterror.ErrUnsupportedDistTask
}
isUpgradingSysDB := d.stateSyncer.IsUpgradingState() && hasSysDB(job)
if isUpgradingSysDB {
if reorgMeta.IsDistReorg {
logutil.BgLogger().Info("cannot use distributed task execution because the job on system DB is in upgrade state",
zap.String("category", "ddl"), zap.Stringer("job", job))
}
reorgMeta.IsDistReorg = false
}
return reorgMeta, nil
}

func buildFKInfo(fkName model.CIStr, keys []*ast.IndexPartSpecification, refer *ast.ReferenceDef, cols []*table.Column) (*model.FKInfo, error) {
if len(keys) != len(refer.IndexPartSpecifications) {
return nil, infoschema.ErrForeignKeyNotMatch.GenWithStackByArgs(fkName, "Key reference and table reference don't match")
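
The new newReorgMetaFromVariables helper above snapshots tidb_ddl_enable_fast_reorg and tidb_enable_dist_task into the job's ReorgMeta at submission time and rejects the unsupported combination up front (dist task without fast reorg). A standalone illustration of that gating rule follows; buildReorgFlags and its error value are simplified stand-ins, not TiDB APIs.

package main

import (
	"errors"
	"fmt"
)

var errUnsupportedDistTask = errors.New(
	"Unsupported tidb_enable_dist_task setting. To utilize distributed task execution, please enable tidb_ddl_enable_fast_reorg first.")

// buildReorgFlags is a simplified stand-in for newReorgMetaFromVariables.
func buildReorgFlags(distTask, fastReorg, upgradingSysDB bool) (isDistReorg, isFastReorg bool, err error) {
	if distTask && !fastReorg {
		return false, false, errUnsupportedDistTask
	}
	if upgradingSysDB {
		// Jobs touching system tables during a cluster upgrade never use
		// distributed reorg, mirroring the isUpgradingSysDB branch above.
		distTask = false
	}
	return distTask, fastReorg, nil
}

func main() {
	_, _, err := buildReorgFlags(true, false, false)
	fmt.Println(err) // rejected: dist task requires fast reorg
	d, f, _ := buildReorgFlags(true, true, true)
	fmt.Println(d, f) // false true: dist reorg dropped while upgrading the system DB
}
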
11 changes: 11 additions & 0 deletions pkg/ddl/ddl_worker.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
@@ -574,6 +575,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
startTime := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
markJobFinish(job)
}()

if jobNeedGC(job) {
@@ -619,6 +621,15 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

func markJobFinish(job *model.Job) {
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
30 changes: 4 additions & 26 deletions pkg/ddl/index.go
@@ -637,7 +637,7 @@ SwitchIndexState:
case model.StateNone:
// none -> delete only
var reorgTp model.ReorgType
reorgTp, err = pickBackfillType(w.ctx, job, allIndexInfos[0].Unique, d)
reorgTp, err = pickBackfillType(w.ctx, job)
if err != nil {
if !errorIsRetryable(err, job) {
job.State = model.JobStateCancelled
@@ -749,13 +749,13 @@ SwitchIndexState:
}

// pickBackfillType determines which backfill process will be used.
func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCtx) (model.ReorgType, error) {
func pickBackfillType(ctx context.Context, job *model.Job) (model.ReorgType, error) {
if job.ReorgMeta.ReorgTp != model.ReorgTypeNone {
// The backfill task has been started.
// Don't change the backfill type.
return job.ReorgMeta.ReorgTp, nil
}
if !IsEnableFastReorg() {
if !job.ReorgMeta.IsFastReorg {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
return model.ReorgTypeTxn, nil
}
@@ -770,29 +770,7 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt
if err != nil {
return model.ReorgTypeNone, err
}
var pdLeaderAddr string
var isUpgradingSysDB bool
if d != nil {
//nolint:forcetypeassert
pdLeaderAddr = d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
isUpgradingSysDB = d.stateSyncer.IsUpgradingState() && hasSysDB(job)
}
useDistReorg := false
if variable.EnableDistTask.Load() && !isUpgradingSysDB {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
useDistReorg = true
} else {
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
if isUpgradingSysDB {
logutil.BgLogger().Info("pick backfill type, cannot be a dist task because the job on the system DB in the upgrade state",
zap.String("category", "ddl"), zap.Stringer("job", job))
}
}
if err != nil {
return model.ReorgTypeNone, err
}
job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
job.ReorgMeta.IsDistReorg = useDistReorg
return model.ReorgTypeLitMerge, nil
}
}
@@ -897,7 +875,7 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo
func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, allIndexInfos []*model.IndexInfo) (done bool, ver int64, err error) {
var reorgTp model.ReorgType
reorgTp, err = pickBackfillType(w.ctx, job, allIndexInfos[0].Unique, d)
reorgTp, err = pickBackfillType(w.ctx, job)
if err != nil {
return false, ver, err
}
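
With backend registration and the dist-task decision moved out of it, pickBackfillType now chooses the backfill path purely from the job's ReorgMeta and the local ingest state, and keeps any backfill type already recorded on the job, as the updated TestPickBackfillType exercises. A simplified standalone sketch of that decision follows; pick and the reorgType constants are illustrative names, not the real signature.

package main

import "fmt"

type reorgType string

const (
	reorgTxn      reorgType = "txn"
	reorgTxnMerge reorgType = "txn-merge"
	reorgLitMerge reorgType = "lit-merge (ingest)"
)

// pick is a simplified stand-in for pickBackfillType.
func pick(isFastReorg, ingestInitialized bool) reorgType {
	if !isFastReorg {
		return reorgTxn // fast reorg disabled: plain transactional backfill
	}
	if !ingestInitialized {
		// Fast reorg requested but the local ingest backend is unavailable:
		// use the temp-index merge path instead.
		return reorgTxnMerge
	}
	return reorgLitMerge
}

func main() {
	fmt.Println(pick(false, true)) // txn
	fmt.Println(pick(true, false)) // txn-merge
	fmt.Println(pick(true, true))  // lit-merge (ingest)
}
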
9 changes: 5 additions & 4 deletions pkg/ddl/index_change_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
@@ -232,7 +233,7 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 6, 6, true)
if ddl.IsEnableFastReorg() {
if variable.EnableFastReorg.Load() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, 6, 6, true)
}
@@ -255,14 +256,14 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 5, 7, true)
if ddl.IsEnableFastReorg() {
if variable.EnableFastReorg.Load() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, 5, 7, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
if ddl.IsEnableFastReorg() {
if variable.EnableFastReorg.Load() {
err = checkIndexExists(ctx, writeTbl, 7, 7, false)
} else {
err = checkIndexExists(ctx, publicTbl, 7, 7, false)
@@ -296,7 +297,7 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
idxVal := row[1].GetInt64()
handle := row[0].GetInt64()
err = checkIndexExists(ctx, publicTbl, idxVal, handle, true)
if ddl.IsEnableFastReorg() {
if variable.EnableFastReorg.Load() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, idxVal, handle, true)
}
6 changes: 0 additions & 6 deletions pkg/ddl/index_merge_tmp.go
@@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
@@ -33,11 +32,6 @@ import (
"go.uber.org/zap"
)

// IsEnableFastReorg check whether Fast Reorg is allowed.
func IsEnableFastReorg() bool {
return variable.EnableFastReorg.Load()
}

func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
txn kv.Transaction,
idxInfo *model.IndexInfo,
42 changes: 33 additions & 9 deletions pkg/ddl/ingest/backend_mgr.go
@@ -19,6 +19,7 @@ import (
"fmt"
"math"
"strconv"
"sync"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
@@ -36,12 +37,18 @@ type BackendCtxMgr interface {
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)

MarkJobProcessing(jobID int64) (ok bool)
MarkJobFinish()
}

type litBackendCtxMgr struct {
generic.SyncMap[int64, *litBackendCtx]
memRoot MemRoot
diskRoot DiskRoot
memRoot MemRoot
diskRoot DiskRoot
processingJobID int64
lastLoggingTime time.Time
mu sync.Mutex
}

func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
@@ -62,15 +69,32 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
return mgr
}

// MarkJobProcessing marks that an ingest backfill job is being processed.
func (m *litBackendCtxMgr) MarkJobProcessing(jobID int64) bool {
m.mu.Lock()
defer m.mu.Unlock()
if m.processingJobID == 0 || m.processingJobID == jobID {
m.processingJobID = jobID
return true
}
if time.Since(m.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", m.processingJobID))
m.lastLoggingTime = time.Now()
}
return false
}

// MarkJobFinish marks that the ingest backfill job is finished.
func (m *litBackendCtxMgr) MarkJobFinish() {
m.mu.Lock()
m.processingJobID = 0
m.mu.Unlock()
}

// CheckAvailable checks if the ingest backfill is available.
func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
activeJobIDs := m.Keys()
if len(activeJobIDs) > 0 {
logutil.BgLogger().Info("ingest backfill is already in use by another DDL job", zap.String("category", "ddl-ingest"),
zap.Int64("job ID", activeJobIDs[0]))
return false, nil
}
if err := m.diskRoot.PreCheckUsage(); err != nil {
logutil.BgLogger().Info("ingest backfill is not available", zap.String("category", "ddl-ingest"), zap.Error(err))
return false, err
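
MarkJobProcessing above logs its "worker is busy" message at most once per minute via lastLoggingTime, so a queued job polling for the slot does not flood the log. Below is a minimal standalone sketch of that throttling pattern; throttledLogger is an illustrative type, not TiDB code.

package main

import (
	"fmt"
	"sync"
	"time"
)

type throttledLogger struct {
	mu       sync.Mutex
	lastTime time.Time
	every    time.Duration
}

// log prints msg only if at least `every` has elapsed since the last print.
func (t *throttledLogger) log(msg string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if time.Since(t.lastTime) > t.every {
		fmt.Println(msg)
		t.lastTime = time.Now()
	}
}

func main() {
	l := &throttledLogger{every: time.Minute}
	for i := 0; i < 3; i++ {
		l.log("ingest backfill worker is busy, waiting") // printed only once
	}
}
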
5 changes: 4 additions & 1 deletion pkg/ddl/ingest/disk_root.go
@@ -130,7 +130,10 @@ func (d *diskRootImpl) PreCheckUsage() error {
}
if RiskOfDiskFull(sz.Available, sz.Capacity) {
sortPath := ConfigSortPath()
msg := fmt.Sprintf("sort path: %s, %s, please clean up the disk and retry", sortPath, d.UsageInfo())
logutil.BgLogger().Warn("available disk space is less than 10%, cannot use ingest mode",
zap.String("sort path", sortPath),
zap.String("usage", d.usageInfo()))
msg := fmt.Sprintf("no enough space in %s", sortPath)
return dbterror.ErrIngestCheckEnvFailed.FastGenByArgs(msg)
}
return nil
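
PreCheckUsage now emits a warning and returns ErrIngestCheckEnvFailed when the sort path is nearly full; the log message puts the threshold at about 10% of capacity remaining. The sketch below assumes that 10% threshold from the log text; the real RiskOfDiskFull may apply additional criteria.

package main

import "fmt"

// riskOfDiskFull assumes the 10% threshold implied by the warning above; the
// real RiskOfDiskFull implementation may also consider an absolute quota.
func riskOfDiskFull(available, capacity uint64) bool {
	return float64(available) < 0.1*float64(capacity)
}

func main() {
	fmt.Println(riskOfDiskFull(5<<30, 100<<30))  // true: only 5% free, refuse ingest
	fmt.Println(riskOfDiskFull(50<<30, 100<<30)) // false: plenty of space
}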