*: remove the old distribute framework (#42981)
close #43016
wjhuang2016 authored Apr 13, 2023
1 parent 4f9afd4 commit 5075e5f
Showing 22 changed files with 31 additions and 2,640 deletions.
4 changes: 0 additions & 4 deletions Makefile
@@ -424,10 +424,6 @@ bazel_coverage_test: check-bazel-prepare failpoint-enable bazel_ci_prepare
         --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
         -- //... -//cmd/... -//tests/graceshutdown/... \
         -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
-    bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --build_tests_only --test_keep_going=false \
-        --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,disttask \
-        -- //... -//cmd/... -//tests/graceshutdown/... \
-        -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
 
 bazel_build: bazel_ci_prepare
     mkdir -p bin
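
The deleted stanza was a second coverage pass that added disttask to gotags; that build tag appears to be what compiled in the old framework's code paths. As a rough illustration of what such a tag gates — a hypothetical file, not one from this repo — a tag-guarded Go file looks like this:

//go:build disttask

package ddl

// Illustrative only: this file is compiled solely when the build passes
// -tags=disttask. Bazel's --define gotags=deadlock,intest,disttask feeds
// the same tag list, which is how the deleted coverage pass reached the
// tag-guarded code.
const distTaskBuildEnabled = true
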
5 changes: 0 additions & 5 deletions ddl/BUILD.bazel
@@ -25,7 +25,6 @@ go_library(
         "ddl_workerpool.go",
         "delete_range.go",
         "delete_range_util.go",
-        "dist_backfilling.go",
         "dist_owner.go",
         "disttask_flow.go",
         "foreign_key.go",
@@ -90,7 +89,6 @@ go_library(
         "//parser/types",
         "//privilege",
         "//resourcemanager/pool/workerpool",
-        "//resourcemanager/pooltask",
         "//resourcemanager/util",
         "//sessionctx",
         "//sessionctx/binloginfo",
@@ -116,8 +114,6 @@ go_library(
         "//util/domainutil",
         "//util/filter",
         "//util/gcutil",
-        "//util/gpool",
-        "//util/gpool/spmc",
         "//util/hack",
         "//util/intest",
         "//util/logutil",
@@ -224,7 +220,6 @@ go_test(
         "//autoid_service",
         "//config",
         "//ddl/internal/callback",
-        "//ddl/internal/session",
         "//ddl/placement",
         "//ddl/schematracker",
         "//ddl/testutil",
103 changes: 2 additions & 101 deletions ddl/backfilling.go
@@ -37,7 +37,6 @@ import (
     "github.com/pingcap/tidb/store/copr"
     "github.com/pingcap/tidb/store/driver/backoff"
     "github.com/pingcap/tidb/table"
-    "github.com/pingcap/tidb/table/tables"
     "github.com/pingcap/tidb/tablecodec"
     "github.com/pingcap/tidb/util"
     "github.com/pingcap/tidb/util/dbterror"
@@ -174,9 +173,6 @@ func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context,
 type backfiller interface {
     BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, err error)
     AddMetricInfo(float64)
-    GetTasks() ([]*BackfillJob, error)
-    UpdateTask(bfJob *BackfillJob) error
-    FinishTask(bfJob *BackfillJob) error
     GetCtx() *backfillCtx
     String() string
 }
@@ -191,7 +187,6 @@ type backfillResult
 }
 
 type reorgBackfillTask struct {
-    bfJob *BackfillJob
     physicalTable table.PhysicalTable
 
     // TODO: Remove the following fields after remove the function of run.
@@ -205,11 +200,7 @@ type reorgBackfillTask struct {
 }
 
 func (r *reorgBackfillTask) getJobID() int64 {
-    jobID := r.jobID
-    if r.bfJob != nil {
-        jobID = r.bfJob.JobID
-    }
-    return jobID
+    return r.jobID
 }
 
 func (r *reorgBackfillTask) excludedEndKey() kv.Key {
@@ -258,21 +249,6 @@ func newBackfillWorker(ctx context.Context, bf backfiller) *backfillWorker {
     }
 }
 
-func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey kv.Key) error {
-    leaseTime, err := GetOracleTime(w.GetCtx().store)
-    if err != nil {
-        return err
-    }
-    bfJob.Meta.CurrKey = nextKey
-    bfJob.InstanceID = execID
-    bfJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease)
-    return w.backfiller.UpdateTask(bfJob)
-}
-
-func (w *backfillWorker) finishJob(bfJob *BackfillJob) error {
-    return w.backfiller.FinishTask(bfJob)
-}
-
 func (w *backfillWorker) String() string {
     return fmt.Sprintf("backfill-worker %d, tp %s", w.GetCtx().id, w.backfiller.String())
 }
@@ -302,24 +278,18 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
         addedCount: 0,
         nextKey:    handleRange.startKey,
     }
-    batchStartTime := time.Now()
     lastLogCount := 0
     lastLogTime := time.Now()
     startTime := lastLogTime
     jobID := task.getJobID()
     rc := d.getReorgCtx(jobID)
 
-    isDistReorg := task.bfJob != nil
-    if isDistReorg {
-        w.initPartitionIndexInfo(task)
-        jobID = genBackfillJobReorgCtxID(jobID)
-    }
     for {
         // Give job chance to be canceled, if we not check it here,
         // if there is panic in bf.BackfillData we will never cancel the job.
         // Because reorgRecordTask may run a long time,
         // we should check whether this ddl job is still runnable.
-        err := d.isReorgRunnable(jobID, isDistReorg)
+        err := d.isReorgRunnable(jobID, false)
         if err != nil {
             result.err = err
             return result
@@ -358,20 +328,6 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
         if taskCtx.done {
             break
         }
-
-        if isDistReorg {
-            // TODO: Adjust the updating lease frequency by batch processing time carefully.
-            if time.Since(batchStartTime) < updateInstanceLease {
-                continue
-            }
-            batchStartTime = time.Now()
-            if err := w.updateLease(w.GetCtx().uuid, task.bfJob, result.nextKey); err != nil {
-                logutil.BgLogger().Info("[ddl] backfill worker handle task, update lease failed", zap.Stringer("worker", w),
-                    zap.Stringer("task", task), zap.String("backfill job", task.bfJob.AbbrStr()), zap.Error(err))
-                result.err = err
-                return result
-            }
-        }
     }
     logutil.BgLogger().Info("[ddl] backfill worker finish task",
         zap.Stringer("worker", w), zap.Stringer("task", task),
@@ -385,61 +341,6 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
     return result
 }
 
-func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) {
-    if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok {
-        if w, ok := w.backfiller.(*addIndexTxnWorker); ok {
-            indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
-            w.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
-        }
-    }
-}
-
-func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResult) {
-    logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w), zap.String("task", task.String()))
-    defer util.Recover(metrics.LabelDDL, "backfillWorker.runTask", func() {
-        result = &backfillResult{taskID: task.id, err: dbterror.ErrReorgPanic}
-    }, false)
-    defer w.GetCtx().setDDLLabelForTopSQL(task.jobID, task.sqlQuery)
-
-    failpoint.Inject("mockBackfillRunErr", func() {
-        if w.GetCtx().id == 0 {
-            result := &backfillResult{taskID: task.id, addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")}
-            failpoint.Return(result)
-        }
-    })
-    failpoint.Inject("mockHighLoadForAddIndex", func() {
-        sqlPrefixes := []string{"alter"}
-        topsql.MockHighCPULoad(task.sqlQuery, sqlPrefixes, 5)
-    })
-    failpoint.Inject("mockBackfillSlow", func() {
-        time.Sleep(100 * time.Millisecond)
-    })
-
-    // Change the batch size dynamically.
-    w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
-    result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller)
-    task.bfJob.Meta.RowCount = int64(result.addedCount)
-    if result.err != nil {
-        logutil.BgLogger().Warn("[ddl] backfill worker runTask failed",
-            zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(result.err))
-        if dbterror.ErrDDLJobNotFound.Equal(result.err) {
-            result.err = nil
-            return result
-        }
-        task.bfJob.State = model.JobStateCancelled
-        task.bfJob.Meta.Error = toTError(result.err)
-        if err := w.finishJob(task.bfJob); err != nil {
-            logutil.BgLogger().Info("[ddl] backfill worker runTask, finishJob failed",
-                zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(err))
-            result.err = err
-        }
-    } else {
-        task.bfJob.State = model.JobStateDone
-        result.err = w.finishJob(task.bfJob)
-    }
-    return result
-}
-
 func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
     logutil.BgLogger().Info("[ddl] backfill worker start", zap.Stringer("worker", w))
     var curTaskID int
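
Read together, the hunks above reduce the backfiller contract to its data-plane methods. Reconstructed from the diff (surrounding types elided), the interface after this commit is:

type backfiller interface {
    BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, err error)
    AddMetricInfo(float64)
    // GetTasks, UpdateTask and FinishTask are gone: they were the
    // task-lifecycle hooks used only by the old distribute framework.
    GetCtx() *backfillCtx
    String() string
}

Narrowing the interface is also what lets ddl/column.go below drop its panic stubs: updateColumnWorker no longer has to stub methods it never supported.
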
12 changes: 0 additions & 12 deletions ddl/column.go
@@ -1214,18 +1214,6 @@ func (*updateColumnWorker) String() string {
     return typeUpdateColumnWorker.String()
 }
 
-func (*updateColumnWorker) GetTasks() ([]*BackfillJob, error) {
-    panic("[ddl] update column worker GetTask function doesn't implement")
-}
-
-func (*updateColumnWorker) UpdateTask(*BackfillJob) error {
-    panic("[ddl] update column worker UpdateTask function doesn't implement")
-}
-
-func (*updateColumnWorker) FinishTask(*BackfillJob) error {
-    panic("[ddl] update column worker FinishTask function doesn't implement")
-}
-
 func (w *updateColumnWorker) GetCtx() *backfillCtx {
     return w.backfillCtx
 }
1 change: 0 additions & 1 deletion ddl/db_test.go
@@ -65,7 +65,6 @@ const (
 )
 
 const defaultBatchSize = 1024
-const defaultReorgBatchSize = 256
 
 const dbTestLease = 600 * time.Millisecond
 
49 changes: 2 additions & 47 deletions ddl/ddl.go
@@ -52,7 +52,6 @@ import (
     "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/parser/mysql"
     "github.com/pingcap/tidb/parser/terror"
-    rmutil "github.com/pingcap/tidb/resourcemanager/util"
     "github.com/pingcap/tidb/sessionctx"
     "github.com/pingcap/tidb/sessionctx/binloginfo"
     "github.com/pingcap/tidb/sessionctx/variable"
@@ -63,7 +62,6 @@ import (
     tidbutil "github.com/pingcap/tidb/util"
     "github.com/pingcap/tidb/util/dbterror"
     "github.com/pingcap/tidb/util/gcutil"
-    "github.com/pingcap/tidb/util/gpool/spmc"
     "github.com/pingcap/tidb/util/logutil"
     "github.com/pingcap/tidb/util/mathutil"
     "github.com/pingcap/tidb/util/syncutil"
@@ -87,9 +85,8 @@ const (
 
     batchAddingJobs = 10
 
-    reorgWorkerCnt    = 10
-    generalWorkerCnt  = 1
-    backfillWorkerCnt = 32
+    reorgWorkerCnt   = 10
+    generalWorkerCnt = 1
 
     // checkFlagIndexInJobArgs is the recoverCheckFlag index used in RecoverTable/RecoverSchema job arg list.
     checkFlagIndexInJobArgs = 1
@@ -280,8 +277,6 @@ type ddl struct {
     // used in the concurrency ddl.
     reorgWorkerPool      *workerPool
     generalDDLWorkerPool *workerPool
-    backfillCtxPool      *backfillCtxPool
-    backfillWorkerPool   *spmc.Pool[*reorgBackfillTask, *backfillResult, int, *backfillWorker, *backfillWorkerContext]
     // get notification if any DDL coming.
     ddlJobCh chan struct{}
 }
@@ -512,13 +507,6 @@ type reorgContexts struct {
     reorgCtxMap map[int64]*reorgCtx
 }
 
-func getReorgCtx(reorgCtxs *reorgContexts, jobID int64) *reorgCtx {
-    reorgCtxs.RLock()
-    defer reorgCtxs.RUnlock()
-    return reorgCtxs.reorgCtxMap[jobID]
-}
-
-// TODO: Using getReorgCtx instead of dc.getReorgCtx.
 func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
     dc.reorgCtx.RLock()
     defer dc.reorgCtx.RUnlock()
@@ -544,10 +532,6 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx {
     return rc
 }
 
-func genBackfillJobReorgCtxID(jobID int64) int64 {
-    return -jobID
-}
-
 func (dc *ddlCtx) removeReorgCtx(jobID int64) {
     dc.reorgCtx.Lock()
     defer dc.reorgCtx.Unlock()
@@ -753,24 +737,6 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
     d.wg.Run(d.startDispatchLoop)
 }
 
-func (d *ddl) prepareBackfillWorkers() error {
-    workerFactory := func() (pools.Resource, error) {
-        bk := newBackfillWorker(context.Background(), nil)
-        metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_backfill_worker", metrics.CreateDDL)).Inc()
-        return bk, nil
-    }
-    d.backfillCtxPool = newBackfillContextPool(pools.NewResourcePool(workerFactory, backfillWorkerCnt, backfillWorkerCnt, 0))
-    var err error
-    d.backfillWorkerPool, err = spmc.NewSPMCPool[*reorgBackfillTask, *backfillResult, int, *backfillWorker,
-        *backfillWorkerContext]("backfill", int32(backfillWorkerCnt), rmutil.DDL)
-    if err != nil {
-        return err
-    }
-    d.backfillJobCh = make(chan struct{}, 1)
-    d.wg.Run(d.startDispatchBackfillJobsLoop)
-    return nil
-}
-
 // Start implements DDL.Start interface.
 func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
     logutil.BgLogger().Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()))
@@ -800,11 +766,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
         if err := d.EnableDDL(); err != nil {
             return err
         }
-
-        // TODO: Currently, it is only processed during initialization and is expected to be added to EnableDDL later.
-        if err := d.prepareBackfillWorkers(); err != nil {
-            return err
-        }
     }
 
     variable.RegisterStatistics(d)
@@ -877,12 +838,6 @@ func (d *ddl) close() {
     if d.generalDDLWorkerPool != nil {
         d.generalDDLWorkerPool.close()
     }
-    if d.backfillCtxPool != nil {
-        d.backfillCtxPool.close()
-    }
-    if d.backfillWorkerPool != nil {
-        d.backfillWorkerPool.ReleaseAndWait()
-    }
 
     // d.delRangeMgr using sessions from d.sessPool.
     // Put it before d.sessPool.close to reduce the time spent by d.sessPool.close.
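
The removed prepareBackfillWorkers hooked its dispatch loop into the DDL lifecycle via d.wg.Run, which is why close() could simply wait before releasing the pools. That helper follows the common wrap-a-WaitGroup pattern; a self-contained sketch under that assumption (our own minimal version, not TiDB's exact code):

package main

import (
    "fmt"
    "sync"
)

// waitGroupWrapper couples goroutine spawn with WaitGroup bookkeeping,
// so a shutdown path can Wait for every loop registered through Run.
type waitGroupWrapper struct {
    sync.WaitGroup
}

func (w *waitGroupWrapper) Run(exec func()) {
    w.Add(1)
    go func() {
        defer w.Done()
        exec()
    }()
}

func main() {
    var wg waitGroupWrapper
    wg.Run(func() { fmt.Println("dispatch loop would run here") })
    wg.Wait() // what a close() relies on before tearing down shared pools
}
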
23 changes: 0 additions & 23 deletions ddl/ddl_test.go
@@ -79,34 +79,11 @@ func (d *ddl) RemoveReorgCtx(id int64) {
 // JobNeedGCForTest is only used for test.
 var JobNeedGCForTest = jobNeedGC
 
-// GetJobWithoutPartition is only used for test.
-const GetJobWithoutPartition = getJobWithoutPartition
-
-// BackfillJobPrefixKeyString is only used for test.
-func BackfillJobPrefixKeyString(ddlJobID int64, eleKey kv.Key, eleID int64) string {
-    return backfillJobPrefixKeyString(ddlJobID, eleKey, eleID)
-}
-
 // GetDDLCtx returns ddlCtx for test.
 func GetDDLCtx(d DDL) *ddlCtx {
     return d.(*ddl).ddlCtx
 }
 
-// GetMaxRowID is used for test.
-func GetMaxRowID(store kv.Storage, priority int, t table.Table, startHandle, endHandle kv.Key) (kv.Key, error) {
-    return getRangeEndKey(NewJobContext(), store, priority, t.RecordPrefix(), startHandle, endHandle)
-}
-
-func testNewDDLAndStart(ctx context.Context, options ...Option) (*ddl, error) {
-    // init infoCache and a stub infoSchema
-    ic := infoschema.NewCache(2)
-    ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0)
-    options = append(options, WithInfoCache(ic))
-    d := newDDL(ctx, options...)
-    err := d.Start(nil)
-    return d, err
-}
-
 func createMockStore(t *testing.T) kv.Storage {
     store, err := mockstore.NewMockStore()
     require.NoError(t, err)
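
The deletions above are all instances of Go's export-for-test idiom: a file inside the package re-exports unexported identifiers so external _test packages can reach them, and the exports die with the feature they served. A generic sketch of the idiom with hypothetical names (not TiDB code):

// counter.go
package counter

func increment(x int) int { return x + 1 }

// export_test.go — compiled only when testing package counter.
package counter

// Increment is only used for test.
var Increment = increment
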
2 changes: 1 addition & 1 deletion ddl/ddl_tiflash_api.go
@@ -223,7 +223,7 @@ var (
     // PollTiFlashBackoffMinTick is the min tick before we try to update TiFlash replica availability for one table.
     PollTiFlashBackoffMinTick TiFlashTick = 1
     // PollTiFlashBackoffCapacity is the cache size of backoff struct.
-    PollTiFlashBackoffCapacity int = 1000
+    PollTiFlashBackoffCapacity = 1000
     // PollTiFlashBackoffRate is growth rate of exponential backoff threshold.
     PollTiFlashBackoffRate TiFlashTick = 1.5
     // RefreshProgressMaxTableCount is the max count of table to refresh progress after available each poll.
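
The single-line change above drops the explicit int: when a var declaration has an initializer, Go infers the type, and the default type of the untyped constant 1000 is int, so behavior is unchanged. A minimal demonstration (hypothetical names):

package main

import "fmt"

var (
    explicitCap int = 1000 // explicit element type
    inferredCap     = 1000 // inferred from the untyped constant 1000: also int
)

func main() {
    fmt.Printf("%T %T\n", explicitCap, inferredCap) // prints: int int
}
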
[The remaining 14 changed files in this commit are not rendered on this page.]
