From 063ac13685bb2551774ffb2315f5a510024ca8a5 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Sun, 31 Mar 2024 00:40:45 +0800 Subject: [PATCH] This is an automated cherry-pick of #52226 Signed-off-by: ti-chi-bot --- pkg/statistics/handle/bootstrap.go | 238 ++++++++++++++++-- pkg/statistics/handle/initstats/BUILD.bazel | 6 +- .../handle/initstats/load_stats_page.go | 74 ++++++ 3 files changed, 302 insertions(+), 16 deletions(-) create mode 100644 pkg/statistics/handle/initstats/load_stats_page.go diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go index f6c58c025e5e6..424bffabe1a29 100644 --- a/pkg/statistics/handle/bootstrap.go +++ b/pkg/statistics/handle/bootstrap.go @@ -16,6 +16,8 @@ package handle import ( "context" + "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -25,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/cache" @@ -36,9 +39,22 @@ import ( "go.uber.org/zap" ) +<<<<<<< HEAD func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) { +======= +var maxTidRecord MaxTidRecord + +// MaxTidRecord is to record the max tid. +type MaxTidRecord struct { + mu sync.Mutex + tid atomic.Int64 +} + +func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { + var physicalID int64 +>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226)) for row := iter.Begin(); row != iter.End(); row = iter.Next() { - physicalID := row.GetInt64(1) + physicalID = row.GetInt64(1) // The table is read-only. Please do not modify it. 
table, ok := h.TableInfoByID(is, physicalID) if !ok { @@ -61,6 +77,11 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache util.StatsC } cache.Put(physicalID, tbl) // put this table again since it is updated } + maxTidRecord.mu.Lock() + defer maxTidRecord.mu.Unlock() + if maxTidRecord.tid.Load() < physicalID { + maxTidRecord.tid.Store(physicalID) + } } func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error) { @@ -244,12 +265,6 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache util.St return errors.Trace(err) } defer terror.Call(rc.Close) - if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { - ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4ChunkLite) - ls.LoadStats(is, cache, rc) - ls.Wait() - return nil - } ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) req := rc.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) @@ -273,12 +288,6 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsC return errors.Trace(err) } defer terror.Call(rc.Close) - if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { - ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4Chunk) - ls.LoadStats(is, cache, rc) - ls.Wait() - return nil - } ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) req := rc.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) @@ -295,7 +304,66 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsC return nil } +<<<<<<< HEAD func (*Handle) initStatsTopN4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) { +======= +func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task) error { + se, err := h.Pool.SPool().Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + h.Pool.SPool().Put(se) + } + }() + sctx := se.(sessionctx.Context) + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %?" 
+ rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsHistograms4Chunk(is, cache, iter) + } + return nil +} + +func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache) error { + var maxTid = maxTidRecord.tid.Load() + tid := int64(0) + ls := initstats.NewRangeWorker(func(task initstats.Task) error { + return h.initStatsHistogramsByPaging(is, cache, task) + }) + ls.LoadStats() + for tid <= maxTid { + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + 1000, + }) + tid += 1000 + } + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + 1000, + }) + ls.Wait() + return nil +} + +func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { +>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226)) affectedIndexes := make(map[*statistics.Index]struct{}) for row := iter.Begin(); row != iter.End(); row = iter.Next() { table, ok := cache.Get(row.GetInt64(0)) @@ -328,6 +396,7 @@ func (h *Handle) initStatsTopN(cache util.StatsCache) error { return errors.Trace(err) } defer terror.Call(rc.Close) +<<<<<<< HEAD if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) { h.initStatsTopN4Chunk(cache, iter) @@ -336,6 +405,8 @@ func (h *Handle) initStatsTopN(cache util.StatsCache) error { ls.Wait() return nil } +======= +>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226)) ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) req := rc.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) @@ -352,7 +423,66 @@ func (h *Handle) initStatsTopN(cache util.StatsCache) error { return nil } +<<<<<<< HEAD func (*Handle) initStatsFMSketch4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) { +======= +func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task) error { + se, err := h.Pool.SPool().Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + h.Pool.SPool().Put(se) + } + }() + sctx := se.(sessionctx.Context) + sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %?" 
+ rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsTopN4Chunk(cache, iter) + } + return nil +} + +func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache) error { + var maxTid = maxTidRecord.tid.Load() + tid := int64(0) + ls := initstats.NewRangeWorker(func(task initstats.Task) error { + return h.initStatsTopNByPaging(cache, task) + }) + ls.LoadStats() + for tid <= maxTid { + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + 1000, + }) + tid += 1000 + } + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + 1000, + }) + ls.Wait() + return nil +} + +func (*Handle) initStatsFMSketch4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) { +>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226)) for row := iter.Begin(); row != iter.End(); row = iter.Next() { table, ok := cache.Get(row.GetInt64(0)) if !ok { @@ -454,6 +584,7 @@ func (*Handle) initStatsBuckets4Chunk(cache util.StatsCache, iter *chunk.Iterato } } +<<<<<<< HEAD func (h *Handle) initStatsBuckets(cache util.StatsCache) error { sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id" rc, err := util.Exec(h.initStatsCtx, sql) @@ -467,7 +598,21 @@ func (h *Handle) initStatsBuckets(cache util.StatsCache) error { }) ls.LoadStats(nil, cache, rc) ls.Wait() +======= +func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error { + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + err := h.initStatsBucketsConcurrency(cache) + if err != nil { + return errors.Trace(err) + } +>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226)) } else { + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id" + rc, err := util.Exec(h.initStatsCtx, sql) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) req := rc.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) @@ -501,6 +646,61 @@ func (h *Handle) initStatsBuckets(cache util.StatsCache) error { return nil } +func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task initstats.Task) error { + se, err := h.Pool.SPool().Get() + if err != nil { + return err + } + defer func() { + if err == nil { // only recycle when no error + h.Pool.SPool().Put(se) + } + }() + sctx := se.(sessionctx.Context) + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %?" 
+ rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid) + if err != nil { + return errors.Trace(err) + } + defer terror.Call(rc.Close) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + req := rc.NewChunk(nil) + iter := chunk.NewIterator4Chunk(req) + for { + err := rc.Next(ctx, req) + if err != nil { + return errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + h.initStatsBuckets4Chunk(cache, iter) + } + return nil +} + +func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error { + var maxTid = maxTidRecord.tid.Load() + tid := int64(0) + ls := initstats.NewRangeWorker(func(task initstats.Task) error { + return h.initStatsBucketsByPaging(cache, task) + }) + ls.LoadStats() + for tid <= maxTid { + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + 1000, + }) + tid += 1000 + } + ls.SendTask(initstats.Task{ + StartTid: tid, + EndTid: tid + 1000, + }) + ls.Wait() + return nil +} + // InitStatsLite initiates the stats cache. The function is liter and faster than InitStats. // Column/index stats are not loaded, i.e., we only load scalars such as NDV, NullCount, Correlation and don't load CMSketch/Histogram/TopN. func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) { @@ -545,11 +745,19 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) { if err != nil { return errors.Trace(err) } - err = h.initStatsHistograms(is, cache) + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + err = h.initStatsHistogramsConcurrency(is, cache) + } else { + err = h.initStatsHistograms(is, cache) + } if err != nil { return errors.Trace(err) } - err = h.initStatsTopN(cache) + if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { + err = h.initStatsTopNConcurrency(cache) + } else { + err = h.initStatsTopN(cache) + } if err != nil { return err } diff --git a/pkg/statistics/handle/initstats/BUILD.bazel b/pkg/statistics/handle/initstats/BUILD.bazel index 897e408f8290a..d987fd88a3df3 100644 --- a/pkg/statistics/handle/initstats/BUILD.bazel +++ b/pkg/statistics/handle/initstats/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "initstats", - srcs = ["load_stats.go"], + srcs = [ + "load_stats.go", + "load_stats_page.go", + ], importpath = "github.com/pingcap/tidb/pkg/statistics/handle/initstats", visibility = ["//visibility:public"], deps = [ @@ -12,6 +15,7 @@ go_library( "//pkg/statistics/handle/util", "//pkg/util", "//pkg/util/chunk", + "//pkg/util/logutil", "//pkg/util/sqlexec", "@org_uber_go_zap//:zap", ], diff --git a/pkg/statistics/handle/initstats/load_stats_page.go b/pkg/statistics/handle/initstats/load_stats_page.go new file mode 100644 index 0000000000000..24ec035adcfe3 --- /dev/null +++ b/pkg/statistics/handle/initstats/load_stats_page.go @@ -0,0 +1,74 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package initstats + +import ( + "runtime" + + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/logutil" + "go.uber.org/zap" +) + +// Task represents the range of the table for loading stats. +type Task struct { + StartTid int64 + EndTid int64 +} + +// RangeWorker is used to load stats concurrently by the range of table id. +type RangeWorker struct { + dealFunc func(task Task) error + taskChan chan Task + + wg util.WaitGroupWrapper +} + +// NewRangeWorker creates a new RangeWorker. +func NewRangeWorker(dealFunc func(task Task) error) *RangeWorker { + return &RangeWorker{ + dealFunc: dealFunc, + taskChan: make(chan Task, 1), + } +} + +// LoadStats loads stats concurrently when to init stats +func (ls *RangeWorker) LoadStats() { + concurrency := runtime.GOMAXPROCS(0) + for n := 0; n < concurrency; n++ { + ls.wg.Run(func() { + ls.loadStats() + }) + } +} + +func (ls *RangeWorker) loadStats() { + for task := range ls.taskChan { + if err := ls.dealFunc(task); err != nil { + logutil.BgLogger().Error("load stats failed", zap.Error(err)) + } + } +} + +// SendTask sends a task to the load stats worker. +func (ls *RangeWorker) SendTask(task Task) { + ls.taskChan <- task +} + +// Wait closes the load stats worker. +func (ls *RangeWorker) Wait() { + close(ls.taskChan) + ls.wg.Wait() +}
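
The heart of this cherry-pick is the paging scheme shared by initStatsHistogramsConcurrency, initStatsTopNConcurrency, and initStatsBucketsConcurrency: initStatsMeta4Chunk records the largest table ID it sees in maxTidRecord, each *Concurrency function then splits [0, maxTid] into pages of 1000 table IDs, and initstats.RangeWorker fans those pages out to GOMAXPROCS goroutines that run the per-range query ("... where table_id >= %? and table_id < %?") on a session taken from the pool. Below is a minimal, standalone Go sketch of that producer/worker shape, not the patch's own code: the lowercase rangeWorker type and the loadRange callback are illustrative stand-ins for initstats.RangeWorker and the initStats*ByPaging helpers.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// Task mirrors initstats.Task: a half-open range [StartTid, EndTid) of table IDs.
type Task struct {
	StartTid int64
	EndTid   int64
}

// rangeWorker mirrors the shape of initstats.RangeWorker: a small buffered task
// channel drained by a fixed pool of goroutines.
type rangeWorker struct {
	dealFunc func(Task) error
	taskChan chan Task
	wg       sync.WaitGroup
}

func newRangeWorker(dealFunc func(Task) error) *rangeWorker {
	return &rangeWorker{dealFunc: dealFunc, taskChan: make(chan Task, 1)}
}

// start launches one consumer per available CPU, like RangeWorker.LoadStats.
func (w *rangeWorker) start() {
	for n := 0; n < runtime.GOMAXPROCS(0); n++ {
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			for task := range w.taskChan {
				if err := w.dealFunc(task); err != nil {
					// The patch logs the error and keeps consuming tasks.
					fmt.Println("load stats failed:", err)
				}
			}
		}()
	}
}

func (w *rangeWorker) sendTask(task Task) { w.taskChan <- task }

// wait closes the channel and blocks until all consumers finish, like RangeWorker.Wait.
func (w *rangeWorker) wait() {
	close(w.taskChan)
	w.wg.Wait()
}

func main() {
	// maxTid stands in for maxTidRecord.tid, which initStatsMeta4Chunk fills in
	// while scanning mysql.stats_meta.
	const maxTid, step = int64(4321), int64(1000)

	// loadRange is a hypothetical stand-in for the initStats*ByPaging helpers,
	// which run "... where table_id >= %? and table_id < %?" for each page.
	loadRange := func(t Task) error {
		fmt.Printf("load stats for table_id in [%d, %d)\n", t.StartTid, t.EndTid)
		return nil
	}

	w := newRangeWorker(loadRange)
	w.start()

	// Same producer loop as initStats*Concurrency: 1000-ID pages while tid <= maxTid,
	// plus one defensive trailing page past maxTid, mirroring the extra SendTask.
	tid := int64(0)
	for tid <= maxTid {
		w.sendTask(Task{StartTid: tid, EndTid: tid + step})
		tid += step
	}
	w.sendTask(Task{StartTid: tid, EndTid: tid + step})
	w.wait()
}

As in the patch, a worker only logs a failed range and keeps draining the channel, so a single bad page does not abort stats initialization.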