Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: remove the old distribute framework #42981

Merged
merged 15 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go_library(
"ddl_workerpool.go",
"delete_range.go",
"delete_range_util.go",
"dist_backfilling.go",
"dist_owner.go",
"disttask_flow.go",
"foreign_key.go",
Expand Down Expand Up @@ -90,7 +89,6 @@ go_library(
"//parser/types",
"//privilege",
"//resourcemanager/pool/workerpool",
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//sessionctx",
"//sessionctx/binloginfo",
Expand All @@ -116,8 +114,6 @@ go_library(
"//util/domainutil",
"//util/filter",
"//util/gcutil",
"//util/gpool",
"//util/gpool/spmc",
"//util/hack",
"//util/intest",
"//util/logutil",
Expand Down Expand Up @@ -224,7 +220,6 @@ go_test(
"//autoid_service",
"//config",
"//ddl/internal/callback",
"//ddl/internal/session",
"//ddl/placement",
"//ddl/schematracker",
"//ddl/testutil",
Expand Down
103 changes: 2 additions & 101 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -174,9 +173,6 @@ func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context,
type backfiller interface {
BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, err error)
AddMetricInfo(float64)
GetTasks() ([]*BackfillJob, error)
UpdateTask(bfJob *BackfillJob) error
FinishTask(bfJob *BackfillJob) error
GetCtx() *backfillCtx
String() string
}
Expand All @@ -191,7 +187,6 @@ type backfillResult struct {
}

type reorgBackfillTask struct {
bfJob *BackfillJob
physicalTable table.PhysicalTable

// TODO: Remove the following fields after remove the function of run.
Expand All @@ -205,11 +200,7 @@ type reorgBackfillTask struct {
}

func (r *reorgBackfillTask) getJobID() int64 {
jobID := r.jobID
if r.bfJob != nil {
jobID = r.bfJob.JobID
}
return jobID
return r.jobID
}

func (r *reorgBackfillTask) excludedEndKey() kv.Key {
Expand Down Expand Up @@ -258,21 +249,6 @@ func newBackfillWorker(ctx context.Context, bf backfiller) *backfillWorker {
}
}

func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey kv.Key) error {
leaseTime, err := GetOracleTime(w.GetCtx().store)
if err != nil {
return err
}
bfJob.Meta.CurrKey = nextKey
bfJob.InstanceID = execID
bfJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease)
return w.backfiller.UpdateTask(bfJob)
}

func (w *backfillWorker) finishJob(bfJob *BackfillJob) error {
return w.backfiller.FinishTask(bfJob)
}

func (w *backfillWorker) String() string {
return fmt.Sprintf("backfill-worker %d, tp %s", w.GetCtx().id, w.backfiller.String())
}
Expand Down Expand Up @@ -302,24 +278,18 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
addedCount: 0,
nextKey: handleRange.startKey,
}
batchStartTime := time.Now()
lastLogCount := 0
lastLogTime := time.Now()
startTime := lastLogTime
jobID := task.getJobID()
rc := d.getReorgCtx(jobID)

isDistReorg := task.bfJob != nil
if isDistReorg {
w.initPartitionIndexInfo(task)
jobID = genBackfillJobReorgCtxID(jobID)
}
for {
// Give job chance to be canceled, if we not check it here,
// if there is panic in bf.BackfillData we will never cancel the job.
// Because reorgRecordTask may run a long time,
// we should check whether this ddl job is still runnable.
err := d.isReorgRunnable(jobID, isDistReorg)
err := d.isReorgRunnable(jobID, false)
if err != nil {
result.err = err
return result
Expand Down Expand Up @@ -358,20 +328,6 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
if taskCtx.done {
break
}

if isDistReorg {
// TODO: Adjust the updating lease frequency by batch processing time carefully.
if time.Since(batchStartTime) < updateInstanceLease {
continue
}
batchStartTime = time.Now()
if err := w.updateLease(w.GetCtx().uuid, task.bfJob, result.nextKey); err != nil {
logutil.BgLogger().Info("[ddl] backfill worker handle task, update lease failed", zap.Stringer("worker", w),
zap.Stringer("task", task), zap.String("backfill job", task.bfJob.AbbrStr()), zap.Error(err))
result.err = err
return result
}
}
}
logutil.BgLogger().Info("[ddl] backfill worker finish task",
zap.Stringer("worker", w), zap.Stringer("task", task),
Expand All @@ -385,61 +341,6 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
return result
}

func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) {
if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok {
if w, ok := w.backfiller.(*addIndexTxnWorker); ok {
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
w.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
}
}
}

func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResult) {
logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w), zap.String("task", task.String()))
defer util.Recover(metrics.LabelDDL, "backfillWorker.runTask", func() {
result = &backfillResult{taskID: task.id, err: dbterror.ErrReorgPanic}
}, false)
defer w.GetCtx().setDDLLabelForTopSQL(task.jobID, task.sqlQuery)

failpoint.Inject("mockBackfillRunErr", func() {
if w.GetCtx().id == 0 {
result := &backfillResult{taskID: task.id, addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")}
failpoint.Return(result)
}
})
failpoint.Inject("mockHighLoadForAddIndex", func() {
sqlPrefixes := []string{"alter"}
topsql.MockHighCPULoad(task.sqlQuery, sqlPrefixes, 5)
})
failpoint.Inject("mockBackfillSlow", func() {
time.Sleep(100 * time.Millisecond)
})

// Change the batch size dynamically.
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller)
task.bfJob.Meta.RowCount = int64(result.addedCount)
if result.err != nil {
logutil.BgLogger().Warn("[ddl] backfill worker runTask failed",
zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(result.err))
if dbterror.ErrDDLJobNotFound.Equal(result.err) {
result.err = nil
return result
}
task.bfJob.State = model.JobStateCancelled
task.bfJob.Meta.Error = toTError(result.err)
if err := w.finishJob(task.bfJob); err != nil {
logutil.BgLogger().Info("[ddl] backfill worker runTask, finishJob failed",
zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(err))
result.err = err
}
} else {
task.bfJob.State = model.JobStateDone
result.err = w.finishJob(task.bfJob)
}
return result
}

func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w))
var curTaskID int
Expand Down
12 changes: 0 additions & 12 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,18 +1214,6 @@ func (*updateColumnWorker) String() string {
return typeUpdateColumnWorker.String()
}

func (*updateColumnWorker) GetTasks() ([]*BackfillJob, error) {
panic("[ddl] update column worker GetTask function doesn't implement")
}

func (*updateColumnWorker) UpdateTask(*BackfillJob) error {
panic("[ddl] update column worker UpdateTask function doesn't implement")
}

func (*updateColumnWorker) FinishTask(*BackfillJob) error {
panic("[ddl] update column worker FinishTask function doesn't implement")
}

func (w *updateColumnWorker) GetCtx() *backfillCtx {
return w.backfillCtx
}
Expand Down
1 change: 0 additions & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ const (
)

const defaultBatchSize = 1024
const defaultReorgBatchSize = 256

const dbTestLease = 600 * time.Millisecond

Expand Down
49 changes: 2 additions & 47 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
rmutil "github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -63,7 +62,6 @@ import (
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/gpool/spmc"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/syncutil"
Expand All @@ -87,9 +85,8 @@ const (

batchAddingJobs = 10

reorgWorkerCnt = 10
generalWorkerCnt = 1
backfillWorkerCnt = 32
reorgWorkerCnt = 10
generalWorkerCnt = 1

// checkFlagIndexInJobArgs is the recoverCheckFlag index used in RecoverTable/RecoverSchema job arg list.
checkFlagIndexInJobArgs = 1
Expand Down Expand Up @@ -280,8 +277,6 @@ type ddl struct {
// used in the concurrency ddl.
reorgWorkerPool *workerPool
generalDDLWorkerPool *workerPool
backfillCtxPool *backfillCtxPool
backfillWorkerPool *spmc.Pool[*reorgBackfillTask, *backfillResult, int, *backfillWorker, *backfillWorkerContext]
// get notification if any DDL coming.
ddlJobCh chan struct{}
}
Expand Down Expand Up @@ -512,13 +507,6 @@ type reorgContexts struct {
reorgCtxMap map[int64]*reorgCtx
}

func getReorgCtx(reorgCtxs *reorgContexts, jobID int64) *reorgCtx {
reorgCtxs.RLock()
defer reorgCtxs.RUnlock()
return reorgCtxs.reorgCtxMap[jobID]
}

// TODO: Using getReorgCtx instead of dc.getReorgCtx.
func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
dc.reorgCtx.RLock()
defer dc.reorgCtx.RUnlock()
Expand All @@ -544,10 +532,6 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx {
return rc
}

func genBackfillJobReorgCtxID(jobID int64) int64 {
return -jobID
}

func (dc *ddlCtx) removeReorgCtx(jobID int64) {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
Expand Down Expand Up @@ -753,24 +737,6 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
d.wg.Run(d.startDispatchLoop)
}

func (d *ddl) prepareBackfillWorkers() error {
workerFactory := func() (pools.Resource, error) {
bk := newBackfillWorker(context.Background(), nil)
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_backfill_worker", metrics.CreateDDL)).Inc()
return bk, nil
}
d.backfillCtxPool = newBackfillContextPool(pools.NewResourcePool(workerFactory, backfillWorkerCnt, backfillWorkerCnt, 0))
var err error
d.backfillWorkerPool, err = spmc.NewSPMCPool[*reorgBackfillTask, *backfillResult, int, *backfillWorker,
*backfillWorkerContext]("backfill", int32(backfillWorkerCnt), rmutil.DDL)
if err != nil {
return err
}
d.backfillJobCh = make(chan struct{}, 1)
d.wg.Run(d.startDispatchBackfillJobsLoop)
return nil
}

// Start implements DDL.Start interface.
func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
logutil.BgLogger().Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()))
Expand Down Expand Up @@ -800,11 +766,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if err := d.EnableDDL(); err != nil {
return err
}

// TODO: Currently, it is only processed during initialization and is expected to be added to EnableDDL later.
if err := d.prepareBackfillWorkers(); err != nil {
return err
}
}

variable.RegisterStatistics(d)
Expand Down Expand Up @@ -877,12 +838,6 @@ func (d *ddl) close() {
if d.generalDDLWorkerPool != nil {
d.generalDDLWorkerPool.close()
}
if d.backfillCtxPool != nil {
d.backfillCtxPool.close()
}
if d.backfillWorkerPool != nil {
d.backfillWorkerPool.ReleaseAndWait()
}

// d.delRangeMgr using sessions from d.sessPool.
// Put it before d.sessPool.close to reduce the time spent by d.sessPool.close.
Expand Down
23 changes: 0 additions & 23 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,34 +79,11 @@ func (d *ddl) RemoveReorgCtx(id int64) {
// JobNeedGCForTest is only used for test.
var JobNeedGCForTest = jobNeedGC

// GetJobWithoutPartition is only used for test.
const GetJobWithoutPartition = getJobWithoutPartition

// BackfillJobPrefixKeyString is only used for test.
func BackfillJobPrefixKeyString(ddlJobID int64, eleKey kv.Key, eleID int64) string {
return backfillJobPrefixKeyString(ddlJobID, eleKey, eleID)
}

// GetDDLCtx returns ddlCtx for test.
func GetDDLCtx(d DDL) *ddlCtx {
return d.(*ddl).ddlCtx
}

// GetMaxRowID is used for test.
func GetMaxRowID(store kv.Storage, priority int, t table.Table, startHandle, endHandle kv.Key) (kv.Key, error) {
return getRangeEndKey(NewJobContext(), store, priority, t.RecordPrefix(), startHandle, endHandle)
}

func testNewDDLAndStart(ctx context.Context, options ...Option) (*ddl, error) {
// init infoCache and a stub infoSchema
ic := infoschema.NewCache(2)
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0)
options = append(options, WithInfoCache(ic))
d := newDDL(ctx, options...)
err := d.Start(nil)
return d, err
}

func createMockStore(t *testing.T) kv.Storage {
store, err := mockstore.NewMockStore()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ var (
// PollTiFlashBackoffMinTick is the min tick before we try to update TiFlash replica availability for one table.
PollTiFlashBackoffMinTick TiFlashTick = 1
// PollTiFlashBackoffCapacity is the cache size of backoff struct.
PollTiFlashBackoffCapacity int = 1000
PollTiFlashBackoffCapacity = 1000
// PollTiFlashBackoffRate is growth rate of exponential backoff threshold.
PollTiFlashBackoffRate TiFlashTick = 1.5
// RefreshProgressMaxTableCount is the max count of table to refresh progress after available each poll.
Expand Down
Loading