Skip to content

Commit

Permalink
*: fix panic when to enable lite-init-stats and concurrently-init-sta…
Browse files Browse the repository at this point in the history
…ts (#52226)

close #52223
  • Loading branch information
hawkingrei authored Mar 30, 2024
1 parent 2cc964d commit 150233d
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 33 deletions.
233 changes: 201 additions & 32 deletions pkg/statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package handle

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
Expand All @@ -37,9 +40,18 @@ import (
"go.uber.org/zap"
)

// MaxTidRecord records the largest physical table ID observed so far.
type MaxTidRecord struct {
	// mu serializes the compare-then-store update done by writers
	// (see initStatsMeta4Chunk), so a smaller ID never overwrites a larger one.
	mu sync.Mutex
	// tid holds the current maximum; it is atomic so that readers can Load it
	// without taking mu.
	tid atomic.Int64
}

// maxTidRecord is the package-wide record that is raised while the stats meta
// rows are loaded and read afterwards to bound the table-id ranges handed to
// the concurrent init-stats workers.
var maxTidRecord MaxTidRecord

func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
var physicalID int64
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
physicalID := row.GetInt64(1)
physicalID = row.GetInt64(1)
// The table is read-only. Please do not modify it.
table, ok := h.TableInfoByID(is, physicalID)
if !ok {
Expand All @@ -64,6 +76,11 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache statstypes.
}
cache.Put(physicalID, tbl) // put this table again since it is updated
}
maxTidRecord.mu.Lock()
defer maxTidRecord.mu.Unlock()
if maxTidRecord.tid.Load() < physicalID {
maxTidRecord.tid.Store(physicalID)
}
}

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statstypes.StatsCache, error) {
Expand Down Expand Up @@ -242,12 +259,6 @@ func (h *Handle) initStatsHistogramsLite(is infoschema.InfoSchema, cache statsty
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4ChunkLite)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand All @@ -271,12 +282,39 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, h.initStatsHistograms4Chunk)
ls.LoadStats(is, cache, rc)
ls.Wait()
return nil
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, cache, iter)
}
return nil
}

func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task) error {
se, err := h.Pool.SPool().Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.Pool.SPool().Put(se)
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id >= %? and table_id < %?"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand All @@ -293,6 +331,28 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.
return nil
}

// initStatsHistogramsConcurrency loads histogram stats through an initstats
// range worker. The table-id space is cut into consecutive windows of 1000 ids
// and every window is processed by initStatsHistogramsByPaging.
// It always returns nil; errors from individual windows are returned to the
// range worker, not to the caller of this function.
func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
	// Width of one table-id window.
	const step int64 = 1000
	// Snapshot the largest known table id before any task is dispatched.
	upper := maxTidRecord.tid.Load()
	worker := initstats.NewRangeWorker(func(task initstats.Task) error {
		return h.initStatsHistogramsByPaging(is, cache, task)
	})
	worker.LoadStats()
	// Send [0, step), [step, 2*step), ... and stop right after the first window
	// that starts beyond upper, so exactly one trailing window past the
	// recorded maximum is dispatched as well.
	for start := int64(0); ; start += step {
		worker.SendTask(initstats.Task{
			StartTid: start,
			EndTid:   start + step,
		})
		if start > upper {
			break
		}
	}
	worker.Wait()
	return nil
}

func (*Handle) initStatsTopN4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
affectedIndexes := make(map[*statistics.Index]struct{})
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
Expand Down Expand Up @@ -326,14 +386,39 @@ func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsTopN4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
ls.Wait()
return nil
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsTopN4Chunk(cache, iter)
}
return nil
}

func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task) error {
se, err := h.Pool.SPool().Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.Pool.SPool().Put(se)
}
}()
sctx := se.(sessionctx.Context)
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1 and table_id >= %? and table_id < %?"
rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand All @@ -350,6 +435,28 @@ func (h *Handle) initStatsTopN(cache statstypes.StatsCache) error {
return nil
}

// initStatsTopNConcurrency loads TopN stats through an initstats range worker.
// The table-id space is cut into consecutive windows of 1000 ids and every
// window is processed by initStatsTopNByPaging.
// It always returns nil; errors from individual windows are returned to the
// range worker, not to the caller of this function.
func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache) error {
	// Width of one table-id window.
	const step int64 = 1000
	// Snapshot the largest known table id before any task is dispatched.
	upper := maxTidRecord.tid.Load()
	worker := initstats.NewRangeWorker(func(task initstats.Task) error {
		return h.initStatsTopNByPaging(cache, task)
	})
	worker.LoadStats()
	// Send [0, step), [step, 2*step), ... and stop right after the first window
	// that starts beyond upper, so exactly one trailing window past the
	// recorded maximum is dispatched as well.
	for start := int64(0); ; start += step {
		worker.SendTask(initstats.Task{
			StartTid: start,
			EndTid:   start + step,
		})
		if start > upper {
			break
		}
	}
	worker.Wait()
	return nil
}

func (*Handle) initStatsFMSketch4Chunk(cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := cache.Get(row.GetInt64(0))
Expand Down Expand Up @@ -452,19 +559,18 @@ func (*Handle) initStatsBuckets4Chunk(cache statstypes.StatsCache, iter *chunk.I
}

func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
ls := initstats.NewWorker(rc.Next, func(_ infoschema.InfoSchema, cache statstypes.StatsCache, iter *chunk.Iterator4Chunk) {
h.initStatsBuckets4Chunk(cache, iter)
})
ls.LoadStats(nil, cache, rc)
ls.Wait()
err := h.initStatsBucketsConcurrency(cache)
if err != nil {
return errors.Trace(err)
}
} else {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
Expand Down Expand Up @@ -498,6 +604,61 @@ func (h *Handle) initStatsBuckets(cache statstypes.StatsCache) error {
return nil
}

// initStatsBucketsByPaging loads the rows of mysql.stats_buckets whose table_id
// lies in [task.StartTid, task.EndTid) and passes them, chunk by chunk, to
// initStatsBuckets4Chunk.
//
// A session is borrowed from the handle's session pool for the query. It is
// put back only when the whole load succeeded; on any error (acquiring the
// result set or reading a chunk) the session is not recycled. The result is
// named so that the deferred recycle check observes the error that is actually
// returned, including errors raised inside the read loop.
func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task initstats.Task) (err error) {
	se, err := h.Pool.SPool().Get()
	if err != nil {
		return err
	}
	defer func() {
		if err == nil { // only recycle when no error
			h.Pool.SPool().Put(se)
		}
	}()
	sctx := se.(sessionctx.Context)
	// The ordering matches the non-paging query in initStatsBuckets.
	// NOTE(review): initStatsBuckets4Chunk presumably relies on buckets of one
	// histogram arriving in bucket_id order — confirm against its body.
	sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id >= %? and table_id < %? order by table_id, is_index, hist_id, bucket_id"
	rc, err := util.Exec(sctx, sql, task.StartTid, task.EndTid)
	if err != nil {
		return errors.Trace(err)
	}
	defer terror.Call(rc.Close)
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	req := rc.NewChunk(nil)
	iter := chunk.NewIterator4Chunk(req)
	for {
		// Assign to the function-level err (no ":=") so a failed read is not
		// hidden from the deferred recycle check by a shadowed variable.
		if err = rc.Next(ctx, req); err != nil {
			return errors.Trace(err)
		}
		// An empty chunk marks the end of the result set.
		if req.NumRows() == 0 {
			break
		}
		h.initStatsBuckets4Chunk(cache, iter)
	}
	return nil
}

// initStatsBucketsConcurrency loads bucket stats through an initstats range
// worker. The table-id space is cut into consecutive windows of 1000 ids and
// every window is processed by initStatsBucketsByPaging.
// It always returns nil; errors from individual windows are returned to the
// range worker, not to the caller of this function.
func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache) error {
	// Width of one table-id window.
	const step int64 = 1000
	// Snapshot the largest known table id before any task is dispatched.
	upper := maxTidRecord.tid.Load()
	worker := initstats.NewRangeWorker(func(task initstats.Task) error {
		return h.initStatsBucketsByPaging(cache, task)
	})
	worker.LoadStats()
	// Send [0, step), [step, 2*step), ... and stop right after the first window
	// that starts beyond upper, so exactly one trailing window past the
	// recorded maximum is dispatched as well.
	for start := int64(0); ; start += step {
		worker.SendTask(initstats.Task{
			StartTid: start,
			EndTid:   start + step,
		})
		if start > upper {
			break
		}
	}
	worker.Wait()
	return nil
}

// InitStatsLite initiates the stats cache. The function is lighter and faster than InitStats.
// 1. Basic stats meta data is loaded.(count, modify count, etc.)
// 2. Column/index stats are loaded. (only histogram)
Expand Down Expand Up @@ -544,11 +705,19 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
if err != nil {
return errors.Trace(err)
}
err = h.initStatsHistograms(is, cache)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsHistogramsConcurrency(is, cache)
} else {
err = h.initStatsHistograms(is, cache)
}
if err != nil {
return errors.Trace(err)
}
err = h.initStatsTopN(cache)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsTopNConcurrency(cache)
} else {
err = h.initStatsTopN(cache)
}
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/statistics/handle/initstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "initstats",
srcs = ["load_stats.go"],
srcs = [
"load_stats.go",
"load_stats_page.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/initstats",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -12,6 +15,7 @@ go_library(
"//pkg/statistics/handle/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@org_uber_go_zap//:zap",
],
Expand Down
Loading

0 comments on commit 150233d

Please sign in to comment.