executor: add some comments and fix typos (#47005)
Rustin170506 authored Sep 15, 2023
1 parent 4c1f28b commit 95a35f6
Showing 3 changed files with 65 additions and 23 deletions.
21 changes: 14 additions & 7 deletions executor/analyze.go
@@ -151,11 +151,12 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
dom := domain.GetDomain(e.Ctx())
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
})

// If dynamic prune mode is enabled, we need to generate global stats for partition tables here.
err = e.handleGlobalStats(ctx, needGlobalStats, globalStatsMap)
if err != nil {
return err
if needGlobalStats {
err = e.handleGlobalStats(ctx, globalStatsMap)
if err != nil {
return err
}
}

// Update analyze options to mysql.analyze_options for auto analyze.
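
The hunk above hoists the needGlobalStats check out of handleGlobalStats and into the caller, a standard Go guard-clause refactor: the callee loses a flag parameter it only used for an early return. A minimal sketch of the pattern, using illustrative names rather than the real TiDB signatures:

```go
package main

import "fmt"

// After the refactor, the callee does its one job unconditionally.
func handleGlobalStats(statsMap map[int64]string) error {
	fmt.Printf("merging global stats for %d entries\n", len(statsMap))
	return nil
}

func main() {
	needGlobalStats := true
	statsMap := map[int64]string{1: "t1", 2: "t2"}
	// The caller now decides whether the work is needed at all.
	if needGlobalStats {
		if err := handleGlobalStats(statsMap); err != nil {
			fmt.Println("error:", err)
		}
	}
}
```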
@@ -310,10 +311,15 @@ func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error {
}

// handleResultsError will handle the errors fetched from resultsCh and record them in the log
func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
func (e *AnalyzeExec) handleResultsError(
ctx context.Context,
concurrency int,
needGlobalStats bool,
globalStatsMap globalStatsMap,
resultsCh <-chan *statistics.AnalyzeResults,
) error {
partitionStatsConcurrency := e.Ctx().GetSessionVars().AnalyzePartitionConcurrency
// If 'partitionStatsConcurrency' > 1, we will try to demand extra session from Domain to save Analyze results in concurrency.
// If partitionStatsConcurrency > 1, we will try to request extra sessions from the Domain to save the analyze results concurrently.
// If there is no extra session available, we will save the analyze results in a single thread.
if partitionStatsConcurrency > 1 {
dom := domain.GetDomain(e.Ctx())
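
The comment above describes a two-tier strategy: borrow extra sessions to save partition results concurrently, and fall back to single-threaded saving when none are free. A minimal sketch of that shape; acquireSessions and the pool size are hypothetical stand-ins, not the TiDB Domain API:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNoSession = errors.New("no extra session available")

// acquireSessions stands in for borrowing sessions from the Domain.
func acquireSessions(n int) ([]int, error) {
	const spare = 2 // pretend the pool has two spare sessions
	if n > spare {
		return nil, errNoSession
	}
	ids := make([]int, n)
	for i := range ids {
		ids[i] = i
	}
	return ids, nil
}

func saveResults(results []string, concurrency int) {
	sessions, err := acquireSessions(concurrency)
	if err != nil {
		// Fallback: save everything on the current session, single-threaded.
		for _, r := range results {
			fmt.Println("saved single-threaded:", r)
		}
		return
	}
	var wg sync.WaitGroup
	for i, r := range results {
		wg.Add(1)
		go func(sess int, res string) {
			defer wg.Done()
			fmt.Printf("saved on session %d: %s\n", sess, res)
		}(sessions[i%len(sessions)], r)
	}
	wg.Wait()
}

func main() {
	saveResults([]string{"p0", "p1", "p2"}, 2)
}
```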
@@ -548,6 +554,7 @@ func FinishAnalyzeMergeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob,
if job == nil || job.ID == nil {
return
}

job.EndTime = time.Now()
var sql string
var args []interface{}
38 changes: 31 additions & 7 deletions executor/analyze_col_v2.go
@@ -134,6 +134,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
e.handleNDVForSpecialIndexes(specialIndexes, idxNDVPushDownCh)
})
defer wg.Wait()

count, hists, topNs, fmSketches, extStats, err := e.buildSamplingStats(ranges, collExtStats, specialIndexesOffsets, idxNDVPushDownCh)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
@@ -159,6 +160,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes
TopNs: topNs[:cLen],
Fms: fmSketches[:cLen],
}

return &statistics.AnalyzeResults{
TableID: e.tableID,
Ars: []*statistics.AnalyzeResult{colResult, colGroupResult},
@@ -295,6 +297,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
}
oldRootCollectorSize := rootRowCollector.Base().MemSize
oldRootCollectorCount := rootRowCollector.Base().Count
// Merge the result from sub-collectors.
rootRowCollector.MergeCollector(mergeResult.collector)
newRootCollectorCount := rootRowCollector.Base().Count
printAnalyzeMergeCollectorLog(oldRootCollectorCount, newRootCollectorCount,
@@ -307,7 +310,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
return 0, nil, nil, nil, nil, err
}

// Handling virtual columns.
// Decode the data from sample collectors.
virtualColIdx := buildVirtualColumnIndex(e.schemaForVirtualColEval, e.colsInfo)
if len(virtualColIdx) > 0 {
fieldTps := make([]*types.FieldType, 0, len(virtualColIdx))
@@ -330,16 +333,14 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
}
}

// Calculate handle from the row data for each row. It will be used to sort the samples.
for _, sample := range rootRowCollector.Base().Samples {
// Calculate handle from the row data for each row. It will be used to sort the samples.
sample.Handle, err = e.handleCols.BuildHandleByDatums(sample.Columns)
if err != nil {
return 0, nil, nil, nil, nil, err
}
}

colLen := len(e.colsInfo)

// The order of the samples is broken when merging samples from sub-collectors.
// So we need to sort the samples by handle in order to calculate the correlation.
sort.Slice(rootRowCollector.Base().Samples, func(i, j int) bool {
@@ -359,11 +360,14 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
sampleCollectors := make([]*statistics.SampleCollector, len(e.colsInfo))
exitCh := make(chan struct{})
e.samplingBuilderWg.Add(statsConcurrency)

// Start workers to build stats.
for i := 0; i < statsConcurrency; i++ {
e.samplingBuilderWg.Run(func() {
e.subBuildWorker(buildResultChan, buildTaskChan, hists, topns, sampleCollectors, exitCh)
})
}
// Generate tasks for building stats.
for i, col := range e.colsInfo {
buildTaskChan <- &samplingBuildTask{
id: col.ID,
@@ -387,7 +391,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
rootRowCollector.Base().FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
}

// build index stats
// Generate tasks for building stats for indexes.
for i, idx := range e.indexes {
buildTaskChan <- &samplingBuildTask{
id: idx.ID,
@@ -399,6 +403,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[colLen+i])
}
close(buildTaskChan)

panicCnt := 0
for panicCnt < statsConcurrency {
err1, ok := <-buildResultChan
@@ -425,6 +430,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
if err != nil {
return 0, nil, nil, nil, nil, err
}

count = rootRowCollector.Base().Count
if needExtStats {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
@@ -433,6 +439,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(
return 0, nil, nil, nil, nil, err
}
}

return
}

@@ -589,7 +596,8 @@ func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(indexInfos []*mod
}

func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResult, taskCh <-chan []byte, l int, index int) {
isClosedChanThread := index == 0
// Only close the resultCh in the first worker.
closeTheResultCh := index == 0
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
@@ -604,7 +612,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu
}
}
e.samplingMergeWg.Done()
if isClosedChanThread {
if closeTheResultCh {
e.samplingMergeWg.Wait()
close(resultCh)
}
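
The rename from isClosedChanThread to closeTheResultCh makes the convention easier to see: every merge worker calls Done on the shared WaitGroup, and only the worker with index 0 then waits for the whole group and closes the result channel, so the receiver gets end-of-stream exactly once. A self-contained sketch of that pattern (toy payloads, not the TiDB collector types):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 4
	resultCh := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(index int) {
			defer func() {
				// Every worker marks itself done first...
				wg.Done()
				// ...and only worker 0 waits for the rest and closes
				// the shared channel, signalling end-of-stream once.
				if closeTheResultCh := index == 0; closeTheResultCh {
					wg.Wait()
					close(resultCh)
				}
			}()
			resultCh <- index * index
		}(i)
	}
	for r := range resultCh {
		fmt.Println("merged result:", r)
	}
}
```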
@@ -628,30 +636,40 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu
if !ok {
break
}

// Unmarshal the data.
dataSize := int64(cap(data))
colResp := &tipb.AnalyzeColumnsResp{}
err := colResp.Unmarshal(data)
if err != nil {
resultCh <- &samplingMergeResult{err: err}
return
}
// Consume the memory of the data.
colRespSize := int64(colResp.Size())
e.memTracker.Consume(colRespSize)

// Update processed rows.
subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
subCollector.Base().FromProto(colResp.RowCollector, e.memTracker)
UpdateAnalyzeJob(e.ctx, e.job, subCollector.Base().Count)

// Print collect log.
oldRetCollectorSize := retCollector.Base().MemSize
oldRetCollectorCount := retCollector.Base().Count
retCollector.MergeCollector(subCollector)
newRetCollectorCount := retCollector.Base().Count
printAnalyzeMergeCollectorLog(oldRetCollectorCount, newRetCollectorCount, subCollector.Base().Count,
e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(),
"merge subCollector in concurrency in AnalyzeColumnsExecV2", index)

// Consume the memory of the result.
newRetCollectorSize := retCollector.Base().MemSize
subCollectorSize := subCollector.Base().MemSize
e.memTracker.Consume(newRetCollectorSize - oldRetCollectorSize - subCollectorSize)
e.memTracker.Release(dataSize + colRespSize)
}

resultCh <- &samplingMergeResult{collector: retCollector}
}
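
The Consume/Release pairs in this loop are manual memory accounting: the worker charges the tracker when it takes ownership of a buffer (the raw response, the unmarshalled message, the grown merged collector) and releases the charge once that memory is folded into the merged result or dropped. A toy tracker showing the same charge-then-release discipline; the tracker type here is a stand-in, not TiDB's memory.Tracker:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tracker is a toy memory accountant with the Consume/Release shape.
type tracker struct{ bytes int64 }

func (t *tracker) Consume(n int64) { atomic.AddInt64(&t.bytes, n) }
func (t *tracker) Release(n int64) { atomic.AddInt64(&t.bytes, -n) }

func main() {
	var mem tracker

	data := make([]byte, 1024) // raw response buffer arrives
	mem.Consume(int64(cap(data)))

	merged := append([]byte(nil), data...) // fold it into the merged state
	mem.Consume(int64(cap(merged)))

	// The raw buffer is no longer referenced, so release its charge.
	mem.Release(int64(cap(data)))

	fmt.Println("bytes still tracked:", atomic.LoadInt64(&mem.bytes))
}
```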

@@ -666,11 +684,13 @@ func (e *AnalyzeColumnsExecV2) subBuildWorker(resultCh chan error, taskCh chan *
failpoint.Inject("mockAnalyzeSamplingBuildWorkerPanic", func() {
panic("failpoint triggered")
})

colLen := len(e.colsInfo)
bufferedMemSize := int64(0)
bufferedReleaseSize := int64(0)
defer e.memTracker.Consume(bufferedMemSize)
defer e.memTracker.Release(bufferedReleaseSize)

workLoop:
for {
select {
@@ -829,6 +849,7 @@ type samplingBuildTask struct {
}

func readDataAndSendTask(ctx sessionctx.Context, handler *tableResultHandler, mergeTaskCh chan []byte, memTracker *memory.Tracker) error {
// After all tasks are sent, close the mergeTaskCh to notify the mergeWorker that all tasks have been sent.
defer close(mergeTaskCh)
for {
failpoint.Inject("mockKillRunningV2AnalyzeJob", func() {
@@ -841,15 +862,18 @@ func readDataAndSendTask(ctx sessionctx.Context, handler *tableResultHandler, me
failpoint.Inject("mockSlowAnalyzeV2", func() {
time.Sleep(1000 * time.Second)
})

data, err := handler.nextRaw(context.TODO())
if err != nil {
return errors.Trace(err)
}
if data == nil {
break
}

memTracker.Consume(int64(cap(data)))
mergeTaskCh <- data
}

return nil
}
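
readDataAndSendTask is the producer side of the merge pipeline, and the new comment spells out the ownership rule it follows: the goroutine that sends on a channel is the one that closes it, so the merge workers' receive loops terminate naturally once the data runs out. A minimal sketch of that pattern; the byte-slice source is a hypothetical stand-in for handler.nextRaw:

```go
package main

import "fmt"

// produce owns taskCh: it sends every chunk, then closes the channel
// so that receivers fall out of their range loops.
func produce(chunks [][]byte, taskCh chan<- []byte) {
	defer close(taskCh)
	for _, data := range chunks {
		if data == nil { // nil marks end of input, like nextRaw returning nil
			return
		}
		taskCh <- data
	}
}

func main() {
	taskCh := make(chan []byte)
	go produce([][]byte{[]byte("chunk-a"), []byte("chunk-b"), nil}, taskCh)
	for data := range taskCh {
		fmt.Println("merge task:", string(data))
	}
}
```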
29 changes: 20 additions & 9 deletions executor/analyze_global_stats.go
@@ -45,40 +45,47 @@ type globalStatsInfo struct {
// The value in the map carries the additional information needed to build global-level stats.
type globalStatsMap map[globalStatsKey]globalStatsInfo

func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats bool, globalStatsMap globalStatsMap) error {
if !needGlobalStats {
return nil
}
globalStatsTableIDs := make(map[int64]struct{})
func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap globalStatsMap) error {
globalStatsTableIDs := make(map[int64]struct{}, len(globalStatsMap))
for globalStatsID := range globalStatsMap {
globalStatsTableIDs[globalStatsID.tableID] = struct{}{}
}

statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
tableIDs := map[int64]struct{}{}
tableIDs := make(map[int64]struct{}, len(globalStatsTableIDs))

for tableID := range globalStatsTableIDs {
tableIDs[tableID] = struct{}{}
tableAllPartitionStats := make(map[int64]*statistics.Table)

for globalStatsID, info := range globalStatsMap {
if globalStatsID.tableID != tableID {
continue
}

job := e.newAnalyzeHandleGlobalStatsJob(globalStatsID)
if job == nil {
logutil.BgLogger().Warn("cannot find the partitioned table, skip merging global stats", zap.Int64("tableID", globalStatsID.tableID))
continue
}
AddNewAnalyzeJob(e.Ctx(), job)
StartAnalyzeJob(e.Ctx(), job)

mergeStatsErr := func() error {
globalOpts := e.opts
if e.OptionsMap != nil {
if v2Options, ok := e.OptionsMap[globalStatsID.tableID]; ok {
globalOpts = v2Options.FilledOpts
}
}
globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(e.Ctx(), globalOpts, e.Ctx().GetInfoSchema().(infoschema.InfoSchema),
globalStatsID.tableID, info.isIndex, info.histIDs,
tableAllPartitionStats)

globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(
e.Ctx(),
globalOpts, e.Ctx().GetInfoSchema().(infoschema.InfoSchema),
globalStatsID.tableID,
info.isIndex, info.histIDs,
tableAllPartitionStats,
)
if err != nil {
logutil.BgLogger().Warn("merge global stats failed",
zap.String("info", job.JobInfo), zap.Error(err), zap.Int64("tableID", tableID))
@@ -88,6 +95,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
}
return err
}
// Dump global-level stats to kv.
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
if hg == nil {
@@ -114,15 +122,18 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
}
return err
}()

FinishAnalyzeMergeJob(e.Ctx(), job, mergeStatsErr)
}
}

for tableID := range tableIDs {
// Dump stats to historical storage.
if err := recordHistoricalStats(e.Ctx(), tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}

return nil
}
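
At a high level, handleGlobalStats walks every (table, column-or-index) entry produced by the per-partition analyze jobs, merges the partition-level statistics into one global object, and dumps it to storage. A highly simplified sketch of that merge loop with toy types; the real code merges histograms, CM sketches, and TopN rather than plain counts:

```go
package main

import "fmt"

// statsKey identifies one histogram of one table, loosely mirroring
// the (tableID, histID) part of globalStatsKey.
type statsKey struct {
	tableID int64
	histID  int64
}

// mergeGlobal folds per-partition row counts into one global count
// per key. The real merge combines histograms, not just counts.
func mergeGlobal(parts map[statsKey][]int64) map[statsKey]int64 {
	global := make(map[statsKey]int64, len(parts))
	for key, counts := range parts {
		var total int64
		for _, c := range counts {
			total += c
		}
		global[key] = total
	}
	return global
}

func main() {
	parts := map[statsKey][]int64{
		{tableID: 1, histID: 1}: {100, 250, 50},
		{tableID: 1, histID: 2}: {80, 300, 20},
	}
	fmt.Println(mergeGlobal(parts)) // e.g. map[{1 1}:400 {1 2}:400]
}
```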

