Skip to content

Commit

Permalink
fix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Feb 22, 2021
1 parent 2ac04fd commit 612871d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 155 deletions.
33 changes: 19 additions & 14 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,8 +952,6 @@ func drainRecordSet(ctx context.Context, se *session, rs sqlexec.RecordSet) ([]c
}
}

// getTableValue executes restricted sql and the result is one column.
// It returns a string value.
func (s *session) getTableValue(ctx context.Context, tblName string, varName string) (string, error) {
stmt, err := s.ParseWithParams(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?", mysql.SystemDB, tblName, varName)
if err != nil {
Expand Down Expand Up @@ -2073,6 +2071,8 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
}
}

initLoadCommonGlobalVarsSQL()

ver := getStoreBootstrapVersion(store)
if ver == notBootstrapped {
runInBootstrapSession(store, bootstrap)
Expand All @@ -2084,7 +2084,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
if err != nil {
return nil, err
}

// get system tz from mysql.tidb
tz, err := se.getTableValue(context.TODO(), mysql.TiDBTable, "system_tz")
if err != nil {
Expand Down Expand Up @@ -2447,8 +2446,24 @@ var builtinGlobalVariable = []string{
variable.TiDBEnableExchangePartition,
}

var (
loadCommonGlobalVarsSQLOnce sync.Once
loadCommonGlobalVarsSQL string
)

func initLoadCommonGlobalVarsSQL() {
loadCommonGlobalVarsSQLOnce.Do(func() {
vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...)
if len(variable.PluginVarNames) > 0 {
vars = append(vars, variable.PluginVarNames...)
}
loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variables where variable_name in ('" + strings.Join(vars, quoteCommaQuote) + "')"
})
}

// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
func (s *session) loadCommonGlobalVariablesIfNeeded() error {
initLoadCommonGlobalVarsSQL()
vars := s.sessionVars
if vars.CommonGlobalLoaded {
return nil
Expand All @@ -2463,17 +2478,7 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error {
// When a lot of connections connect to TiDB simultaneously, it can protect TiKV meta region from overload.
gvc := domain.GetDomain(s).GetGlobalVarsCache()
loadFunc := func() ([]chunk.Row, []*ast.ResultField, error) {
vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...)
if len(variable.PluginVarNames) > 0 {
vars = append(vars, variable.PluginVarNames...)
}

stmt, err := s.ParseWithParams(context.TODO(), "select HIGH_PRIORITY * from mysql.global_variables where variable_name in (%?)", vars)
if err != nil {
return nil, nil, errors.Trace(err)
}

return s.ExecRestrictedStmt(context.TODO(), stmt)
return s.ExecRestrictedSQL(loadCommonGlobalVarsSQL)
}
rows, fields, err := gvc.LoadGlobalVariables(loadFunc)
if err != nil {
Expand Down
141 changes: 0 additions & 141 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,147 +220,6 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
return nil
}

<<<<<<< HEAD
=======
// UpdateSessionVar updates the necessary session variables for the stats reader.
func (h *Handle) UpdateSessionVar() error {
h.mu.Lock()
defer h.mu.Unlock()
verInString, err := h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion)
if err != nil {
return err
}
ver, err := strconv.ParseInt(verInString, 10, 64)
if err != nil {
return err
}
h.mu.ctx.GetSessionVars().AnalyzeVersion = int(ver)
return err
}

// GlobalStats is used to store the statistics contained in the global-level stats
// which is generated by the merge of partition-level stats.
// It will both store the column stats and index stats.
// In the column statistics, the variable `num` is equal to the number of columns in the partition table.
// In the index statistics, the variable `num` is always equal to one.
type GlobalStats struct {
Num int
Count int64
Hg []*statistics.Histogram
Cms []*statistics.CMSketch
TopN []*statistics.TopN
}

// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableID.
func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, is infoschema.InfoSchema, physicalID int64, isIndex int, idxID int64) (globalStats *GlobalStats, err error) {
// get the partition table IDs
h.mu.Lock()
globalTable, ok := h.getTableByPhysicalID(is, physicalID)
h.mu.Unlock()
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID)
return
}
globalTableInfo := globalTable.Meta()
partitionNum := globalTableInfo.Partition.Num
partitionIDs := make([]int64, 0, partitionNum)
for i := uint64(0); i < partitionNum; i++ {
partitionIDs = append(partitionIDs, globalTableInfo.Partition.Definitions[i].ID)
}

// initialized the globalStats
globalStats = new(GlobalStats)
if isIndex == 0 {
globalStats.Num = len(globalTableInfo.Columns)
} else {
globalStats.Num = 1
}
globalStats.Count = 0
globalStats.Hg = make([]*statistics.Histogram, globalStats.Num)
globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num)
globalStats.TopN = make([]*statistics.TopN, globalStats.Num)

// The first dimension of slice is means the number of column or index stats in the globalStats.
// The second dimension of slice is means the number of partition tables.
// Because all topN and histograms need to be collected before they can be merged.
// So we should store all of the partition-level stats first, and merge them together.
allHg := make([][]*statistics.Histogram, globalStats.Num)
allCms := make([][]*statistics.CMSketch, globalStats.Num)
allTopN := make([][]*statistics.TopN, globalStats.Num)
for i := 0; i < globalStats.Num; i++ {
allHg[i] = make([]*statistics.Histogram, 0, partitionNum)
allCms[i] = make([]*statistics.CMSketch, 0, partitionNum)
allTopN[i] = make([]*statistics.TopN, 0, partitionNum)
}

for _, partitionID := range partitionIDs {
h.mu.Lock()
partitionTable, ok := h.getTableByPhysicalID(is, partitionID)
h.mu.Unlock()
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID)
return
}
tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
partitionStats, err = h.TableStatsFromStorage(tableInfo, partitionID, false, 0)
if err != nil {
return
}
if partitionStats == nil {
err = errors.Errorf("[stats] error occurred when read partition-level stats of the table with tableID %d and partitionID %d", physicalID, partitionID)
return
}
globalStats.Count += partitionStats.Count
for i := 0; i < globalStats.Num; i++ {
ID := tableInfo.Columns[i].ID
if isIndex != 0 {
// If the statistics is the index stats, we should use the index ID to replace the column ID.
ID = idxID
}
hg, cms, topN := partitionStats.GetStatsInfo(ID, isIndex == 1)
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
allTopN[i] = append(allTopN[i], topN)
}
}

// After collect all of the statistics from the partition-level stats,
// we should merge them together.
for i := 0; i < globalStats.Num; i++ {
// Merge CMSketch
globalStats.Cms[i] = allCms[i][0].Copy()
for j := uint64(1); j < partitionNum; j++ {
err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
if err != nil {
return
}
}

// Merge topN. We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These left numbers should be inserted into the histogram.
err = errors.Errorf("TODO: The merge function of the topN structure has not been implemented yet")
if err != nil {
return
}

// Merge histogram
globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc, allHg[i], 0)
if err != nil {
return
}

// Merge NDV
err = errors.Errorf("TODO: The merge function of the NDV has not been implemented yet")
if err != nil {
return
}
}
return
}

>>>>>>> 99d0b22f0... session, util: update session to use new APIs (#22652)
func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
if is.SchemaMetaVersion() != h.mu.schemaVersion {
h.mu.schemaVersion = is.SchemaMetaVersion()
Expand Down

0 comments on commit 612871d

Please sign in to comment.