Skip to content

Commit

Permalink
statistics: use goroutine pool to improve performance (#46266)
Browse files Browse the repository at this point in the history
close #46267
  • Loading branch information
hawkingrei authored Aug 21, 2023
1 parent 94cfa8b commit 9411e82
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
1 change: 1 addition & 0 deletions statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tiancaiamao_gp//:gp",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],
Expand Down
7 changes: 5 additions & 2 deletions statistics/cmsketch_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
"github.com/tiancaiamao/gp"
)

// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/statistics
Expand Down Expand Up @@ -131,15 +132,17 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
} else if batchSize > handle.MaxPartitionMergeBatchSize {
batchSize = handle.MaxPartitionMergeBatchSize
}
gpool := gp.New(mergeConcurrency, 5*time.Minute)
defer gpool.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = handle.MergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
_, _, _, _ = handle.MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
}
}

var benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000, 10000000}
var benchmarkConcurrencySizes = []int{100, 1000, 10000, 100000, 1000000, 10000000, 100000000}
var benchmarkConcurrencySizes = []int{100, 1000, 10000, 100000}

func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
for _, size := range benchmarkSizes {
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tiancaiamao_gp//:gp",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
22 changes: 15 additions & 7 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"slices"
"strconv"
"strings"
Expand All @@ -41,12 +42,12 @@ import (
handle_metrics "github.com/pingcap/tidb/statistics/handle/metrics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/syncutil"
"github.com/tiancaiamao/gp"
"github.com/tikv/client-go/v2/oracle"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
Expand All @@ -62,6 +63,9 @@ const (

// Handle can update stats info periodically.
type Handle struct {
// gpool is a goroutine pool used to reuse goroutines in mergeGlobalStatsTopN.
gpool *gp.Pool

pool sessionPool

// initStatsCtx is the ctx only used for initStats
Expand Down Expand Up @@ -483,6 +487,7 @@ type sessionPool interface {
func NewHandle(ctx, initStatsCtx sessionctx.Context, lease time.Duration, pool sessionPool, tracker sessionctx.SysProcTracker, autoAnalyzeProcIDGetter func() uint64) (*Handle, error) {
cfg := config.GetGlobalConfig()
handle := &Handle{
gpool: gp.New(math.MaxInt16, time.Minute),
ddlEventCh: make(chan *ddlUtil.Event, 1000),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
Expand Down Expand Up @@ -857,7 +862,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], popedTopN, allHg[i], err = mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
globalStats.TopN[i], popedTopN, allHg[i], err = mergeGlobalStatsTopN(h.gpool, sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
if err != nil {
return
}
Expand Down Expand Up @@ -889,7 +894,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
return
}

func mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
Expand All @@ -904,14 +909,14 @@ func mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrappe
} else if batchSize > MaxPartitionMergeBatchSize {
batchSize = MaxPartitionMergeBatchSize
}
return MergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killed)
return MergeGlobalStatsTopNByConcurrency(gp, mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killed)
}

// MergeGlobalStatsTopNByConcurrency merges the partition topN concurrently.
// To merge the global stats topN concurrently, we split the partition topN into batches and handle them with different workers.
// mergeConcurrency is used to control the total concurrency of the running workers, and mergeBatchSize is used to control
// the partition batch size that each worker processes.
func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool, killed *uint32) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
if len(wrapper.AllTopN) < mergeConcurrency {
Expand All @@ -927,13 +932,15 @@ func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wra
tasks = append(tasks, task)
start = end
}
var wg util.WaitGroupWrapper
var wg sync.WaitGroup
taskNum := len(tasks)
taskCh := make(chan *statistics.TopnStatsMergeTask, taskNum)
respCh := make(chan *statistics.TopnStatsMergeResponse, taskNum)
for i := 0; i < mergeConcurrency; i++ {
worker := statistics.NewTopnStatsMergeWorker(taskCh, respCh, wrapper, killed)
wg.Run(func() {
wg.Add(1)
gp.Go(func() {
defer wg.Done()
worker.Run(timeZone, isIndex, n, version)
})
}
Expand Down Expand Up @@ -2322,4 +2329,5 @@ func (h *Handle) SetStatsCacheCapacity(c int64) {
// Close releases the resources held by the Handle: it closes the currently
// loaded stats cache and then closes the goroutine pool (gpool).
func (h *Handle) Close() {
h.statsCache.Load().Close()
h.gpool.Close()
}

0 comments on commit 9411e82

Please sign in to comment.