Skip to content

Commit

Permalink
statistics: concurrency init stats (#51403)
Browse files Browse the repository at this point in the history
close #52102
  • Loading branch information
hawkingrei authored Mar 28, 2024
1 parent 9855a55 commit ce2cf92
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 20 deletions.
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,9 @@ type Performance struct {
// If ForceInitStats is false, tidb can provide service before init stats is finished. Note that during the period
// of init stats the optimizer may make bad decisions due to pseudo stats.
ForceInitStats bool `toml:"force-init-stats" json:"force-init-stats"`

// ConcurrentlyInitStats indicates whether init stats processes the loaded rows with multiple goroutines instead of a single sequential loop.
ConcurrentlyInitStats bool `toml:"concurrently-init-stats" json:"concurrently-init-stats"`
}

// PlanCache is the PlanCache section of the config.
Expand Down Expand Up @@ -1016,6 +1019,7 @@ var defaultConf = Config{
EnableLoadFMSketch: false,
LiteInitStats: true,
ForceInitStats: true,
ConcurrentlyInitStats: true,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/statistics/handle/ddl",
"//pkg/statistics/handle/globalstats",
"//pkg/statistics/handle/history",
"//pkg/statistics/handle/initstats",
"//pkg/statistics/handle/lockstats",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/storage",
Expand Down
63 changes: 49 additions & 14 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
"github.com/pingcap/tidb/pkg/statistics/handle/initstats"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -77,6 +78,12 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statstypes.StatsCache,
if err != nil {
return nil, err
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsMeta4Chunk)
ls.LoadStats(is, tables, rc)
ls.Wait()
return tables, nil
}
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand Down Expand Up @@ -229,13 +236,19 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache stats
}

func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4ChunkLite)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand All @@ -252,13 +265,19 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache statsty
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4Chunk)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand Down Expand Up @@ -301,13 +320,21 @@ func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iter
}

func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsTopN4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
Expand Down Expand Up @@ -425,24 +452,32 @@ func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.I
}

func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsBuckets4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
ls.Wait()
} else {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsBuckets4Chunk(cache, iter)
}
h.initStatsBuckets4Chunk(cache, iter)
}
tables := cache.Values()
for _, table := range tables {
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/handletest/statstest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 9,
shard_count = 12,
deps = [
"//pkg/config",
"//pkg/parser/model",
Expand Down
53 changes: 48 additions & 5 deletions pkg/statistics/handle/handletest/statstest/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,38 @@ func testInitStatsMemTrace(t *testing.T) {
}

// TestInitStatsMemTraceWithLite runs testInitStatsMemTraceFunc with its second
// argument set to true (presumably the lite-init-stats switch, going by the
// test name) while ConcurrentlyInitStats is switched off.
func TestInitStatsMemTraceWithLite(t *testing.T) {
	// RestoreFunc is evaluated right here; only the function it returns is
	// deferred, so the snapshot is taken before the override below.
	defer config.RestoreFunc()()
	sequentialInit := func(conf *config.Config) {
		conf.Performance.ConcurrentlyInitStats = false
	}
	config.UpdateGlobal(sequentialInit)
	testInitStatsMemTraceFunc(t, true)
}

// TestInitStatsMemTraceWithoutLite runs testInitStatsMemTraceFunc with its
// second argument set to false (presumably the lite-init-stats switch, going
// by the test name) while ConcurrentlyInitStats is switched off.
func TestInitStatsMemTraceWithoutLite(t *testing.T) {
	// RestoreFunc is evaluated right here; only the function it returns is
	// deferred, so the snapshot is taken before the override below.
	defer config.RestoreFunc()()
	sequentialInit := func(conf *config.Config) {
		conf.Performance.ConcurrentlyInitStats = false
	}
	config.UpdateGlobal(sequentialInit)
	testInitStatsMemTraceFunc(t, false)
}

// TestInitStatsMemTraceWithConcurrrencyLite runs testInitStatsMemTraceFunc with
// its second argument set to true (presumably the lite-init-stats switch,
// going by the test name) while ConcurrentlyInitStats is switched on.
func TestInitStatsMemTraceWithConcurrrencyLite(t *testing.T) {
	// RestoreFunc is evaluated right here; only the function it returns is
	// deferred, so the snapshot is taken before the override below.
	defer config.RestoreFunc()()
	concurrentInit := func(conf *config.Config) {
		conf.Performance.ConcurrentlyInitStats = true
	}
	config.UpdateGlobal(concurrentInit)
	testInitStatsMemTraceFunc(t, true)
}

// TestInitStatsMemTraceWithoutConcurrrencyLite runs testInitStatsMemTraceFunc
// with its second argument set to false while ConcurrentlyInitStats is
// switched on.
//
// NOTE(review): despite the "WithoutConcurrrency" in the name, this test
// enables ConcurrentlyInitStats; the "Without" presumably refers to the lite
// switch (the false argument) — confirm and consider renaming.
func TestInitStatsMemTraceWithoutConcurrrencyLite(t *testing.T) {
	// RestoreFunc is evaluated right here; only the function it returns is
	// deferred, so the snapshot is taken before the override below.
	defer config.RestoreFunc()()
	concurrentInit := func(conf *config.Config) {
		conf.Performance.ConcurrentlyInitStats = true
	}
	config.UpdateGlobal(concurrentInit)
	testInitStatsMemTraceFunc(t, false)
}

Expand Down Expand Up @@ -283,11 +311,26 @@ func TestInitStats51358(t *testing.T) {
}

// TestInitStatsVer2 runs initStatsVer2 with both LiteInitStats and
// ConcurrentlyInitStats turned off, restoring the global config afterwards.
func TestInitStatsVer2(t *testing.T) {
	// The restore function covers the whole global config, so no separate
	// save/restore of individual fields is needed.
	restore := config.RestoreFunc()
	defer restore()
	config.UpdateGlobal(func(conf *config.Config) {
		// Apply the overrides to the conf handed to this callback. Writing
		// through config.GetGlobalConfig() here would leave conf untouched and
		// would instead modify the config object that is live while the
		// callback runs.
		conf.Performance.LiteInitStats = false
		conf.Performance.ConcurrentlyInitStats = false
	})
	initStatsVer2(t)
}

// TestInitStatsVer2Concurrency runs initStatsVer2 with LiteInitStats turned
// off and ConcurrentlyInitStats turned on, restoring the global config
// afterwards.
func TestInitStatsVer2Concurrency(t *testing.T) {
	restore := config.RestoreFunc()
	defer restore()
	config.UpdateGlobal(func(conf *config.Config) {
		// Apply the overrides to the conf handed to this callback. Writing
		// through config.GetGlobalConfig() here would leave conf untouched and
		// would instead modify the config object that is live while the
		// callback runs.
		conf.Performance.LiteInitStats = false
		conf.Performance.ConcurrentlyInitStats = true
	})
	initStatsVer2(t)
}

func initStatsVer2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
18 changes: 18 additions & 0 deletions pkg/statistics/handle/initstats/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "initstats",
srcs = ["load_stats.go"],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/initstats",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/kv",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/sqlexec",
"@org_uber_go_zap//:zap",
],
)
86 changes: 86 additions & 0 deletions pkg/statistics/handle/initstats/load_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package initstats

import (
"context"
"runtime"
"sync"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

// Worker is used to load stats concurrently.
type Worker struct {
taskFunc func(ctx context.Context, req *chunk.Chunk) error
dealFunc func(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk)
mu sync.Mutex
wg util.WaitGroupWrapper
}

// NewWorker returns a Worker that fetches batches with taskFunc and hands
// each non-empty batch to dealFunc. No goroutine is started until LoadStats
// is called.
func NewWorker(
	taskFunc func(ctx context.Context, req *chunk.Chunk) error,
	dealFunc func(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk)) *Worker {
	w := new(Worker)
	w.taskFunc = taskFunc
	w.dealFunc = dealFunc
	return w
}

// LoadStats starts runtime.GOMAXPROCS(0) loaders through the worker's wait
// group. Each loader gets its own chunk from rc.NewChunk and repeatedly
// fetches and processes batches (see loadStats). Callers follow this with
// Wait to block until the loaders are done.
func (ls *Worker) LoadStats(is infoschema.InfoSchema, cache statstypes.StatsCache, rc sqlexec.RecordSet) {
	// All loaders share this body; the chunk is allocated inside it so that
	// no two loaders ever share a chunk.
	loader := func() {
		ls.loadStats(is, cache, rc.NewChunk(nil))
	}
	for remaining := runtime.GOMAXPROCS(0); remaining > 0; remaining-- {
		ls.wg.Run(loader)
	}
}

// loadStats is the body of one loader goroutine: it keeps fetching a batch
// into req via getTask and passing it to dealFunc, and stops on the first
// fetch error or the first empty batch.
//
// NOTE(review): a fetch error is only logged here; nothing is returned, so
// callers of LoadStats/Wait cannot observe the failure.
func (ls *Worker) loadStats(is infoschema.InfoSchema, cache statstypes.StatsCache, req *chunk.Chunk) {
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	// iter is bound to req once and reused for every batch fetched into req.
	iter := chunk.NewIterator4Chunk(req)
	for {
		err := ls.getTask(ctx, req)
		switch {
		case err != nil:
			logutil.StatsLogger().Error("load stats failed", zap.Error(err))
			return
		case req.NumRows() == 0:
			// Empty batch: nothing left to process.
			return
		}
		ls.dealFunc(is, cache, iter)
	}
}

// getTask fetches the next batch into req by calling taskFunc while holding
// mu, so only one goroutine at a time pulls from the shared source.
func (ls *Worker) getTask(ctx context.Context, req *chunk.Chunk) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()
	fetch := ls.taskFunc
	return fetch(ctx, req)
}

// Wait waits on the worker's wait group, i.e. for the loaders started by
// LoadStats to finish. It does not close or release anything itself.
func (ls *Worker) Wait() {
	ls.wg.Wait()
}

0 comments on commit ce2cf92

Please sign in to comment.