Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: move more methods from StatsHandle to its sub-packages #47749

Merged
merged 8 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ go_library(
"//pkg/statistics",
"//pkg/statistics/handle",
"//pkg/statistics/handle/cache",
"//pkg/statistics/handle/globalstats",
"//pkg/statistics/handle/util",
"//pkg/store/driver/backoff",
"//pkg/store/driver/txn",
Expand Down
3 changes: 1 addition & 2 deletions pkg/executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
fms = append(fms, collectors[i].FMSketch)
}
if needExtStats {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
extStats, err = statsHandle.BuildExtendedStats(e.TableID.GetStatisticsID(), e.colsInfo, collectors)
extStats, err = statistics.BuildExtendedStats(e.ctx, e.TableID.GetStatisticsID(), e.colsInfo, collectors)
if err != nil {
return nil, nil, nil, nil, nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats(

count = rootRowCollector.Base().Count
if needExtStats {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
extStats, err = statsHandle.BuildExtendedStats(e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors)
extStats, err = statistics.BuildExtendedStats(e.ctx, e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors)
if err != nil {
return 0, nil, nil, nil, nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -84,7 +85,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob
cache = nil
}

globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(
globalStatsI, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(
e.Ctx(),
globalOpts, e.Ctx().GetInfoSchema().(infoschema.InfoSchema),
globalStatsID.tableID,
Expand All @@ -101,6 +102,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob
}
return err
}
globalStats := globalStatsI.(*globalstats.GlobalStats)
// Dump global-level stats to kv.
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"analyze.go",
"analyze_jobs.go",
"builder.go",
"builder_ext_stats.go",
"cmsketch.go",
"cmsketch_util.go",
"column.go",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package extstats
package statistics

import (
"context"
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
// TODO: move this function to statistics/builder.go.
func BuildExtendedStats(sctx sessionctx.Context,
tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (*statistics.ExtendedStatsColl, error) {
tableID int64, cols []*model.ColumnInfo, collectors []*SampleCollector) (*ExtendedStatsColl, error) {
const sql = "SELECT name, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)"
rows, _, err := util.ExecRows(sctx, sql, tableID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited)

sqlExec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
if !ok {
return nil, errors.Errorf("invalid sql executor")
}
rows, _, err := sqlExec.ExecRestrictedSQL(context.Background(), nil, sql, tableID, ExtendedStatsAnalyzed, ExtendedStatsInited)
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, nil
}
statsColl := statistics.NewExtendedStatsColl()
statsColl := NewExtendedStatsColl()
for _, row := range rows {
name := row.GetString(0)
item := &statistics.ExtendedStatsItem{Tp: uint8(row.GetInt64(1))}
item := &ExtendedStatsItem{Tp: uint8(row.GetInt64(1))}
colIDs := row.GetString(2)
err := json.Unmarshal([]byte(colIDs), &item.ColIDs)
if err != nil {
Expand All @@ -60,7 +64,7 @@ func BuildExtendedStats(sctx sessionctx.Context,
return statsColl, nil
}

func fillExtendedStatsItemVals(sctx sessionctx.Context, item *statistics.ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) *statistics.ExtendedStatsItem {
func fillExtendedStatsItemVals(sctx sessionctx.Context, item *ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*SampleCollector) *ExtendedStatsItem {
switch item.Tp {
case ast.StatsTypeCardinality, ast.StatsTypeDependency:
return nil
Expand All @@ -70,7 +74,7 @@ func fillExtendedStatsItemVals(sctx sessionctx.Context, item *statistics.Extende
return nil
}

func fillExtStatsCorrVals(sctx sessionctx.Context, item *statistics.ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) *statistics.ExtendedStatsItem {
func fillExtStatsCorrVals(sctx sessionctx.Context, item *ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*SampleCollector) *ExtendedStatsItem {
colOffsets := make([]int, 0, 2)
for _, id := range item.ColIDs {
for i, col := range cols {
Expand All @@ -86,7 +90,7 @@ func fillExtStatsCorrVals(sctx sessionctx.Context, item *statistics.ExtendedStat
// samplesX and samplesY are in order of handle, i.e, their SampleItem.Ordinals are in order.
samplesX := collectors[colOffsets[0]].Samples
// We would modify Ordinal of samplesY, so we make a deep copy.
samplesY := statistics.CopySampleItems(collectors[colOffsets[1]].Samples)
samplesY := CopySampleItems(collectors[colOffsets[1]].Samples)
sampleNum := min(len(samplesX), len(samplesY))
if sampleNum == 1 {
item.ScalarVals = 1
Expand All @@ -100,11 +104,11 @@ func fillExtStatsCorrVals(sctx sessionctx.Context, item *statistics.ExtendedStat
sc := sctx.GetSessionVars().StmtCtx

var err error
samplesX, err = statistics.SortSampleItems(sc, samplesX)
samplesX, err = SortSampleItems(sc, samplesX)
if err != nil {
return nil
}
samplesYInXOrder := make([]*statistics.SampleItem, 0, sampleNum)
samplesYInXOrder := make([]*SampleItem, 0, sampleNum)
for i, itemX := range samplesX {
if itemX.Ordinal >= len(samplesY) {
continue
Expand All @@ -113,7 +117,7 @@ func fillExtStatsCorrVals(sctx sessionctx.Context, item *statistics.ExtendedStat
itemY.Ordinal = i
samplesYInXOrder = append(samplesYInXOrder, itemY)
}
samplesYInYOrder, err := statistics.SortSampleItems(sc, samplesYInXOrder)
samplesYInYOrder, err := SortSampleItems(sc, samplesYInXOrder)
if err != nil {
return nil
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"//pkg/infoschema",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/ast",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
Expand All @@ -26,7 +25,6 @@ go_library(
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze",
"//pkg/statistics/handle/cache",
"//pkg/statistics/handle/extstats",
"//pkg/statistics/handle/globalstats",
"//pkg/statistics/handle/history",
"//pkg/statistics/handle/lockstats",
Expand Down
12 changes: 1 addition & 11 deletions pkg/statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ package handle
import (
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
)

// HandleDDLEvent begins to process a ddl task.
Expand Down Expand Up @@ -67,7 +65,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
if variable.PartitionPruneMode(pruneMode) == variable.Dynamic && t.PartInfo != nil {
if err := h.updateGlobalStats(t.TableInfo); err != nil {
if err := h.UpdateGlobalStats(t.TableInfo); err != nil {
return err
}
}
Expand Down Expand Up @@ -105,14 +103,6 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return nil
}

// updateGlobalStats will trigger the merge of global-stats when we drop table partition
func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
// We need to merge the partition-level stats to global-stats when we drop table partition in dynamic mode.
return h.callWithSCtx(func(sctx sessionctx.Context) error {
return globalstats.UpdateGlobalStats(sctx, tblInfo, h.gpool, h.TableStatsFromStorage, h.TableInfoByID, h.callWithSCtx, h.SaveStatsToStorage)
})
}

func (h *Handle) getInitStateTableIDs(tblInfo *model.TableInfo) (ids []int64, err error) {
pi := tblInfo.GetPartitionInfo()
if pi == nil {
Expand Down
18 changes: 0 additions & 18 deletions pkg/statistics/handle/extstats/BUILD.bazel

This file was deleted.

1 change: 0 additions & 1 deletion pkg/statistics/handle/globalstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ go_library(
"//pkg/statistics",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/util",
"//pkg/table",
"//pkg/types",
"//pkg/util/hack",
"//pkg/util/logutil",
Expand Down
70 changes: 41 additions & 29 deletions pkg/statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
)

Expand All @@ -34,6 +32,35 @@ const (
MaxPartitionMergeBatchSize = 256
)

// statsGlobalImpl implements util.StatsGlobal by delegating to the
// package-level global-stats helpers using the wrapped StatsHandle.
type statsGlobalImpl struct {
	statsHandler util.StatsHandle // gives access to storage helpers (e.g. TableInfoByID) and the session pool (SPool)
}

// NewStatsGlobal creates a new util.StatsGlobal backed by the given StatsHandle.
func NewStatsGlobal(statsHandler util.StatsHandle) util.StatsGlobal {
	return &statsGlobalImpl{statsHandler: statsHandler}
}

// MergePartitionStats2GlobalStatsByTableID merges the partition-level stats into
// global-level stats for the table identified by physicalID.
//
// The trailing map parameter is required by the util.StatsGlobal interface but is
// ignored by this implementation. The result is a *GlobalStats returned as
// interface{} — presumably so the util package does not need to import this
// package (TODO confirm); callers type-assert it back to *globalstats.GlobalStats.
func (sg *statsGlobalImpl) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
	opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
	physicalID int64,
	isIndex bool,
	histIDs []int64,
	_ map[int64]*statistics.Table,
) (globalStats interface{}, err error) {
	return MergePartitionStats2GlobalStatsByTableID(sc, sg.statsHandler, opts, is, physicalID, isIndex, histIDs)
}

// UpdateGlobalStats will trigger the merge of global-stats when we drop table partition.
func (sg *statsGlobalImpl) UpdateGlobalStats(tblInfo *model.TableInfo) error {
	// We need to merge the partition-level stats to global-stats when we drop
	// table partition in dynamic mode. The merge runs inside a session context
	// obtained from the handle's session pool via util.CallWithSCtx.
	return util.CallWithSCtx(sg.statsHandler.SPool(), func(sctx sessionctx.Context) error {
		return UpdateGlobalStats(sctx, sg.statsHandler, tblInfo)
	})
}

// GlobalStats is used to store the statistics contained in the global-level stats
// which is generated by the merge of partition-level stats.
// It will both store the column stats and index stats.
Expand Down Expand Up @@ -62,26 +89,17 @@ func newGlobalStats(histCount int) *GlobalStats {
return globalStats
}

type (
getTableByPhysicalIDFunc func(is infoschema.InfoSchema, tableID int64) (table.Table, bool)
callWithSCtxFunc func(f func(sctx sessionctx.Context) error, flags ...int) error
saveStatsToStorageFunc func(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error)
tableStatsFromStorageFunc func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (*statistics.Table, error)
)

// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo.
func MergePartitionStats2GlobalStats(
sc sessionctx.Context,
gpool *gp.Pool,
statsHandle util.StatsHandle,
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
globalTableInfo *model.TableInfo,
isIndex bool,
histIDs []int64,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
callWithSCtxFunc callWithSCtxFunc,
) (globalStats *GlobalStats, err error) {
worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, globalTableInfo, histIDs, is, getTableByPhysicalIDFn, callWithSCtxFunc)
worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -95,25 +113,23 @@ func MergePartitionStats2GlobalStats(
// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
func MergePartitionStats2GlobalStatsByTableID(
sc sessionctx.Context,
gpool *gp.Pool,
statsHandle util.StatsHandle,
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
tableID int64,
isIndex bool,
histIDs []int64,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
callWithSCtxFunc callWithSCtxFunc,
) (globalStats *GlobalStats, err error) {
// Get the partition table IDs.
globalTable, ok := getTableByPhysicalIDFn(is, tableID)
globalTable, ok := statsHandle.TableInfoByID(is, tableID)
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", tableID)
return
}

globalTableInfo := globalTable.Meta()

worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, globalTableInfo, histIDs, is, getTableByPhysicalIDFn, callWithSCtxFunc)
worker, err := NewAsyncMergePartitionStats2GlobalStats(statsHandle, globalTableInfo, histIDs, is)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -150,15 +166,11 @@ var analyzeOptionDefault = map[ast.AnalyzeOptionType]uint64{
// UpdateGlobalStats update the global-level stats based on the partition-level stats.
func UpdateGlobalStats(
sctx sessionctx.Context,
tblInfo *model.TableInfo,
gpool *gp.Pool,
tableStatsFromStorage tableStatsFromStorageFunc,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
callWithSCtxFunc callWithSCtxFunc,
saveStatsToStorage saveStatsToStorageFunc) error {
statsHandle util.StatsHandle,
tblInfo *model.TableInfo) error {
tableID := tblInfo.ID
is := sessiontxn.GetTxnManager(sctx).GetTxnInfoSchema()
globalStats, err := tableStatsFromStorage(tblInfo, tableID, true, 0)
globalStats, err := statsHandle.TableStatsFromStorage(tblInfo, tableID, true, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +201,7 @@ func UpdateGlobalStats(
opts[ast.AnalyzeOptNumBuckets] = uint64(globalColStatsBucketNum)
}
// Generate the new column global-stats
newColGlobalStats, err := MergePartitionStats2GlobalStats(sctx, gpool, opts, is, tblInfo, false, nil, getTableByPhysicalIDFn, callWithSCtxFunc)
newColGlobalStats, err := MergePartitionStats2GlobalStats(sctx, statsHandle, opts, is, tblInfo, false, nil)
if err != nil {
return err
}
Expand All @@ -204,7 +216,7 @@ func UpdateGlobalStats(
continue
}
// fms for global stats doesn't need to dump to kv.
err = saveStatsToStorage(tableID, newColGlobalStats.Count, newColGlobalStats.ModifyCount,
err = statsHandle.SaveStatsToStorage(tableID, newColGlobalStats.Count, newColGlobalStats.ModifyCount,
0, hg, cms, topN, 2, 1, false, util.StatsMetaHistorySourceSchemaChange)
if err != nil {
return err
Expand All @@ -228,7 +240,7 @@ func UpdateGlobalStats(
if globalIdxStatsBucketNum != 0 {
opts[ast.AnalyzeOptNumBuckets] = uint64(globalIdxStatsBucketNum)
}
newIndexGlobalStats, err := MergePartitionStats2GlobalStats(sctx, gpool, opts, is, tblInfo, true, []int64{idx.ID}, getTableByPhysicalIDFn, callWithSCtxFunc)
newIndexGlobalStats, err := MergePartitionStats2GlobalStats(sctx, statsHandle, opts, is, tblInfo, true, []int64{idx.ID})
if err != nil {
return err
}
Expand All @@ -243,7 +255,7 @@ func UpdateGlobalStats(
continue
}
// fms for global stats doesn't need to dump to kv.
err = saveStatsToStorage(tableID, newIndexGlobalStats.Count, newIndexGlobalStats.ModifyCount, 1, hg, cms, topN, 2, 1, false, util.StatsMetaHistorySourceSchemaChange)
err = statsHandle.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, newIndexGlobalStats.ModifyCount, 1, hg, cms, topN, 2, 1, false, util.StatsMetaHistorySourceSchemaChange)
if err != nil {
return err
}
Expand Down
Loading