Skip to content

Commit

Permalink
executor: reuse goroutine in the analyze (pingcap#47637)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Nov 3, 2023
1 parent 0172ba0 commit 5a22d56
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 17 deletions.
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ go_library(
"@com_github_prometheus_client_golang//api/prometheus/v1:prometheus",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_common//model",
"@com_github_tiancaiamao_gp//:gp",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
8 changes: 5 additions & 3 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -56,9 +57,10 @@ var _ exec.Executor = &AnalyzeExec{}
type AnalyzeExec struct {
exec.BaseExecutor
tasks []*analyzeTask
wg util.WaitGroupWrapper
wg *util.WaitGroupPool
opts map[ast.AnalyzeOptionType]uint64
OptionsMap map[int64]core.V2AnalyzeOptions
gp *gp.Pool
}

var (
Expand Down Expand Up @@ -418,7 +420,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
partitionStatsConcurrency := len(subSctxs)

var wg util.WaitGroupWrapper
wg := util.NewWaitGroupPool(e.gp)
saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency)
errCh := make(chan error, partitionStatsConcurrency)
for i := 0; i < partitionStatsConcurrency; i++ {
Expand Down Expand Up @@ -494,7 +496,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
StartAnalyzeJob(e.Ctx(), task.job)
switch task.taskType {
case colTask:
resultsCh <- analyzeColumnsPushDownEntry(task.colExec)
resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec)
case idxTask:
resultsCh <- analyzeIndexPushdown(task.idxExec)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
)

// AnalyzeColumnsExec represents Analyze columns push down executor.
Expand All @@ -61,9 +62,9 @@ type AnalyzeColumnsExec struct {
memTracker *memory.Tracker
}

func analyzeColumnsPushDownEntry(e *AnalyzeColumnsExec) *statistics.AnalyzeResults {
func analyzeColumnsPushDownEntry(gp *gp.Pool, e *AnalyzeColumnsExec) *statistics.AnalyzeResults {
if e.AnalyzeInfo.StatsVersion >= statistics.Version2 {
return e.toV2().analyzeColumnsPushDownWithRetryV2()
return e.toV2().analyzeColumnsPushDownWithRetryV2(gp)
}
return e.toV1().analyzeColumnsPushDownV1()
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -53,8 +54,8 @@ type AnalyzeColumnsExecV2 struct {
*AnalyzeColumnsExec
}

func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2() *statistics.AnalyzeResults {
analyzeResult := e.analyzeColumnsPushDownV2()
func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2(gp *gp.Pool) *statistics.AnalyzeResults {
analyzeResult := e.analyzeColumnsPushDownV2(gp)
if e.notRetryable(analyzeResult) {
return analyzeResult
}
Expand Down Expand Up @@ -84,7 +85,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownWithRetryV2() *statistics.A
prepareV2AnalyzeJobInfo(e.AnalyzeColumnsExec, true)
AddNewAnalyzeJob(e.ctx, e.job)
StartAnalyzeJob(e.ctx, e.job)
return e.analyzeColumnsPushDownV2()
return e.analyzeColumnsPushDownV2(gp)
}

// Do **not** retry if succeed / not oom error / not auto-analyze / samplerate not set.
Expand All @@ -94,7 +95,7 @@ func (e *AnalyzeColumnsExecV2) notRetryable(analyzeResult *statistics.AnalyzeRes
e.analyzePB.ColReq == nil || *e.analyzePB.ColReq.SampleRate <= 0
}

func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeResults {
func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2(gp *gp.Pool) *statistics.AnalyzeResults {
var ranges []*ranger.Range
if hc := e.handleCols; hc != nil {
if hc.IsInt() {
Expand Down Expand Up @@ -139,13 +140,13 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
// subIndexWorkerWg is better to be initialized in handleNDVForSpecialIndexes, however if we do so, golang would
// report unexpected/unreasonable data race error on subIndexWorkerWg when running TestAnalyzeVirtualCol test
// case with `-race` flag now.
var wg util.WaitGroupWrapper
wg := util.NewWaitGroupPool(gp)
wg.Run(func() {
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh, statsConcurrncy)
})
defer wg.Wait()

count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(gp, ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
Expand Down Expand Up @@ -243,6 +244,7 @@ func printAnalyzeMergeCollectorLog(oldRootCount, newRootCount, subCount, tableID
}

func (e *AnalyzeColumnsExecV2) buildSamplingStats(
gp *gp.Pool,
ranges []*ranger.Range,
needExtStats bool,
indexesWithVirtualColOffsets []int,
Expand Down Expand Up @@ -290,7 +292,10 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
e.samplingMergeWg = &util.WaitGroupWrapper{}
e.samplingMergeWg.Add(samplingStatsConcurrency)
for i := 0; i < samplingStatsConcurrency; i++ {
go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i)
id := i
gp.Go(func() {
e.subMergeWorker(mergeResultCh, mergeTaskCh, l, id)
})
}
// Merge the result from collectors.
mergeWorkerPanicCnt := 0
Expand Down Expand Up @@ -386,7 +391,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
if totalLen < samplingStatsConcurrency {
samplingStatsConcurrency = totalLen
}
e.samplingBuilderWg = newNotifyErrorWaitGroupWrapper(buildResultChan)
e.samplingBuilderWg = newNotifyErrorWaitGroupWrapper(gp, buildResultChan)
sampleCollectors := make([]*statistics.SampleCollector, len(e.colsInfo))
exitCh := make(chan struct{})
e.samplingBuilderWg.Add(samplingStatsConcurrency)
Expand Down
11 changes: 7 additions & 4 deletions pkg/executor/analyze_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/util"
"github.com/tiancaiamao/gp"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -102,16 +104,17 @@ func (w *analyzeResultsNotifyWaitGroupWrapper) Run(exec func()) {
// notifyErrorWaitGroupWrapper is a wrapper for sync.WaitGroup
// Please call `Add` with the total goroutine count up front to avoid exiting in advance.
type notifyErrorWaitGroupWrapper struct {
sync.WaitGroup
*util.WaitGroupPool
notify chan error
cnt atomic.Uint64
}

// newNotifyErrorWaitGroupWrapper is to create notifyErrorWaitGroupWrapper
func newNotifyErrorWaitGroupWrapper(notify chan error) *notifyErrorWaitGroupWrapper {
func newNotifyErrorWaitGroupWrapper(gp *gp.Pool, notify chan error) *notifyErrorWaitGroupWrapper {
return &notifyErrorWaitGroupWrapper{
notify: notify,
cnt: *atomic.NewUint64(0),
WaitGroupPool: util.NewWaitGroupPool(gp),
notify: notify,
cnt: *atomic.NewUint64(0),
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2895,11 +2895,14 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(
}

func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
gp := domain.GetDomain(b.ctx).StatsHandle().GPool()
e := &AnalyzeExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
opts: v.Opts,
OptionsMap: v.OptionsMap,
wg: util.NewWaitGroupPool(gp),
gp: gp,
}
autoAnalyze := ""
if b.ctx.GetSessionVars().InRestrictedSQL {
Expand Down
1 change: 1 addition & 0 deletions pkg/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tiancaiamao_gp//:gp",
"@com_github_tikv_client_go_v2//oracle",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
Expand Down
1 change: 1 addition & 0 deletions pkg/util/topsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
],
embed = [":topsql"],
flaky = True,
shard_count = 5,
deps = [
"//pkg/config",
"//pkg/parser",
Expand Down
1 change: 1 addition & 0 deletions pkg/util/topsql/reporter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_test(
],
embed = [":reporter"],
flaky = True,
shard_count = 36,
deps = [
"//pkg/config",
"//pkg/testkit/testsetup",
Expand Down
1 change: 1 addition & 0 deletions pkg/util/topsql/stmtstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
],
embed = [":stmtstats"],
flaky = True,
shard_count = 11,
deps = [
"//pkg/testkit/testsetup",
"//pkg/util/topsql/state",
Expand Down
25 changes: 25 additions & 0 deletions pkg/util/wait_group_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -172,3 +173,27 @@ func (w *WaitGroupWrapper) RunWithRecover(exec func(), recoverFn func(r interfac
exec()
}()
}

// WaitGroupPool is a wrapper for sync.WaitGroup that supports running tasks on a goroutine pool.
type WaitGroupPool struct {
// The embedded WaitGroup exposes Add, Done and Wait directly on WaitGroupPool.
sync.WaitGroup
// gp is the pool that Run submits its tasks to.
gp *gp.Pool
}

// NewWaitGroupPool returns a WaitGroupPool whose Run method submits tasks to gp.
func NewWaitGroupPool(gp *gp.Pool) *WaitGroupPool {
	return &WaitGroupPool{gp: gp}
}

// Run adds 1 to the WaitGroup, submits exec to the goroutine pool, and
// calls Done once exec returns. Please DO NOT panic inside exec.
func (w *WaitGroupPool) Run(exec func()) {
	w.Add(1)
	// Wrap exec so the counter is released when the task finishes.
	task := func() {
		defer w.Done()
		exec()
	}
	w.gp.Go(task)
}
1 change: 1 addition & 0 deletions tools/tazel/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ func skipShardCount(path string) bool {
(strings.HasPrefix(path, "pkg/util") &&
!strings.HasPrefix(path, "pkg/util/admin") &&
!strings.HasPrefix(path, "pkg/util/chunk") &&
!strings.HasPrefix(path, "pkg/util/topsql") &&
!strings.HasPrefix(path, "pkg/util/stmtsummary"))
}

0 comments on commit 5a22d56

Please sign in to comment.