Skip to content

Commit

Permalink
ddl: clean up reorgCtx (#42845)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 authored Apr 7, 2023
1 parent 566c3f6 commit 7c2c992
Show file tree
Hide file tree
Showing 7 changed files with 8 additions and 27 deletions.
3 changes: 0 additions & 3 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,9 +1150,6 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
reorgInfo.StartKey, reorgInfo.EndKey = originalStartHandle, originalEndHandle
}

// Update the element in the reorgCtx to keep the atomic access for daemon-worker.
w.getReorgCtx(reorgInfo.Job.ID).setCurrentElement(reorgInfo.elements[i+1])

// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
// Write the reorg info to store so the whole reorganize process can recover from panic.
Expand Down
3 changes: 1 addition & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
return dc.reorgCtx.reorgCtxMap[jobID]
}

func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx {
func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
existedRC, ok := dc.reorgCtx.reorgCtxMap[jobID]
Expand All @@ -534,7 +534,6 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.El
rc.doneCh = make(chan error, 1)
// initial reorgCtx
rc.setRowCount(rowCount)
rc.setCurrentElement(currElement)
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
rc.mu.warningsCount = make(map[errors.ErrorID]int64)
rc.references.Add(1)
Expand Down
6 changes: 3 additions & 3 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const testLease = 5 * time.Millisecond
type DDLForTest interface {
// SetInterceptor sets the interceptor.
SetInterceptor(h Interceptor)
NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx
NewReorgCtx(jobID int64, rowCount int64) *reorgCtx
GetReorgCtx(jobID int64) *reorgCtx
RemoveReorgCtx(id int64)
}
Expand All @@ -62,8 +62,8 @@ func (rc *reorgCtx) IsReorgCanceled() bool {
}

// NewReorgCtx exports for testing.
func (d *ddl) NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx {
return d.newReorgCtx(jobID, startKey, currElement, rowCount)
func (d *ddl) NewReorgCtx(jobID int64, rowCount int64) *reorgCtx {
return d.newReorgCtx(jobID, rowCount)
}

// GetReorgCtx exports for testing.
Expand Down
8 changes: 2 additions & 6 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,20 +317,16 @@ func TestUsingReorgCtx(t *testing.T) {
wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
startKey := []byte("skey")
ele := &meta.Element{ID: 1, TypeKey: nil}
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0)
d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Run(func() {
jobID := int64(1)
startKey := []byte("skey")
ele := &meta.Element{ID: 1, TypeKey: nil}
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0)
d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (d *ddl) loadBackfillJobAndRun() {
return
}
// TODO: Adjust how the non-owner uses ReorgCtx.
d.newReorgCtx(genBackfillJobReorgCtxID(bfJob.JobID), bfJob.Meta.StartKey, &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}, bfJob.Meta.RowCount)
d.newReorgCtx(genBackfillJobReorgCtxID(bfJob.JobID), bfJob.Meta.RowCount)
d.wg.Run(func() {
defer func() {
tidbutil.Recover(metrics.LabelDistReorg, "runBackfillJobs", nil, false)
Expand Down
3 changes: 0 additions & 3 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2847,9 +2847,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
// Always (re)start with the full PhysicalTable range
reorgInfo.StartKey, reorgInfo.EndKey = startHandle, endHandle

// Update the element in the reorgCtx to keep the atomic access for daemon-worker.
w.getReorgCtx(reorgInfo.Job.ID).setCurrentElement(reorgInfo.elements[i+1])

// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
// Write the reorg info to store so the whole reorganize process can recover from panic.
Expand Down
10 changes: 1 addition & 9 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ type reorgCtx struct {
// 1: job is canceled.
notifyCancelReorgJob int32

// element is used to record the current element in the reorg process, it can be
// accessed by reorg-worker and daemon-worker concurrently.
element atomic.Value

mu struct {
sync.Mutex
// warnings are used to store the warnings when doing the reorg job under certain SQL modes.
Expand Down Expand Up @@ -110,10 +106,6 @@ func (rc *reorgCtx) setRowCount(count int64) {
atomic.StoreInt64(&rc.rowCount, count)
}

func (rc *reorgCtx) setCurrentElement(element *meta.Element) {
rc.element.Store(element)
}

func (rc *reorgCtx) mergeWarnings(warnings map[errors.ErrorID]*terror.Error, warningsCount map[errors.ErrorID]int64) {
if len(warnings) == 0 || len(warningsCount) == 0 {
return
Expand Down Expand Up @@ -198,7 +190,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
if job.IsCancelling() {
return dbterror.ErrCancelledDDLJob
}
rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.StartKey, reorgInfo.currElement, reorgInfo.Job.GetRowCount())
rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount())
w.wg.Add(1)
go func() {
defer w.wg.Done()
Expand Down

0 comments on commit 7c2c992

Please sign in to comment.