Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52226
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hawkingrei authored and ti-chi-bot committed Apr 1, 2024
1 parent 669748e commit 063ac13
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 16 deletions.
238 changes: 223 additions & 15 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package handle

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
Expand All @@ -36,9 +39,22 @@ import (
"go.uber.org/zap"
)

<<<<<<< HEAD
func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) {
=======
var maxTidRecord MaxTidRecord

// MaxTidRecord is to record the max tid.
type MaxTidRecord struct {
mu sync.Mutex
tid atomic.Int64
}

func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
var physicalID int64
>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226))
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
physicalID := row.GetInt64(1)
physicalID = row.GetInt64(1)
// The table is read-only. Please do not modify it.
table, ok := h.TableInfoByID(is, physicalID)
if !ok {
Expand All @@ -61,6 +77,11 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache util.StatsC
}
cache.Put(physicalID, tbl) // put this table again since it is updated
}
maxTidRecord.mu.Lock()
defer maxTidRecord.mu.Unlock()
if maxTidRecord.tid.Load() < physicalID {
maxTidRecord.tid.Store(physicalID)
}
}

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error) {
Expand Down Expand Up @@ -244,12 +265,6 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache util.St
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4ChunkLite)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand All @@ -273,12 +288,6 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsC
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4Chunk)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand All @@ -295,7 +304,66 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache util.StatsC
return nil
}

<<<<<<< HEAD
func (*Handle) initStatsTopN4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) {
=======
func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task) error {
se, err := h.Pool.SPool().Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.Pool.SPool().Put(se)
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %?"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, cache, iter)
}
return nil
}

func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
return h.initStatsHistogramsByPaging(is, cache, task)
})
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + 1000,
})
tid += 1000
}
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + 1000,
})
ls.Wait()
return nil
}

func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226))
affectedIndexes := make(map[*statistics.Index]struct{})
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := cache.Get(row.GetInt64(0))
Expand Down Expand Up @@ -328,6 +396,7 @@ func (h *Handle) initStatsTopN(cache util.StatsCache) error {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
<<<<<<< HEAD
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache util.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsTopN4Chunk(cache, iter)
Expand All @@ -336,6 +405,8 @@ func (h *Handle) initStatsTopN(cache util.StatsCache) error {
ls.Wait()
return nil
}
=======
>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226))
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand All @@ -352,7 +423,66 @@ func (h *Handle) initStatsTopN(cache util.StatsCache) error {
return nil
}

<<<<<<< HEAD
func (*Handle) initStatsFMSketch4Chunk(cache util.StatsCache, iter *chunk.Iterator4Chunk) {
=======
func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task) error {
se, err := h.Pool.SPool().Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.Pool.SPool().Put(se)
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %?"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsTopN4Chunk(cache, iter)
}
return nil
}

func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
return h.initStatsTopNByPaging(cache, task)
})
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + 1000,
})
tid += 1000
}
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + 1000,
})
ls.Wait()
return nil
}

func (*Handle) initStatsFMSketch4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226))
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := cache.Get(row.GetInt64(0))
if !ok {
Expand Down Expand Up @@ -454,6 +584,7 @@ func (*Handle) initStatsBuckets4Chunk(cache util.StatsCache, iter *chunk.Iterato
}
}

<<<<<<< HEAD
func (h *Handle) initStatsBuckets(cache util.StatsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
Expand All @@ -467,7 +598,21 @@ func (h *Handle) initStatsBuckets(cache util.StatsCache) error {
})
ls.LoadStats(nil, cache, rc)
ls.Wait()
=======
func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error {
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err := h.initStatsBucketsConcurrency(cache)
if err != nil {
return errors.Trace(err)
}
>>>>>>> 150233ddf31 (*: fix panic when to enable lite-init-stats and concurrently-init-stats (#52226))
} else {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand Down Expand Up @@ -501,6 +646,61 @@ func (h *Handle) initStatsBuckets(cache util.StatsCache) error {
return nil
}

func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task initstats.Task) error {
se, err := h.Pool.SPool().Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.Pool.SPool().Put(se)
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %?"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsBuckets4Chunk(cache, iter)
}
return nil
}

func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker(func(task initstats.Task) error {
return h.initStatsBucketsByPaging(cache, task)
})
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + 1000,
})
tid += 1000
}
ls.SendTask(initstats.Task{
StartTid: tid,
EndTid: tid + 1000,
})
ls.Wait()
return nil
}

// InitStatsLite initiates the stats cache. The function is liter and faster than InitStats.
// Column/index stats are not loaded, i.e., we only load scalars such as NDV, NullCount, Correlation and don't load CMSketch/Histogram/TopN.
func (h *Handle) InitStatsLite(is infoschema.InfoSchema) (err error) {
Expand Down Expand Up @@ -545,11 +745,19 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return errors.Trace(err)
}
err = h.initStatsHistograms(is, cache)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsHistogramsConcurrency(is, cache)
} else {
err = h.initStatsHistograms(is, cache)
}
if err != nil {
return errors.Trace(err)
}
err = h.initStatsTopN(cache)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsTopNConcurrency(cache)
} else {
err = h.initStatsTopN(cache)
}
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/statistics/handle/initstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "initstats",
srcs = ["load_stats.go"],
srcs = [
"load_stats.go",
"load_stats_page.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/initstats",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -12,6 +15,7 @@ go_library(
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@org_uber_go_zap//:zap",
],
Expand Down
Loading

0 comments on commit 063ac13

Please sign in to comment.