Skip to content

Commit

Permalink
ddl: Increase ddl unit test speed (#41034)
Browse files Browse the repository at this point in the history
close #41216
  • Loading branch information
blacktear23 authored Mar 7, 2023
1 parent 2da699e commit 29adb0a
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ ifeq ("$(pkg)", "")
else
@echo "Running unit test for github.com/pingcap/tidb/$(pkg)"
@export log_level=fatal; export TZ='Asia/Shanghai'; \
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' -cover github.com/pingcap/tidb/$(pkg) || { $(FAILPOINT_DISABLE); exit 1; }
$(GOTEST) -tags 'intest' -v -ldflags '$(TEST_LDFLAGS)' -cover github.com/pingcap/tidb/$(pkg) || { $(FAILPOINT_DISABLE); exit 1; }
endif
@$(FAILPOINT_DISABLE)

Expand Down
7 changes: 6 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
pumpcli "github.com/pingcap/tidb/tidb-binlog/pump_client"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/resourcegrouptag"
Expand Down Expand Up @@ -821,7 +822,11 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
// which may act like a deadlock.
logutil.Logger(w.logCtx).Info("[ddl] run DDL job failed, sleeps a while then retries it.",
zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
time.Sleep(GetWaitTimeWhenErrorOccurred())

// In test and job is cancelling we can ignore the sleep
if !(intest.InTest && job.IsCancelling()) {
time.Sleep(GetWaitTimeWhenErrorOccurred())
}
}

return schemaVer, nil
Expand Down
17 changes: 13 additions & 4 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,26 @@ import (
"github.com/pingcap/tidb/types"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
addingDDLJobConcurrent = "/tidb/ddl/add_ddl_job_general"
addingBackfillJob = "/tidb/ddl/add_backfill_job"
addingDDLJobConcurrent = "/tidb/ddl/add_ddl_job_general"
addingBackfillJob = "/tidb/ddl/add_backfill_job"
dispatchLoopWaitingDuration = 1 * time.Second
)

func init() {
// In test the wait duration can be reduced to make test case run faster
if intest.InTest {
dispatchLoopWaitingDuration = 50 * time.Millisecond
}
}

func (dc *ddlCtx) insertRunningDDLJobMap(id int64) {
dc.runningJobs.Lock()
defer dc.runningJobs.Unlock()
Expand Down Expand Up @@ -173,15 +182,15 @@ func (d *ddl) startDispatchLoop() {
if d.etcdCli != nil {
notifyDDLJobByEtcdCh = d.etcdCli.Watch(d.ctx, addingDDLJobConcurrent)
}
ticker := time.NewTicker(1 * time.Second)
ticker := time.NewTicker(dispatchLoopWaitingDuration)
defer ticker.Stop()
for {
if isChanClosed(d.ctx.Done()) {
return
}
if !d.isOwner() || d.waiting.Load() {
d.once.Store(true)
time.Sleep(time.Second)
time.Sleep(dispatchLoopWaitingDuration)
continue
}
select {
Expand Down
131 changes: 76 additions & 55 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ import (
"google.golang.org/grpc/keepalive"
)

var (
mdlCheckLookDuration = 50 * time.Millisecond
)

func init() {
if intest.InTest {
// In test we can set duration lower to make test faster.
mdlCheckLookDuration = 2 * time.Millisecond
}
}

// NewMockDomain is only used for test
func NewMockDomain() *Domain {
do := &Domain{
Expand Down Expand Up @@ -153,6 +164,7 @@ type Domain struct {
sctxs map[sessionctx.Context]bool
}

mdlCheckCh chan struct{}
stopAutoAnalyze atomicutil.Bool
}

Expand Down Expand Up @@ -712,78 +724,81 @@ func (do *Domain) refreshMDLCheckTableInfo() {
}

func (do *Domain) mdlCheckLoop() {
ticker := time.Tick(time.Millisecond * 50)
ticker := time.Tick(mdlCheckLookDuration)
var saveMaxSchemaVersion int64
jobNeedToSync := false
jobCache := make(map[int64]int64, 1000)

for {
// Wait for channels
select {
case <-do.mdlCheckCh:
case <-ticker:
if !variable.EnableMDL.Load() {
continue
}
case <-do.exit:
return
}

do.mdlCheckTableInfo.mu.Lock()
maxVer := do.mdlCheckTableInfo.newestVer
if maxVer > saveMaxSchemaVersion {
saveMaxSchemaVersion = maxVer
} else if !jobNeedToSync {
// Schema doesn't change, and no job to check in the last run.
do.mdlCheckTableInfo.mu.Unlock()
continue
}
if !variable.EnableMDL.Load() {
continue
}

jobNeedToCheckCnt := len(do.mdlCheckTableInfo.jobsVerMap)
if jobNeedToCheckCnt == 0 {
jobNeedToSync = false
do.mdlCheckTableInfo.mu.Unlock()
continue
}
do.mdlCheckTableInfo.mu.Lock()
maxVer := do.mdlCheckTableInfo.newestVer
if maxVer > saveMaxSchemaVersion {
saveMaxSchemaVersion = maxVer
} else if !jobNeedToSync {
// Schema doesn't change, and no job to check in the last run.
do.mdlCheckTableInfo.mu.Unlock()
continue
}

jobsVerMap := make(map[int64]int64, len(do.mdlCheckTableInfo.jobsVerMap))
jobsIdsMap := make(map[int64]string, len(do.mdlCheckTableInfo.jobsIdsMap))
for k, v := range do.mdlCheckTableInfo.jobsVerMap {
jobsVerMap[k] = v
}
for k, v := range do.mdlCheckTableInfo.jobsIdsMap {
jobsIdsMap[k] = v
}
jobNeedToCheckCnt := len(do.mdlCheckTableInfo.jobsVerMap)
if jobNeedToCheckCnt == 0 {
jobNeedToSync = false
do.mdlCheckTableInfo.mu.Unlock()
continue
}

jobNeedToSync = true
jobsVerMap := make(map[int64]int64, len(do.mdlCheckTableInfo.jobsVerMap))
jobsIdsMap := make(map[int64]string, len(do.mdlCheckTableInfo.jobsIdsMap))
for k, v := range do.mdlCheckTableInfo.jobsVerMap {
jobsVerMap[k] = v
}
for k, v := range do.mdlCheckTableInfo.jobsIdsMap {
jobsIdsMap[k] = v
}
do.mdlCheckTableInfo.mu.Unlock()

sm := do.InfoSyncer().GetSessionManager()
if sm == nil {
logutil.BgLogger().Info("session manager is nil")
} else {
sm.CheckOldRunningTxn(jobsVerMap, jobsIdsMap)
}
jobNeedToSync = true

if len(jobsVerMap) == jobNeedToCheckCnt {
jobNeedToSync = false
}
sm := do.InfoSyncer().GetSessionManager()
if sm == nil {
logutil.BgLogger().Info("session manager is nil")
} else {
sm.CheckOldRunningTxn(jobsVerMap, jobsIdsMap)
}

// Try to gc jobCache.
if len(jobCache) > 1000 {
jobCache = make(map[int64]int64, 1000)
}
if len(jobsVerMap) == jobNeedToCheckCnt {
jobNeedToSync = false
}

for jobID, ver := range jobsVerMap {
if cver, ok := jobCache[jobID]; ok && cver >= ver {
// Already update, skip it.
continue
}
logutil.BgLogger().Info("mdl gets lock, update to owner", zap.Int64("jobID", jobID), zap.Int64("version", ver))
err := do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), jobID, ver)
if err != nil {
logutil.BgLogger().Warn("update self version failed", zap.Error(err))
} else {
jobCache[jobID] = ver
}
// Try to gc jobCache.
if len(jobCache) > 1000 {
jobCache = make(map[int64]int64, 1000)
}

for jobID, ver := range jobsVerMap {
if cver, ok := jobCache[jobID]; ok && cver >= ver {
// Already update, skip it.
continue
}
logutil.BgLogger().Info("mdl gets lock, update to owner", zap.Int64("jobID", jobID), zap.Int64("version", ver))
err := do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), jobID, ver)
if err != nil {
logutil.BgLogger().Warn("update self version failed", zap.Error(err))
} else {
jobCache[jobID] = ver
}
case <-do.exit:
return
}
}
}
Expand Down Expand Up @@ -844,6 +859,10 @@ func (do *Domain) loadSchemaInLoop(ctx context.Context, lease time.Duration) {
return
}
do.refreshMDLCheckTableInfo()
select {
case do.mdlCheckCh <- struct{}{}:
default:
}
}
}

Expand Down Expand Up @@ -941,6 +960,7 @@ func (do *Domain) Close() {
do.onClose()
}
gctuner.WaitMemoryLimitTunerExitInTest()
close(do.mdlCheckCh)
logutil.BgLogger().Info("domain closed", zap.Duration("take time", time.Since(startTime)))
}

Expand All @@ -964,6 +984,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
jobsVerMap: make(map[int64]int64),
jobsIdsMap: make(map[int64]string),
},
mdlCheckCh: make(chan struct{}),
}
do.stopAutoAnalyze.Store(false)
do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, config.GetGlobalConfig().TiDBEnableExitCheck)
Expand Down
2 changes: 1 addition & 1 deletion tools/check/ut.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ func buildTestBinaryMulti(pkgs []string) error {
}

var cmd *exec.Cmd
cmd = exec.Command("go", "test", "--exec", xprogPath, "-vet", "off", "-count", "0")
cmd = exec.Command("go", "test", "--tags=intest", "--exec", xprogPath, "-vet", "off", "-count", "0")
if coverprofile != "" {
cmd.Args = append(cmd.Args, "-cover")
}
Expand Down

0 comments on commit 29adb0a

Please sign in to comment.