diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 32cfc8b085fb3..42104652fdef0 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -165,6 +165,7 @@ go_library( "//pkg/statistics", "//pkg/statistics/handle", "//pkg/statistics/handle/cache", + "//pkg/statistics/handle/globalstats", "//pkg/statistics/handle/util", "//pkg/store/driver/backoff", "//pkg/store/driver/txn", diff --git a/pkg/executor/analyze_col.go b/pkg/executor/analyze_col.go index 5946401ca7aa0..7d8cdc7994ef7 100644 --- a/pkg/executor/analyze_col.go +++ b/pkg/executor/analyze_col.go @@ -280,8 +280,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo fms = append(fms, collectors[i].FMSketch) } if needExtStats { - statsHandle := domain.GetDomain(e.ctx).StatsHandle() - extStats, err = statsHandle.BuildExtendedStats(e.TableID.GetStatisticsID(), e.colsInfo, collectors) + extStats, err = statistics.BuildExtendedStats(e.ctx, e.TableID.GetStatisticsID(), e.colsInfo, collectors) if err != nil { return nil, nil, nil, nil, nil, err } diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 9597a384049e1..d46360e3d725d 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -465,8 +465,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( count = rootRowCollector.Base().Count if needExtStats { - statsHandle := domain.GetDomain(e.ctx).StatsHandle() - extStats, err = statsHandle.BuildExtendedStats(e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors) + extStats, err = statistics.BuildExtendedStats(e.ctx, e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors) if err != nil { return 0, nil, nil, nil, nil, err } diff --git a/pkg/executor/analyze_global_stats.go b/pkg/executor/analyze_global_stats.go index a2ea6f7f01523..3aed3bb4f986a 100644 --- a/pkg/executor/analyze_global_stats.go +++ b/pkg/executor/analyze_global_stats.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/globalstats" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/logutil" @@ -84,7 +85,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob cache = nil } - globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID( + globalStatsI, err := statsHandle.MergePartitionStats2GlobalStatsByTableID( e.Ctx(), globalOpts, e.Ctx().GetInfoSchema().(infoschema.InfoSchema), globalStatsID.tableID, @@ -101,6 +102,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob } return err } + globalStats := globalStatsI.(*globalstats.GlobalStats) // Dump global-level stats to kv. for i := 0; i < globalStats.Num; i++ { hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i] diff --git a/pkg/statistics/BUILD.bazel b/pkg/statistics/BUILD.bazel index d9c949de9369d..d623615bc8a92 100644 --- a/pkg/statistics/BUILD.bazel +++ b/pkg/statistics/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "analyze.go", "analyze_jobs.go", "builder.go", + "builder_ext_stats.go", "cmsketch.go", "cmsketch_util.go", "column.go", diff --git a/pkg/statistics/handle/extstats/extended_stats.go b/pkg/statistics/builder_ext_stats.go similarity index 71% rename from pkg/statistics/handle/extstats/extended_stats.go rename to pkg/statistics/builder_ext_stats.go index 4de8bd9f3f522..049ced4c65116 100644 --- a/pkg/statistics/handle/extstats/extended_stats.go +++ b/pkg/statistics/builder_ext_stats.go @@ -12,38 +12,46 @@ // See the License for the specific language governing permissions and // limitations under the License. -package extstats +package statistics import ( + "context" "encoding/json" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/statistics" - "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/util/logutil" +<<<<<<< HEAD:pkg/statistics/handle/extstats/extended_stats.go "github.com/pingcap/tidb/pkg/util/mathutil" +======= + "github.com/pingcap/tidb/pkg/util/sqlexec" +>>>>>>> ac1f0d92a6e (planner: move more methods from StatsHandle to its sub-packages (#47749)):pkg/statistics/builder_ext_stats.go "go.uber.org/zap" ) // BuildExtendedStats build extended stats for column groups if needed based on the column samples. -// TODO: move this function to statistics/builder.go. func BuildExtendedStats(sctx sessionctx.Context, - tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (*statistics.ExtendedStatsColl, error) { + tableID int64, cols []*model.ColumnInfo, collectors []*SampleCollector) (*ExtendedStatsColl, error) { const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)" - rows, _, err := util.ExecRows(sctx, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited) + + sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + return nil, errors.Errorf("invalid sql executor") + } + rows, _, err := sqlExec.ExecRestrictedSQL(kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats), nil, sql, tableID, ExtendedStatsAnalyzed, ExtendedStatsInited) if err != nil { return nil, errors.Trace(err) } if len(rows) == 0 { return nil, nil } - statsColl := statistics.NewExtendedStatsColl() + statsColl := NewExtendedStatsColl() for _, row := range rows { name := row.GetString(0) - item := &statistics.ExtendedStatsItem{Tp: uint8(row.GetInt64(1))} + item := &ExtendedStatsItem{Tp: uint8(row.GetInt64(1))} colIDs := row.GetString(2) err := json.Unmarshal([]byte(colIDs), &item.ColIDs) if err != nil { @@ -61,7 +69,7 @@ func BuildExtendedStats(sctx sessionctx.Context, return statsColl, nil } -func fillExtendedStatsItemVals(sctx sessionctx.Context, item *statistics.ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) *statistics.ExtendedStatsItem { +func fillExtendedStatsItemVals(sctx sessionctx.Context, item *ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*SampleCollector) *ExtendedStatsItem { switch item.Tp { case ast.StatsTypeCardinality, ast.StatsTypeDependency: return nil @@ -71,7 +79,7 @@ func fillExtendedStatsItemVals(sctx sessionctx.Context, item *statistics.Extende return nil } -func fillExtStatsCorrVals(sctx sessionctx.Context, item *statistics.ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) *statistics.ExtendedStatsItem { +func fillExtStatsCorrVals(sctx sessionctx.Context, item *ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*SampleCollector) *ExtendedStatsItem { colOffsets := make([]int, 0, 2) for _, id := range item.ColIDs { for i, col := range cols { @@ -87,8 +95,13 @@ func fillExtStatsCorrVals(sctx sessionctx.Context, item *statistics.ExtendedStat // samplesX and samplesY are in order of handle, i.e, their SampleItem.Ordinals are in order. samplesX := collectors[colOffsets[0]].Samples // We would modify Ordinal of samplesY, so we make a deep copy. +<<<<<<< HEAD:pkg/statistics/handle/extstats/extended_stats.go samplesY := statistics.CopySampleItems(collectors[colOffsets[1]].Samples) sampleNum := mathutil.Min(len(samplesX), len(samplesY)) +======= + samplesY := CopySampleItems(collectors[colOffsets[1]].Samples) + sampleNum := min(len(samplesX), len(samplesY)) +>>>>>>> ac1f0d92a6e (planner: move more methods from StatsHandle to its sub-packages (#47749)):pkg/statistics/builder_ext_stats.go if sampleNum == 1 { item.ScalarVals = 1 return item @@ -101,11 +114,11 @@ func fillExtStatsCorrVals(sctx sessionctx.Context, item *statistics.ExtendedStat sc := sctx.GetSessionVars().StmtCtx var err error - samplesX, err = statistics.SortSampleItems(sc, samplesX) + samplesX, err = SortSampleItems(sc, samplesX) if err != nil { return nil } - samplesYInXOrder := make([]*statistics.SampleItem, 0, sampleNum) + samplesYInXOrder := make([]*SampleItem, 0, sampleNum) for i, itemX := range samplesX { if itemX.Ordinal >= len(samplesY) { continue @@ -114,7 +127,7 @@ func fillExtStatsCorrVals(sctx sessionctx.Context, item *statistics.ExtendedStat itemY.Ordinal = i samplesYInXOrder = append(samplesYInXOrder, itemY) } - samplesYInYOrder, err := statistics.SortSampleItems(sc, samplesYInXOrder) + samplesYInYOrder, err := SortSampleItems(sc, samplesYInXOrder) if err != nil { return nil } diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel index ef5e48b0de8f9..d3a99a2e73c51 100644 --- a/pkg/statistics/handle/BUILD.bazel +++ b/pkg/statistics/handle/BUILD.bazel @@ -16,7 +16,6 @@ go_library( "//pkg/infoschema", "//pkg/kv", "//pkg/metrics", - "//pkg/parser/ast", "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/parser/terror", @@ -26,7 +25,6 @@ go_library( "//pkg/statistics", "//pkg/statistics/handle/autoanalyze", "//pkg/statistics/handle/cache", - "//pkg/statistics/handle/extstats", "//pkg/statistics/handle/globalstats", "//pkg/statistics/handle/history", "//pkg/statistics/handle/lockstats", @@ -51,21 +49,19 @@ go_test( timeout = "short", srcs = [ "ddl_test.go", - "gc_test.go", "handle_hist_test.go", "main_test.go", ], embed = [":handle"], flaky = True, race = "on", - shard_count = 13, + shard_count = 8, deps = [ "//pkg/config", "//pkg/parser/model", "//pkg/planner/cardinality", "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", - "//pkg/sessionctx/variable", "//pkg/testkit", "//pkg/testkit/testsetup", "//pkg/types", diff --git a/pkg/statistics/handle/ddl.go b/pkg/statistics/handle/ddl.go index 89f2d4ace0268..f76bb3bb773a7 100644 --- a/pkg/statistics/handle/ddl.go +++ b/pkg/statistics/handle/ddl.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/parser/model" +<<<<<<< HEAD "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" @@ -29,6 +30,9 @@ import ( statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/sqlexec" +======= + "github.com/pingcap/tidb/pkg/sessionctx/variable" +>>>>>>> ac1f0d92a6e (planner: move more methods from StatsHandle to its sub-packages (#47749)) ) // HandleDDLEvent begins to process a ddl task. @@ -76,7 +80,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error { return err } if variable.PartitionPruneMode(pruneMode) == variable.Dynamic && t.PartInfo != nil { - if err := h.updateGlobalStats(t.TableInfo); err != nil { + if err := h.UpdateGlobalStats(t.TableInfo); err != nil { return err } } @@ -114,6 +118,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error { return nil } +<<<<<<< HEAD // updateStatsVersion will set statistics version to the newest TS, // then tidb-server will reload automatic. func (h *Handle) updateStatsVersion() error { @@ -142,6 +147,8 @@ func (h *Handle) changeGlobalStatsID(from, to int64) (err error) { }, statsutil.FlagWrapTxn) } +======= +>>>>>>> ac1f0d92a6e (planner: move more methods from StatsHandle to its sub-packages (#47749)) func (h *Handle) getInitStateTableIDs(tblInfo *model.TableInfo) (ids []int64, err error) { pi := tblInfo.GetPartitionInfo() if pi == nil { diff --git a/pkg/statistics/handle/globalstats/BUILD.bazel b/pkg/statistics/handle/globalstats/BUILD.bazel index 478cd6c2047df..e4ec714ebc3a1 100644 --- a/pkg/statistics/handle/globalstats/BUILD.bazel +++ b/pkg/statistics/handle/globalstats/BUILD.bazel @@ -20,7 +20,6 @@ go_library( "//pkg/statistics", "//pkg/statistics/handle/storage", "//pkg/statistics/handle/util", - "//pkg/table", "//pkg/types", "//pkg/util/hack", "//pkg/util/logutil", diff --git a/pkg/statistics/handle/globalstats/global_stats.go b/pkg/statistics/handle/globalstats/global_stats.go index c1318781008f7..6fc7e94ee8487 100644 --- a/pkg/statistics/handle/globalstats/global_stats.go +++ b/pkg/statistics/handle/globalstats/global_stats.go @@ -23,9 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/sessiontxn" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/tiancaiamao/gp" "go.uber.org/zap" ) @@ -34,6 +32,35 @@ const ( MaxPartitionMergeBatchSize = 256 ) +// statsGlobalImpl implements util.StatsGlobal +type statsGlobalImpl struct { + statsHandler util.StatsHandle +} + +// NewStatsGlobal creates a new StatsGlobal. +func NewStatsGlobal(statsHandler util.StatsHandle) util.StatsGlobal { + return &statsGlobalImpl{statsHandler: statsHandler} +} + +// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. +func (sg *statsGlobalImpl) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context, + opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, + physicalID int64, + isIndex bool, + histIDs []int64, + _ map[int64]*statistics.Table, +) (globalStats interface{}, err error) { + return MergePartitionStats2GlobalStatsByTableID(sc, sg.statsHandler, opts, is, physicalID, isIndex, histIDs) +} + +// UpdateGlobalStats will trigger the merge of global-stats when we drop table partition +func (sg *statsGlobalImpl) UpdateGlobalStats(tblInfo *model.TableInfo) error { + // We need to merge the partition-level stats to global-stats when we drop table partition in dynamic mode. + return util.CallWithSCtx(sg.statsHandler.SPool(), func(sctx sessionctx.Context) error { + return UpdateGlobalStats(sctx, sg.statsHandler, tblInfo) + }) +} + // GlobalStats is used to store the statistics contained in the global-level stats // which is generated by the merge of partition-level stats. // It will both store the column stats and index stats. @@ -62,26 +89,17 @@ func newGlobalStats(histCount int) *GlobalStats { return globalStats } -type ( - getTableByPhysicalIDFunc func(is infoschema.InfoSchema, tableID int64) (table.Table, bool) - callWithSCtxFunc func(f func(sctx sessionctx.Context) error, flags ...int) error - saveStatsToStorageFunc func(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) - tableStatsFromStorageFunc func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (*statistics.Table, error) -) - // MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo. func MergePartitionStats2GlobalStats( sc sessionctx.Context, - gpool *gp.Pool, + statsHandle util.StatsHandle, opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, globalTableInfo *model.TableInfo, isIndex bool, histIDs []int64, - getTableByPhysicalIDFn getTableByPhysicalIDFunc, - callWithSCtxFunc callWithSCtxFunc, ) (globalStats *GlobalStats, err error) { - worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, globalTableInfo, histIDs, is, getTableByPhysicalIDFn, callWithSCtxFunc) + worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is) if err != nil { return nil, errors.Trace(err) } @@ -95,17 +113,15 @@ func MergePartitionStats2GlobalStats( // MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. func MergePartitionStats2GlobalStatsByTableID( sc sessionctx.Context, - gpool *gp.Pool, + statsHandle util.StatsHandle, opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, tableID int64, isIndex bool, histIDs []int64, - getTableByPhysicalIDFn getTableByPhysicalIDFunc, - callWithSCtxFunc callWithSCtxFunc, ) (globalStats *GlobalStats, err error) { // Get the partition table IDs. - globalTable, ok := getTableByPhysicalIDFn(is, tableID) + globalTable, ok := statsHandle.TableInfoByID(is, tableID) if !ok { err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", tableID) return @@ -113,7 +129,7 @@ func MergePartitionStats2GlobalStatsByTableID( globalTableInfo := globalTable.Meta() - worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, globalTableInfo, histIDs, is, getTableByPhysicalIDFn, callWithSCtxFunc) + worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is) if err != nil { return nil, errors.Trace(err) } @@ -150,15 +166,11 @@ var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{ // UpdateGlobalStats update the global-level stats based on the partition-level stats. func UpdateGlobalStats( sctx sessionctx.Context, - tblInfo *model.TableInfo, - gpool *gp.Pool, - tableStatsFromStorage tableStatsFromStorageFunc, - getTableByPhysicalIDFn getTableByPhysicalIDFunc, - callWithSCtxFunc callWithSCtxFunc, - saveStatsToStorage saveStatsToStorageFunc) error { + statsHandle util.StatsHandle, + tblInfo *model.TableInfo) error { tableID := tblInfo.ID is := sessiontxn.GetTxnManager(sctx).GetTxnInfoSchema() - globalStats, err := tableStatsFromStorage(tblInfo, tableID, true, 0) + globalStats, err := statsHandle.TableStatsFromStorage(tblInfo, tableID, true, 0) if err != nil { return err } @@ -189,7 +201,7 @@ func UpdateGlobalStats( opts[ast.AnalyzeOptNumBuckets] = uint64(globalColStatsBucketNum) } // Generate the new column global-stats - newColGlobalStats, err := MergePartitionStats2GlobalStats(sctx, gpool, opts, is, tblInfo, false, nil, getTableByPhysicalIDFn, callWithSCtxFunc) + newColGlobalStats, err := MergePartitionStats2GlobalStats(sctx, statsHandle, opts, is, tblInfo, false, nil) if err != nil { return err } @@ -204,7 +216,7 @@ func UpdateGlobalStats( continue } // fms for global stats doesn't need to dump to kv. - err = saveStatsToStorage(tableID, newColGlobalStats.Count, newColGlobalStats.ModifyCount, + err = statsHandle.SaveStatsToStorage(tableID, newColGlobalStats.Count, newColGlobalStats.ModifyCount, 0, hg, cms, topN, 2, 1, false, util.StatsMetaHistorySourceSchemaChange) if err != nil { return err @@ -228,7 +240,7 @@ func UpdateGlobalStats( if globalIdxStatsBucketNum != 0 { opts[ast.AnalyzeOptNumBuckets] = uint64(globalIdxStatsBucketNum) } - newIndexGlobalStats, err := MergePartitionStats2GlobalStats(sctx, gpool, opts, is, tblInfo, true, []int64{idx.ID}, getTableByPhysicalIDFn, callWithSCtxFunc) + newIndexGlobalStats, err := MergePartitionStats2GlobalStats(sctx, statsHandle, opts, is, tblInfo, true, []int64{idx.ID}) if err != nil { return err } @@ -243,7 +255,7 @@ func UpdateGlobalStats( continue } // fms for global stats doesn't need to dump to kv. - err = saveStatsToStorage(tableID, newIndexGlobalStats.Count, newIndexGlobalStats.ModifyCount, 1, hg, cms, topN, 2, 1, false, util.StatsMetaHistorySourceSchemaChange) + err = statsHandle.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, newIndexGlobalStats.ModifyCount, 1, hg, cms, topN, 2, 1, false, util.StatsMetaHistorySourceSchemaChange) if err != nil { return err } diff --git a/pkg/statistics/handle/globalstats/global_stats_async.go b/pkg/statistics/handle/globalstats/global_stats_async.go index 257bb1135e193..f3c3fca589c3f 100644 --- a/pkg/statistics/handle/globalstats/global_stats_async.go +++ b/pkg/statistics/handle/globalstats/global_stats_async.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/storage" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/types" - "github.com/tiancaiamao/gp" "golang.org/x/sync/errgroup" ) @@ -71,19 +70,16 @@ func toSQLIndex(isIndex bool) int { // └────────────────────────┘ └───────────────────────┘ type AsyncMergePartitionStats2GlobalStats struct { is infoschema.InfoSchema + statsHandle util.StatsHandle globalStats *GlobalStats - pool util.SessionPool cmsketch chan mergeItem[*statistics.CMSketch] fmsketch chan mergeItem[*statistics.FMSketch] histogramAndTopn chan mergeItem[*StatsWrapper] - gpool *gp.Pool allPartitionStats map[int64]*statistics.Table PartitionDefinition map[int64]model.PartitionDefinition tableInfo map[int64]*model.TableInfo // key is partition id and histID skipPartition map[skipItem]struct{} - getTableByPhysicalIDFn getTableByPhysicalIDFunc - callWithSCtxFunc callWithSCtxFunc exitWhenErrChan chan struct{} globalTableInfo *model.TableInfo histIDs []int64 @@ -95,30 +91,26 @@ type AsyncMergePartitionStats2GlobalStats struct { // NewAsyncMergePartitionStats2GlobalStats creates a new AsyncMergePartitionStats2GlobalStats. func NewAsyncMergePartitionStats2GlobalStats( - gpool *gp.Pool, + statsHandle util.StatsHandle, globalTableInfo *model.TableInfo, histIDs []int64, - is infoschema.InfoSchema, - getTableByPhysicalIDFn getTableByPhysicalIDFunc, - callWithSCtxFunc callWithSCtxFunc) (*AsyncMergePartitionStats2GlobalStats, error) { + is infoschema.InfoSchema) (*AsyncMergePartitionStats2GlobalStats, error) { partitionNum := len(globalTableInfo.Partition.Definitions) return &AsyncMergePartitionStats2GlobalStats{ - callWithSCtxFunc: callWithSCtxFunc, - cmsketch: make(chan mergeItem[*statistics.CMSketch], 5), - fmsketch: make(chan mergeItem[*statistics.FMSketch], 5), - histogramAndTopn: make(chan mergeItem[*StatsWrapper], 5), - PartitionDefinition: make(map[int64]model.PartitionDefinition), - tableInfo: make(map[int64]*model.TableInfo), - partitionIDs: make([]int64, 0, partitionNum), - exitWhenErrChan: make(chan struct{}), - skipPartition: make(map[skipItem]struct{}), - gpool: gpool, - allPartitionStats: make(map[int64]*statistics.Table), - globalTableInfo: globalTableInfo, - getTableByPhysicalIDFn: getTableByPhysicalIDFn, - histIDs: histIDs, - is: is, - partitionNum: partitionNum, + statsHandle: statsHandle, + cmsketch: make(chan mergeItem[*statistics.CMSketch], 5), + fmsketch: make(chan mergeItem[*statistics.FMSketch], 5), + histogramAndTopn: make(chan mergeItem[*StatsWrapper], 5), + PartitionDefinition: make(map[int64]model.PartitionDefinition), + tableInfo: make(map[int64]*model.TableInfo), + partitionIDs: make([]int64, 0, partitionNum), + exitWhenErrChan: make(chan struct{}), + skipPartition: make(map[skipItem]struct{}), + allPartitionStats: make(map[int64]*statistics.Table), + globalTableInfo: globalTableInfo, + histIDs: histIDs, + is: is, + partitionNum: partitionNum, }, nil } @@ -140,7 +132,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) prepare(sctx sessionctx.Context, partitionID := def.ID a.partitionIDs = append(a.partitionIDs, partitionID) a.PartitionDefinition[partitionID] = def - partitionTable, ok := a.getTableByPhysicalIDFn(a.is, partitionID) + partitionTable, ok := a.statsHandle.TableInfoByID(a.is, partitionID) if !ok { return errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID) } @@ -299,7 +291,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) MergePartitionStats2GlobalStats( tz := sctx.GetSessionVars().StmtCtx.TimeZone() analyzeVersion := sctx.GetSessionVars().AnalyzeVersion stmtCtx := sctx.GetSessionVars().StmtCtx - return a.callWithSCtxFunc( + return util.CallWithSCtx(a.statsHandle.SPool(), func(sctx sessionctx.Context) error { err := a.prepare(sctx, isIndex) if err != nil { @@ -468,7 +460,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) dealHistogramAndTopN(stmtCtx *stm var poppedTopN []statistics.TopNMeta var allhg []*statistics.Histogram wrapper := item.item - a.globalStats.TopN[item.idx], poppedTopN, allhg, err = mergeGlobalStatsTopN(a.gpool, sctx, wrapper, + a.globalStats.TopN[item.idx], poppedTopN, allhg, err = mergeGlobalStatsTopN(a.statsHandle.GPool(), sctx, wrapper, tz, analyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex) if err != nil { return err diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index f25df3977326e..39d626e7ccee9 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -20,15 +20,12 @@ import ( "github.com/pingcap/tidb/pkg/config" ddlUtil "github.com/pingcap/tidb/pkg/ddl/util" - "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze" "github.com/pingcap/tidb/pkg/statistics/handle/cache" - "github.com/pingcap/tidb/pkg/statistics/handle/extstats" "github.com/pingcap/tidb/pkg/statistics/handle/globalstats" "github.com/pingcap/tidb/pkg/statistics/handle/history" "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" @@ -79,6 +76,9 @@ type Handle struct { // StatsLock is used to manage locked stats. util.StatsLock + // StatsGlobal is used to manage global stats. + util.StatsGlobal + // This gpool is used to reuse goroutine in the mergeGlobalStatsTopN. gpool *gp.Pool @@ -135,6 +135,7 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti handle.StatsHistory = history.NewStatsHistory(handle) handle.StatsUsage = usage.NewStatsUsageImpl(handle) handle.StatsAnalyze = autoanalyze.NewStatsAnalyze(handle) + handle.StatsGlobal = globalstats.NewStatsGlobal(handle) handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) @@ -152,34 +153,6 @@ func (h *Handle) SetLease(lease time.Duration) { h.lease.Store(lease) } -// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. -func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context, - opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, - physicalID int64, - isIndex bool, - histIDs []int64, - _ map[int64]*statistics.Table, -) (globalStats *globalstats.GlobalStats, err error) { - return globalstats.MergePartitionStats2GlobalStatsByTableID(sc, h.gpool, opts, is, physicalID, isIndex, histIDs, h.TableInfoByID, h.callWithSCtx) -} - -// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableInfo. -func (h *Handle) mergePartitionStats2GlobalStats( - opts map[ast.AnalyzeOptionType]uint64, - is infoschema.InfoSchema, - globalTableInfo *model.TableInfo, - isIndex bool, - histIDs []int64, - _ map[int64]*statistics.Table, -) (gstats *globalstats.GlobalStats, err error) { - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - gstats, err = globalstats.MergePartitionStats2GlobalStats(sctx, h.gpool, opts, is, globalTableInfo, isIndex, - histIDs, h.TableInfoByID, h.callWithSCtx) - return err - }) - return -} - // GetTableStats retrieves the statistics table from cache, and the cache will be updated by a goroutine. // TODO: remove GetTableStats later on. func (h *Handle) GetTableStats(tblInfo *model.TableInfo) *statistics.Table { @@ -220,6 +193,7 @@ func (h *Handle) FlushStats() { } } +<<<<<<< HEAD // SaveTableStatsToStorage saves the stats of a table to storage. func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) { return h.callWithSCtx(func(sctx sessionctx.Context) error { @@ -252,19 +226,17 @@ func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, col return es, err } +======= +>>>>>>> ac1f0d92a6e (planner: move more methods from StatsHandle to its sub-packages (#47749)) // Close stops the background func (h *Handle) Close() { h.gpool.Close() h.StatsCache.Close() } -func (h *Handle) callWithSCtx(f func(sctx sessionctx.Context) error, flags ...int) (err error) { - return util.CallWithSCtx(h.pool, f, flags...) -} - // GetCurrentPruneMode returns the current latest partitioning table prune mode. func (h *Handle) GetCurrentPruneMode() (mode string, err error) { - err = h.callWithSCtx(func(sctx sessionctx.Context) error { + err = util.CallWithSCtx(h.pool, func(sctx sessionctx.Context) error { mode = sctx.GetSessionVars().PartitionPruneMode.Load() return nil }) diff --git a/pkg/statistics/handle/storage/BUILD.bazel b/pkg/statistics/handle/storage/BUILD.bazel index bf1d7590510e6..dd972b020dee2 100644 --- a/pkg/statistics/handle/storage/BUILD.bazel +++ b/pkg/statistics/handle/storage/BUILD.bazel @@ -48,15 +48,17 @@ go_test( timeout = "short", srcs = [ "dump_test.go", + "gc_test.go", "read_test.go", ], flaky = True, - shard_count = 16, + shard_count = 21, deps = [ ":storage", "//pkg/domain", "//pkg/parser/model", "//pkg/planner/cardinality", + "//pkg/sessionctx/variable", "//pkg/statistics", "//pkg/statistics/handle/internal", "//pkg/statistics/handle/util", diff --git a/pkg/statistics/handle/gc_test.go b/pkg/statistics/handle/storage/gc_test.go similarity index 99% rename from pkg/statistics/handle/gc_test.go rename to pkg/statistics/handle/storage/gc_test.go index 5fefeef6b1f02..fc54a52b2ddc8 100644 --- a/pkg/statistics/handle/gc_test.go +++ b/pkg/statistics/handle/storage/gc_test.go @@ -1,4 +1,4 @@ -// Copyright 2018 PingCAP, Inc. +// Copyright 2023 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package handle_test +package storage_test import ( "testing" diff --git a/pkg/statistics/handle/util/interfaces.go b/pkg/statistics/handle/util/interfaces.go index b2b374309aac5..f4816bc00aeba 100644 --- a/pkg/statistics/handle/util/interfaces.go +++ b/pkg/statistics/handle/util/interfaces.go @@ -19,6 +19,7 @@ import ( "time" "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" @@ -282,6 +283,21 @@ type StatsReadWriter interface { SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) } +// StatsGlobal is used to manage partition table global stats. +type StatsGlobal interface { + // MergePartitionStats2GlobalStatsByTableID merges partition stats to global stats by table ID. + MergePartitionStats2GlobalStatsByTableID(sctx sessionctx.Context, + opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, + physicalID int64, + isIndex bool, + histIDs []int64, + _ map[int64]*statistics.Table, + ) (globalStats interface{}, err error) + + // UpdateGlobalStats will trigger the merge of global-stats when we drop table partition + UpdateGlobalStats(tblInfo *model.TableInfo) error +} + // StatsHandle is used to manage TiDB Statistics. type StatsHandle interface { // GPool returns the goroutine pool. @@ -331,4 +347,7 @@ type StatsHandle interface { // StatsReadWriter is used to read and write stats to the storage. StatsReadWriter + + // StatsGlobal is used to manage partition table global stats. + StatsGlobal }