Skip to content

Commit

Permalink
Merge branch 'master' into add-index-delete-only
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Dec 9, 2022
2 parents de6f4ca + ef6fb2e commit 65b3c65
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 28 deletions.
27 changes: 11 additions & 16 deletions server/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
"fmt"
"net/http"
"strconv"
"time"
Expand All @@ -24,8 +25,8 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/gcutil"
"github.com/tikv/client-go/v2/oracle"
)

Expand Down Expand Up @@ -105,14 +106,14 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
return
}
defer se.Close()

dumpPartitionStats := true
if len(params[pDumpPartitionStats]) > 0 {
dumpPartitionStats, err = strconv.ParseBool(params[pDumpPartitionStats])
if err != nil {
writeError(w, err)
return
}
enabeld, err := sh.do.StatsHandle().CheckHistoricalStatsEnable()
if err != nil {
writeError(w, err)
return
}
if !enabeld {
writeError(w, fmt.Errorf("%v should be enabled", variable.TiDBEnableHistoricalStats))
return
}

se.GetSessionVars().StmtCtx.TimeZone = time.Local
Expand All @@ -127,12 +128,6 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
return
}
snapshot := oracle.GoTimeToTS(t1)
err = gcutil.ValidateSnapshot(se, snapshot)
if err != nil {
writeError(w, err)
return
}

is, err := sh.do.GetSnapshotInfoSchema(snapshot)
if err != nil {
writeError(w, err)
Expand All @@ -144,7 +139,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
writeError(w, err)
return
}
js, err := h.DumpStatsToJSONBySnapshot(params[pDBName], tbl.Meta(), snapshot, dumpPartitionStats)
js, err := h.DumpHistoricalStatsBySnapshot(params[pDBName], tbl.Meta(), snapshot)
if err != nil {
writeError(w, err)
} else {
Expand Down
6 changes: 6 additions & 0 deletions server/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/gorilla/mux"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -59,6 +60,10 @@ func TestDumpStatsAPI(t *testing.T) {
statsHandler := &StatsHandler{dom}

prepareData(t, client, statsHandler)
tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("tidb"), model.NewCIStr("test"))
require.NoError(t, err)
err = dom.GetHistoricalStatsWorker().DumpHistoricalStats(tableInfo.Meta().ID, dom.StatsHandle())
require.NoError(t, err)

router := mux.NewRouter()
router.Handle("/stats/dump/{db}/{table}", statsHandler)
Expand Down Expand Up @@ -168,6 +173,7 @@ func prepareData(t *testing.T, client *testServerClient, statHandle *StatsHandle
tk.MustExec("insert test values (1, 's')")
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table test")
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustExec("insert into test(a,b) values (1, 'v'),(3, 'vvv'),(5, 'vv')")
is := statHandle.do.InfoSchema()
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(physicalID, statsVer)
h.recordHistoricalStatsMeta(physicalID, statsVer)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -263,7 +263,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(physicalID, statsVer)
h.recordHistoricalStatsMeta(physicalID, statsVer)
}
}()
h.mu.Lock()
Expand Down
93 changes: 93 additions & 0 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"time"

Expand Down Expand Up @@ -130,6 +131,42 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo,
return h.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot, dumpPartitionStats)
}

// DumpHistoricalStatsBySnapshot dumped json tables from mysql.stats_meta_history and mysql.stats_history
func (h *Handle) DumpHistoricalStatsBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) {
pi := tableInfo.GetPartitionInfo()
if pi == nil {
return h.tableHistoricalStatsToJSON(tableInfo.ID, snapshot)
}
jsonTbl := &JSONTable{
DatabaseName: dbName,
TableName: tableInfo.Name.L,
Partitions: make(map[string]*JSONTable, len(pi.Definitions)),
}
for _, def := range pi.Definitions {
tbl, err := h.tableHistoricalStatsToJSON(def.ID, snapshot)
if err != nil {
return nil, errors.Trace(err)
}
if tbl == nil {
continue
}
jsonTbl.Partitions[def.Name.L] = tbl
}
h.mu.Lock()
isDynamicMode := variable.PartitionPruneMode(h.mu.ctx.GetSessionVars().PartitionPruneMode.Load()) == variable.Dynamic
h.mu.Unlock()
if isDynamicMode {
tbl, err := h.tableHistoricalStatsToJSON(tableInfo.ID, snapshot)
if err != nil {
return nil, errors.Trace(err)
}
if tbl != nil {
jsonTbl.Partitions["global"] = tbl
}
}
return jsonTbl, nil
}

// DumpStatsToJSONBySnapshot dumps statistic to json.
func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*JSONTable, error) {
h.mu.Lock()
Expand Down Expand Up @@ -194,6 +231,62 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati
return jsonTbl, nil
}

func (h *Handle) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (*JSONTable, error) {
reader, err := h.getGlobalStatsReader(0)
if err != nil {
return nil, err
}
defer func() {
err1 := h.releaseGlobalStatsReader(reader)
if err == nil && err1 != nil {
err = err1
}
}()

// get meta version
rows, _, err := reader.read("select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
if err != nil {
return nil, errors.AddStack(err)
}
if len(rows) < 1 {
return nil, fmt.Errorf("failed to get records of stats_meta_history for table_id = %v, snapshot = %v", physicalID, snapshot)
}
statsMetaVersion := rows[0].GetInt64(0)
// get stats meta
rows, _, err = reader.read("select modify_count, count from mysql.stats_meta_history where table_id = %? and version = %?", physicalID, statsMetaVersion)
if err != nil {
return nil, errors.AddStack(err)
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)

// get stats version
rows, _, err = reader.read("select distinct version from mysql.stats_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
if err != nil {
return nil, errors.AddStack(err)
}
if len(rows) < 1 {
return nil, fmt.Errorf("failed to get record of stats_history for table_id = %v, snapshot = %v", physicalID, snapshot)
}
statsVersion := rows[0].GetInt64(0)

// get stats
rows, _, err = reader.read("select stats_data from mysql.stats_history where table_id = %? and version = %? order by seq_no", physicalID, statsVersion)
if err != nil {
return nil, errors.AddStack(err)
}
blocks := make([][]byte, 0)
for _, row := range rows {
blocks = append(blocks, row.GetBytes(0))
}
jsonTbl, err := BlocksToJSONTable(blocks)
if err != nil {
return nil, errors.AddStack(err)
}
jsonTbl.Count = count
jsonTbl.ModifyCount = modifyCount
return jsonTbl, nil
}

func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*JSONTable, error) {
tbl, err := h.TableStatsFromStorage(tableInfo, physicalID, true, snapshot)
if err != nil || tbl == nil {
Expand Down
29 changes: 20 additions & 9 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1619,7 +1619,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, ana
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
Expand All @@ -1634,7 +1634,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = recordHistoricalStatsMeta(sctx, tableID, statsVer)
if err1 := recordHistoricalStatsMeta(sctx, tableID, statsVer); err1 != nil {
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", statsVer),
zap.Error(err1))
}
}
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
Expand Down Expand Up @@ -1808,7 +1813,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isI
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -1887,7 +1892,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -2093,7 +2098,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
slices.Sort(colIDs)
Expand Down Expand Up @@ -2164,7 +2169,7 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
Expand Down Expand Up @@ -2377,7 +2382,7 @@ func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
if extStats == nil || len(extStats.Stats) == 0 {
Expand Down Expand Up @@ -2676,10 +2681,16 @@ func recordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version u
return nil
}

func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error {
func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) {
h.mu.Lock()
defer h.mu.Unlock()
return recordHistoricalStatsMeta(h.mu.ctx, tableID, version)
err := recordHistoricalStatsMeta(h.mu.ctx, tableID, version)
if err != nil {
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", version),
zap.Error(err))
}
}

// InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job.
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(id, statsVer)
h.recordHistoricalStatsMeta(id, statsVer)
}
}()
if delta.Count == 0 {
Expand Down

0 comments on commit 65b3c65

Please sign in to comment.