Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: let background process unify wg.Run #40640

Merged
merged 15 commits into from
Jan 18, 2023
107 changes: 40 additions & 67 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,6 @@ func (do *Domain) topNSlowQueryLoop() {
ticker := time.NewTicker(time.Minute * 10)
defer func() {
ticker.Stop()
do.wg.Done()
logutil.BgLogger().Info("topNSlowQueryLoop exited.")
}()
for {
Expand Down Expand Up @@ -583,7 +582,6 @@ func (do *Domain) topNSlowQueryLoop() {

func (do *Domain) infoSyncerKeeper() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("infoSyncerKeeper exited.")
util.Recover(metrics.LabelDomain, "infoSyncerKeeper", nil, false)
}()
Expand All @@ -608,7 +606,6 @@ func (do *Domain) infoSyncerKeeper() {

func (do *Domain) globalConfigSyncerKeeper() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("globalConfigSyncerKeeper exited.")
util.Recover(metrics.LabelDomain, "globalConfigSyncerKeeper", nil, false)
}()
Expand All @@ -631,7 +628,6 @@ func (do *Domain) topologySyncerKeeper() {
ticker := time.NewTicker(infosync.TopologyTimeToRefresh)
defer func() {
ticker.Stop()
do.wg.Done()
logutil.BgLogger().Info("topologySyncerKeeper exited.")
}()

Expand Down Expand Up @@ -766,7 +762,6 @@ func (do *Domain) loadSchemaInLoop(ctx context.Context, lease time.Duration) {
ticker := time.NewTicker(lease / 2)
defer func() {
ticker.Stop()
do.wg.Done()
logutil.BgLogger().Info("loadSchemaInLoop exited.")
}()
syncer := do.ddl.SchemaSyncer()
Expand Down Expand Up @@ -1062,22 +1057,22 @@ func (do *Domain) Init(
// Only when the store is local that the lease value is 0.
// If the store is local, it doesn't need loadSchemaInLoop.
if ddlLease > 0 {
do.wg.Add(1)
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ctx, ddlLease)
do.wg.Run(func() {
do.loadSchemaInLoop(ctx, ddlLease)
}, "loadSchemaInLoop")
}
do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop")
do.wg.Add(3)
go do.topNSlowQueryLoop()
go do.infoSyncerKeeper()
go do.globalConfigSyncerKeeper()
do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop")
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper")
if !skipRegisterToDashboard {
do.wg.Add(1)
go do.topologySyncerKeeper()
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
}
if pdClient != nil {
do.wg.Add(1)
go do.closestReplicaReadCheckLoop(ctx, pdClient)
do.wg.Run(func() {
do.closestReplicaReadCheckLoop(ctx, pdClient)
}, "closestReplicaReadCheckLoop")
}
err = do.initLogBackup(ctx, pdClient)
if err != nil {
Expand Down Expand Up @@ -1125,7 +1120,6 @@ func (do *Domain) closestReplicaReadCheckLoop(ctx context.Context, pdClient pd.C
ticker := time.NewTicker(time.Minute)
defer func() {
ticker.Stop()
do.wg.Done()
logutil.BgLogger().Info("closestReplicaReadCheckLoop exited.")
}()
for {
Expand Down Expand Up @@ -1347,10 +1341,8 @@ func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
duration = 10 * time.Minute
}

do.wg.Add(1)
go func() {
do.wg.Run(func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("loadPrivilegeInLoop exited.")
util.Recover(metrics.LabelDomain, "loadPrivilegeInLoop", nil, false)
}()
Expand Down Expand Up @@ -1380,7 +1372,7 @@ func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
logutil.BgLogger().Error("load privilege failed", zap.Error(err))
}
}
}()
}, "loadPrivilegeInLoop")
return nil
}

Expand All @@ -1397,10 +1389,9 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
if do.etcdClient != nil {
watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey)
}
do.wg.Add(1)
go func() {

do.wg.Run(func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("LoadSysVarCacheLoop exited.")
util.Recover(metrics.LabelDomain, "LoadSysVarCacheLoop", nil, false)
}()
Expand Down Expand Up @@ -1443,7 +1434,7 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
logutil.BgLogger().Error("LoadSysVarCacheLoop failed", zap.Error(err))
}
}
}()
}, "LoadSysVarCacheLoop")
return nil
}

Expand All @@ -1456,11 +1447,9 @@ func (do *Domain) WatchTiFlashComputeNodeChange() error {
if do.etcdClient != nil {
watchCh = do.etcdClient.Watch(context.Background(), tiflashComputeNodeKey)
}
do.wg.Add(1)
duration := 10 * time.Second
go func() {
do.wg.Run(func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("WatchTiFlashComputeNodeChange exit")
util.Recover(metrics.LabelDomain, "WatchTiFlashComputeNodeChange", nil, false)
}()
Expand Down Expand Up @@ -1501,7 +1490,7 @@ func (do *Domain) WatchTiFlashComputeNodeChange() error {
return
}
}
}()
}, "WatchTiFlashComputeNodeChange")
return nil
}

Expand Down Expand Up @@ -1536,10 +1525,8 @@ func (do *Domain) LoadBindInfoLoop(ctxForHandle sessionctx.Context, ctxForEvolve
}

func (do *Domain) globalBindHandleWorkerLoop(owner owner.Manager) {
do.wg.Add(1)
go func() {
do.wg.Run(func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("globalBindHandleWorkerLoop exited.")
util.Recover(metrics.LabelDomain, "globalBindHandleWorkerLoop", nil, false)
}()
Expand Down Expand Up @@ -1576,14 +1563,12 @@ func (do *Domain) globalBindHandleWorkerLoop(owner owner.Manager) {
}
}
}
}()
}, "globalBindHandleWorkerLoop")
}

func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context, owner owner.Manager) {
do.wg.Add(1)
go func() {
do.wg.Run(func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("handleEvolvePlanTasksLoop exited.")
util.Recover(metrics.LabelDomain, "handleEvolvePlanTasksLoop", nil, false)
}()
Expand All @@ -1601,7 +1586,7 @@ func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context, owner owner.
}
}
}
}()
}, "handleEvolvePlanTasksLoop")
}

// TelemetryReportLoop create a goroutine that reports usage data in a loop, it should be called only once
Expand All @@ -1613,10 +1598,8 @@ func (do *Domain) TelemetryReportLoop(ctx sessionctx.Context) {
logutil.BgLogger().Warn("Initial telemetry run failed", zap.Error(err))
}

do.wg.Add(1)
go func() {
do.wg.Run(func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("TelemetryReportLoop exited.")
util.Recover(metrics.LabelDomain, "TelemetryReportLoop", nil, false)
}()
Expand All @@ -1637,16 +1620,14 @@ func (do *Domain) TelemetryReportLoop(ctx sessionctx.Context) {
}
}
}
}()
}, "TelemetryReportLoop")
}

// TelemetryRotateSubWindowLoop create a goroutine that rotates the telemetry window regularly.
func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
ctx.GetSessionVars().InRestrictedSQL = true
do.wg.Add(1)
go func() {
do.wg.Run(func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("TelemetryRotateSubWindowLoop exited.")
util.Recover(metrics.LabelDomain, "TelemetryRotateSubWindowLoop", nil, false)
}()
Expand All @@ -1658,7 +1639,7 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
telemetry.RotateSubWindow()
}
}
}()
}, "TelemetryRotateSubWindowLoop")
}

// SetupPlanReplayerHandle setup plan replayer handle
Expand Down Expand Up @@ -1727,13 +1708,11 @@ func (do *Domain) StartPlanReplayerHandle() {
if lease < 1 {
return
}
do.wg.Add(2)
go func() {
do.wg.Run(func() {
logutil.BgLogger().Info("PlanReplayerTaskCollectHandle started")
tikcer := time.NewTicker(time.Duration(lease))
defer func() {
tikcer.Stop()
do.wg.Done()
util.Recover(metrics.LabelDomain, "PlanReplayerTaskCollectHandle", nil, false)
logutil.BgLogger().Info("PlanReplayerTaskCollectHandle exited.")
}()
Expand All @@ -1748,11 +1727,11 @@ func (do *Domain) StartPlanReplayerHandle() {
}
}
}
}()
go func() {
logutil.BgLogger().Info("PlanReplayerTaskCollectHandle started")
}, "PlanReplayerTaskCollectHandle")

do.wg.Run(func() {
logutil.BgLogger().Info("PlanReplayerTaskDumpHandle started")
defer func() {
do.wg.Done()
util.Recover(metrics.LabelDomain, "PlanReplayerTaskDumpHandle", nil, false)
logutil.BgLogger().Info("PlanReplayerTaskDumpHandle exited.")
}()
Expand All @@ -1761,7 +1740,7 @@ func (do *Domain) StartPlanReplayerHandle() {
}
<-do.exit
do.planReplayerHandle.planReplayerTaskDumpHandle.Close()
}()
}, "PlanReplayerTaskDumpHandle")
}

// GetPlanReplayerHandle returns plan replayer handle
Expand All @@ -1771,13 +1750,11 @@ func (do *Domain) GetPlanReplayerHandle() *planReplayerHandle {

// DumpFileGcCheckerLoop creates a goroutine that handles `exit` and `gc`.
func (do *Domain) DumpFileGcCheckerLoop() {
do.wg.Add(1)
go func() {
do.wg.Run(func() {
logutil.BgLogger().Info("dumpFileGcChecker started")
gcTicker := time.NewTicker(do.dumpFileGcChecker.gcLease)
defer func() {
gcTicker.Stop()
do.wg.Done()
util.Recover(metrics.LabelDomain, "dumpFileGcCheckerLoop", nil, false)
logutil.BgLogger().Info("dumpFileGcChecker exited.")
}()
Expand All @@ -1789,7 +1766,7 @@ func (do *Domain) DumpFileGcCheckerLoop() {
do.dumpFileGcChecker.gcDumpFiles(time.Hour)
}
}
}()
}, "dumpFileGcChecker")
}

// GetHistoricalStatsWorker gets historical workers
Expand All @@ -1805,11 +1782,9 @@ func (do *Domain) StartHistoricalStatsWorker() {
if !enableDumpHistoricalStats.Load() {
return
}
do.wg.Add(1)
go func() {
do.wg.Run(func() {
logutil.BgLogger().Info("HistoricalStatsWorker started")
defer func() {
do.wg.Done()
util.Recover(metrics.LabelDomain, "HistoricalStatsWorkerLoop", nil, false)
logutil.BgLogger().Info("HistoricalStatsWorker exited.")
}()
Expand All @@ -1828,7 +1803,7 @@ func (do *Domain) StartHistoricalStatsWorker() {
}
}
}
}()
}, "HistoricalStatsWorker")
}

// StatsHandle returns the statistic handle.
Expand Down Expand Up @@ -1926,8 +1901,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
}
owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
if do.indexUsageSyncLease > 0 {
do.wg.Add(1)
go do.syncIndexUsageWorker(owner)
do.wg.Run(func() {
do.syncIndexUsageWorker(owner)
}, "syncIndexUsageWorker")
}
if do.statsLease <= 0 {
return nil
Expand Down Expand Up @@ -2016,7 +1992,6 @@ func (do *Domain) syncIndexUsageWorker(owner owner.Manager) {
handle := do.StatsHandle()
defer func() {
idxUsageSyncTicker.Stop()
do.wg.Done()
logutil.BgLogger().Info("syncIndexUsageWorker exited.")
}()
for {
Expand Down Expand Up @@ -2261,10 +2236,8 @@ func (do *Domain) LoadSigningCertLoop(signingCert, signingKey string) {
sessionstates.SetCertPath(signingCert)
sessionstates.SetKeyPath(signingKey)

do.wg.Add(1)
go func() {
do.wg.Run(func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Debug("loadSigningCertLoop exited.")
util.Recover(metrics.LabelDomain, "LoadSigningCertLoop", nil, false)
}()
Expand All @@ -2276,7 +2249,7 @@ func (do *Domain) LoadSigningCertLoop(signingCert, signingKey string) {
return
}
}
}()
}, "loadSigningCertLoop")
}

// ServerID gets serverID.
Expand Down