Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: move dispatch code to one file #52396

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"ddl_workerpool.go",
"delete_range.go",
"delete_range_util.go",
"dispatch.go",
"dist_owner.go",
"foreign_key.go",
"generated_column.go",
Expand Down
52 changes: 0 additions & 52 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/syncutil"
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
Expand Down Expand Up @@ -264,20 +263,6 @@ type DDL interface {
DoDDLJob(ctx sessionctx.Context, job *model.Job) error
}

type limitJobTask struct {
job *model.Job
// when we combine multiple jobs into one task,
// append the errChs to this slice.
errChs []chan error
cacheErr error
}

func (t *limitJobTask) NotifyError(err error) {
for _, errCh := range t.errChs {
errCh <- err
}
}

// ddl is used to handle the statements that define the structure or schema of the database.
type ddl struct {
m sync.RWMutex
Expand Down Expand Up @@ -383,11 +368,6 @@ type ddlCtx struct {

// reorgCtx is used for reorganization.
reorgCtx reorgContexts
// backfillCtx is used for backfill workers.
backfillCtx struct {
syncutil.RWMutex
jobCtxMap map[int64]*JobContext
}

jobCtx struct {
sync.RWMutex
Expand Down Expand Up @@ -668,10 +648,6 @@ func asyncNotifyEvent(d *ddlCtx, e *statsutil.DDLEvent) {

// NewDDL creates a new DDL.
func NewDDL(ctx context.Context, options ...Option) DDL {
return newDDL(ctx, options...)
}

func newDDL(ctx context.Context, options ...Option) *ddl {
opt := &Options{
Hook: &BaseCallback{},
}
Expand Down Expand Up @@ -1126,14 +1102,6 @@ func setDDLJobMode(job *model.Job) {
job.LocalMode = false
}

func (d *ddl) deliverJobTask(task *limitJobTask) {
if task.job.LocalMode {
d.limitJobChV2 <- task
} else {
d.limitJobCh <- task
}
}

func (*ddl) shouldCheckHistoryJob(job *model.Job) bool {
// for local mode job, we add the history job directly now, so no need to check it.
return !job.LocalMode
Expand Down Expand Up @@ -1649,26 +1617,6 @@ func GetDDLInfo(s sessionctx.Context) (*Info, error) {
return info, nil
}

func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) {
var generalJob, reorgJob *model.Job
jobs, err := getJobsBySQL(sess, JobTable, "not reorg order by job_id limit 1")
if err != nil {
return nil, nil, errors.Trace(err)
}

if len(jobs) != 0 {
generalJob = jobs[0]
}
jobs, err = getJobsBySQL(sess, JobTable, "reorg order by job_id limit 1")
if err != nil {
return nil, nil, errors.Trace(err)
}
if len(jobs) != 0 {
reorgJob = jobs[0]
}
return generalJob, reorgJob, nil
}

// cancelRunningJob cancel a DDL job that is in the concurrent state.
func cancelRunningJob(_ *sess.Session, job *model.Job,
byWho model.AdminCommandOperator) (err error) {
Expand Down
Loading