ddl: cherrypick some PRs to fix MDL bug #47079

Merged
28 changes: 24 additions & 4 deletions ddl/ddl.go
@@ -152,6 +152,7 @@ const (
OnExistReplace

jobRecordCapacity = 16
jobOnceCapacity = 1000
)

var (
@@ -286,14 +287,14 @@ type waitSchemaSyncedController struct {
mu sync.RWMutex
job map[int64]struct{}

// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
once *atomicutil.Bool
// Used to check whether the DDL job is on its first run on this owner.
onceMap map[int64]struct{}
}

func newWaitSchemaSyncedController() *waitSchemaSyncedController {
return &waitSchemaSyncedController{
job: make(map[int64]struct{}, jobRecordCapacity),
once: atomicutil.NewBool(true),
job: make(map[int64]struct{}, jobRecordCapacity),
onceMap: make(map[int64]struct{}, jobOnceCapacity),
}
}

@@ -316,6 +317,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) {
delete(w.job, job.ID)
}

// maybeAlreadyRunOnce returns true if the job has already been run on this owner.
// Returns false if the job may be the first run on this owner (for example, after onceMap has been reset).
func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool {
w.mu.Lock()
defer w.mu.Unlock()
_, ok := w.onceMap[id]
return ok
}

func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) {
w.mu.Lock()
defer w.mu.Unlock()
if len(w.onceMap) > jobOnceCapacity {
// If the map is too large, we reset it. These jobs may need to check schema synced again, but it's ok.
w.onceMap = make(map[int64]struct{}, jobRecordCapacity)
}
w.onceMap[id] = struct{}{}
}

// ddlCtx is the context when we use worker to handle DDL jobs.
type ddlCtx struct {
ctx context.Context
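
The ddl.go change replaces the owner-wide once flag with a per-job onceMap, so after an owner election only jobs that have not yet been handled by the new owner pay the extra wait for schema sync. Below is a minimal, self-contained sketch of that pattern; the type onceTracker and the main driver are illustrative names for this sketch, not the actual TiDB waitSchemaSyncedController.

package main

import (
	"fmt"
	"sync"
)

const jobOnceCapacity = 1000

// onceTracker is a simplified stand-in for waitSchemaSyncedController: instead
// of one owner-wide boolean, it remembers which job IDs have already gone
// through the "wait for schema sync / MDL" step on the current owner.
type onceTracker struct {
	mu      sync.Mutex
	onceMap map[int64]struct{}
}

func newOnceTracker() *onceTracker {
	return &onceTracker{onceMap: make(map[int64]struct{}, jobOnceCapacity)}
}

// maybeAlreadyRunOnce reports whether the job is known to have run on this
// owner already. A false answer is only "maybe not": the map can be reset,
// in which case the caller simply waits for schema sync again.
func (t *onceTracker) maybeAlreadyRunOnce(id int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.onceMap[id]
	return ok
}

// setAlreadyRunOnce records that the job has been handled once on this owner,
// resetting the map when it grows beyond the capacity bound.
func (t *onceTracker) setAlreadyRunOnce(id int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if len(t.onceMap) > jobOnceCapacity {
		t.onceMap = make(map[int64]struct{}, jobOnceCapacity)
	}
	t.onceMap[id] = struct{}{}
}

func main() {
	t := newOnceTracker()
	fmt.Println(t.maybeAlreadyRunOnce(7)) // false: first time on this owner, wait for sync
	t.setAlreadyRunOnce(7)
	fmt.Println(t.maybeAlreadyRunOnce(7)) // true: the extra wait can be skipped
}
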
30 changes: 16 additions & 14 deletions ddl/ddl_worker.go
@@ -1049,8 +1049,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
waitSchemaChanged(context.Background(), d, waitTime, schemaVer, job)

_ = waitSchemaChanged(d, waitTime, schemaVer, job)
if RunInGoTest {
// d.mu.hook is initialized from the domain / test callback, which forces the owner host to update the schema diff synchronously.
d.mu.RLock()
@@ -1352,14 +1351,14 @@ func toTError(err error) *terror.Error {
return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error())
}

// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens,
// waitSchemaChanged waits for all servers to finish updating the schema, or for the MDL to be synced. In order to make sure that happens,
// we wait at most 2 * lease time(sessionTTL, 90 seconds).
func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) {
func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
return
return nil
}
if waitTime == 0 {
return
return nil
}

timeStart := time.Now()
@@ -1370,29 +1369,33 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l

if latestSchemaVersion == 0 {
logutil.Logger(d.ctx).Info("[ddl] schema version doesn't change")
return
return nil
}

err = d.schemaSyncer.OwnerUpdateGlobalVersion(ctx, latestSchemaVersion)
err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
if variable.EnableMDL.Load() {
return err
}
if terror.ErrorEqual(err, context.DeadlineExceeded) {
// If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker.
// There is no need to use etcd to sync. The function returns directly.
return
return nil
}
}

// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err = d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return
return err
}
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
zap.Int64("ver", latestSchemaVersion),
zap.Duration("take time", time.Since(timeStart)),
zap.String("job", job.String()))
return nil
}

// waitSchemaSyncedForMDL is like waitSchemaSynced, but it waits for acquiring the metadata lock of the latest version of this DDL.
@@ -1409,7 +1412,7 @@ func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64

timeStart := time.Now()
// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
err := d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
if err != nil {
logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
return err
@@ -1451,8 +1454,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
}
})

waitSchemaChanged(context.Background(), d, waitTime, latestSchemaVersion, job)
return nil
return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
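
In ddl_worker.go, waitSchemaChanged now uses the DDL context (d.ctx) and returns its error instead of logging and dropping it, so callers can tell a sync failure or owner shutdown apart from success. The sketch below shows the shape of that error-propagating wait under the assumption of a stand-in waitAllVersions helper; it is a simplification, not the actual TiDB implementation.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitAllVersions stands in for schemaSyncer.OwnerCheckAllVersions: it blocks
// until every node is assumed to have reported the target version, or until
// the passed context is cancelled (e.g. the DDL owner is shutting down).
func waitAllVersions(ctx context.Context, ver int64) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond): // pretend all nodes caught up
		return nil
	}
}

// waitSchemaChanged mirrors the reworked signature: the error is returned to
// the caller instead of being swallowed.
func waitSchemaChanged(ctx context.Context, ver int64) error {
	if ver == 0 {
		// No schema version change, nothing to wait for.
		return nil
	}
	return waitAllVersions(ctx, ver)
}

func main() {
	ctx := context.Background()
	if err := waitSchemaChanged(ctx, 42); err != nil {
		// On error the caller can skip follow-up steps such as cleaning
		// the MDL info, so a later attempt can retry them.
		fmt.Println("wait failed:", err)
		return
	}
	fmt.Println("all nodes assumed to be on schema version 42")
}
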
17 changes: 11 additions & 6 deletions ddl/job_table.go
@@ -174,7 +174,7 @@ func (d *ddl) startDispatchLoop() {
return
}
if !variable.EnableConcurrentDDL.Load() || !d.isOwner() || d.waiting.Load() {
d.once.Store(true)
d.onceMap = make(map[int64]struct{}, jobOnceCapacity)
time.Sleep(time.Second)
continue
}
@@ -234,7 +234,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
}()
// check if this ddl job is synced to all servers.
if !d.isSynced(job) || d.once.Load() {
if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
if variable.EnableMDL.Load() {
exist, version, err := checkMDLInfo(job.ID, d.sessPool)
if err != nil {
@@ -249,7 +249,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
if err != nil {
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
// Don't have a worker now.
return
@@ -263,7 +263,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
pool.put(wk)
return
}
d.once.Store(false)
d.setAlreadyRunOnce(job.ID)
}
}

@@ -282,9 +282,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
})

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
// If the job is done or still running or rolling back, we will wait 2 * lease time or until the MDL is synced, to guarantee that other servers update to
// the newest schema.
waitSchemaChanged(context.Background(), d.ddlCtx, d.lease*2, schemaVer, job)
err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
if err != nil {
// May be caused by server closing, shouldn't clean the MDL info.
logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err))
return
}
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
d.synced(job)

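
The job_table.go change gates the extra wait in delivery2worker on both conditions: the job must have started, and it must either not be synced yet or not be known to have already run on this owner; in addition, the MDL info is only cleaned when waitSchemaChanged succeeds. Below is a small sketch of that gating decision with hypothetical parameter names, not the real function.

package main

import "fmt"

// needWaitBeforeRun sketches the reworked condition in delivery2worker:
// the owner waits for schema sync / MDL only when the job has already
// started and is either not synced or not known to have run here before.
func needWaitBeforeRun(notStarted, synced, ranOnceHere bool) bool {
	return !notStarted && (!synced || !ranOnceHere)
}

func main() {
	fmt.Println(needWaitBeforeRun(true, false, false))  // false: brand-new job, no wait needed
	fmt.Println(needWaitBeforeRun(false, false, false)) // true: started but not yet synced
	fmt.Println(needWaitBeforeRun(false, true, true))   // false: synced and already run on this owner
}
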
6 changes: 4 additions & 2 deletions ddl/syncer/syncer.go
@@ -310,13 +310,15 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
if variable.EnableMDL.Load() {
for _, kv := range resp.Kvs {
key := string(kv.Key)
tidbIDInResp := key[strings.LastIndex(key, "/")+1:]
ver, err := strconv.Atoi(string(kv.Value))
if err != nil {
logutil.BgLogger().Info("[ddl] syncer check all versions, convert value to int failed, continue checking.", zap.String("ddl", string(kv.Key)), zap.String("value", string(kv.Value)), zap.Error(err))
succ = false
break
}
if int64(ver) < latestVer {
// We need to check whether the TiDB ID is in updatedMap, in case the etcd key deletion failed and the TiDB server is down.
if int64(ver) < latestVer && updatedMap[tidbIDInResp] != "" {
if notMatchVerCnt%intervalCnt == 0 {
logutil.BgLogger().Info("[ddl] syncer check all versions, someone is not synced, continue checking",
zap.String("ddl", string(kv.Key)), zap.Int("currentVer", ver), zap.Int64("latestVer", latestVer))
@@ -325,7 +327,7 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
notMatchVerCnt++
break
}
delete(updatedMap, key[strings.LastIndex(key, "/")+1:])
delete(updatedMap, tidbIDInResp)
}
if len(updatedMap) > 0 {
succ = false
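
The syncer.go change only treats a lagging schema version as blocking when the reporting TiDB instance is still present in updatedMap; a stale etcd key left behind by a server that went down (and whose key deletion failed) no longer stalls the DDL job. Below is a minimal sketch of that filtering with hypothetical map names; it simplifies OwnerCheckAllVersions and is not the real syncer code.

package main

import "fmt"

// allSynced sketches the adjusted check: a node's reported version only
// blocks the job if that node is still expected to update (present in
// expected). Keys from nodes that have already gone away are ignored.
func allSynced(reported map[string]int64, expected map[string]string, latest int64) bool {
	for id, ver := range reported {
		if ver < latest && expected[id] != "" {
			return false // a live node is still behind the latest version
		}
		delete(expected, id)
	}
	// Every node we still expect to update must have reported something.
	return len(expected) == 0
}

func main() {
	reported := map[string]int64{"tidb-1": 5, "tidb-stale": 3}
	expected := map[string]string{"tidb-1": "addr-1"} // tidb-stale already left the cluster
	fmt.Println(allSynced(reported, expected, 5))     // true: the stale key is ignored
}
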