Skip to content

Commit

Permalink
ddl: fix data race for ddl seq (pingcap#32542)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 authored Feb 23, 2022
1 parent 816a083 commit 129dadf
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
11 changes: 8 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ type ddlCtx struct {
hook Callback
interceptor Interceptor
}

ddlSeqNumMu struct {
sync.Mutex
seqNum uint64
}
}

func (dc *ddlCtx) isOwner() bool {
Expand Down Expand Up @@ -379,8 +384,8 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
d.workers = make(map[workerType]*worker, 2)
d.sessPool = newSessionPool(ctxPool)
d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
d.workers[generalWorker] = newWorker(d.ctx, generalWorker, d.sessPool, d.delRangeMgr)
d.workers[addIdxWorker] = newWorker(d.ctx, addIdxWorker, d.sessPool, d.delRangeMgr)
d.workers[generalWorker] = newWorker(d.ctx, generalWorker, d.sessPool, d.delRangeMgr, d.ddlCtx)
d.workers[addIdxWorker] = newWorker(d.ctx, addIdxWorker, d.sessPool, d.delRangeMgr, d.ddlCtx)
for _, worker := range d.workers {
worker.wg.Add(1)
w := worker
Expand All @@ -395,7 +400,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {

err = kv.RunInNewTxn(d.ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
globalSeqNum, err = t.GetHistoryDDLCount()
d.ddlSeqNumMu.seqNum, err = t.GetHistoryDDLCount()
return err
})
if err != nil {
Expand Down
21 changes: 10 additions & 11 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ var (
ddlWorkerID = int32(0)
// WaitTimeWhenErrorOccurred is waiting interval when processing DDL jobs encounter errors.
WaitTimeWhenErrorOccurred = int64(1 * time.Second)

ddlSeqNumMu sync.Mutex
globalSeqNum uint64
)

// GetWaitTimeWhenErrorOccurred return waiting interval when processing DDL jobs encounter errors.
Expand Down Expand Up @@ -97,6 +94,7 @@ type worker struct {
logCtx context.Context
lockSeqNum bool

*ddlCtx
ddlJobCache
}

Expand All @@ -109,7 +107,7 @@ type ddlJobCache struct {
cacheDigest *parser.Digest
}

func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager) *worker {
func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager, dCtx *ddlCtx) *worker {
worker := &worker{
id: atomic.AddInt32(&ddlWorkerID, 1),
tp: tp,
Expand All @@ -121,6 +119,7 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
cacheNormalizedSQL: "",
cacheDigest: nil,
},
ddlCtx: dCtx,
reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
sessPool: sessPool,
delRangeManager: delRangeMgr,
Expand Down Expand Up @@ -450,10 +449,10 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
ddlSeqNumMu.Lock()
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
w.lockSeqNum = true
globalSeqNum++
job.SeqNum = globalSeqNum
job.SeqNum = w.ddlSeqNumMu.seqNum
}

func finishRecoverTable(w *worker, job *model.Job) error {
Expand Down Expand Up @@ -605,10 +604,10 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {

if err != nil {
if w.lockSeqNum {
// txn commit failed, we should reset globalSeqNum
globalSeqNum--
// txn commit failed, we should reset seqNum.
w.ddlSeqNumMu.seqNum--
w.lockSeqNum = false
ddlSeqNumMu.Unlock()
w.ddlSeqNumMu.Unlock()
}
return errors.Trace(err)
} else if job == nil {
Expand All @@ -617,7 +616,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
}
if w.lockSeqNum {
w.lockSeqNum = false
ddlSeqNumMu.Unlock()
d.ddlSeqNumMu.Unlock()
}
w.waitDependencyJobFinished(job, &waitDependencyJobCnt)

Expand Down

0 comments on commit 129dadf

Please sign in to comment.