Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, statistics: analyze use MaxUint64 ts to read data #35232

Merged
merged 13 commits into from
Jul 20, 2022
7 changes: 6 additions & 1 deletion executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor
import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -107,11 +108,15 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
var builder distsql.RequestBuilder
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.TableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil)
builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
startTS := uint64(math.MaxUint64)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
if e.ctx.GetSessionVars().EnableAnalyzeSnapshot {
startTS = e.snapshot
}
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
kvReq, err := reqBuilder.
SetAnalyzeRequest(e.analyzePB).
SetStartTS(e.snapshot).
SetStartTS(startTS).
SetKeepOrder(true).
SetConcurrency(e.concurrency).
SetMemTracker(e.memTracker).
Expand Down
25 changes: 20 additions & 5 deletions executor/analyze_fast.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ func (e *AnalyzeFastExec) activateTxnForRowCount() (rollbackFn func() error, err
}
}
txn.SetOption(kv.Priority, kv.PriorityLow)
txn.SetOption(kv.IsolationLevel, kv.SI)
isoLevel := kv.RC
if e.ctx.GetSessionVars().EnableAnalyzeSnapshot {
isoLevel = kv.SI
}
txn.SetOption(kv.IsolationLevel, isoLevel)
txn.SetOption(kv.NotFillCache, true)
return rollbackFn, nil
}
Expand Down Expand Up @@ -389,8 +393,13 @@ func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, er
}

func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err error) {
snapshot := e.ctx.GetStore().GetSnapshot(kv.NewVersion(e.snapshot))
snapshot.SetOption(kv.IsolationLevel, kv.SI)
var snapshot kv.Snapshot
if e.ctx.GetSessionVars().EnableAnalyzeSnapshot {
snapshot = e.ctx.GetStore().GetSnapshot(kv.NewVersion(e.snapshot))
snapshot.SetOption(kv.IsolationLevel, kv.SI)
} else {
snapshot = e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
Expand All @@ -411,9 +420,15 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err

func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
defer e.wg.Done()
snapshot := e.ctx.GetStore().GetSnapshot(kv.NewVersion(e.snapshot))
var snapshot kv.Snapshot
if e.ctx.GetSessionVars().EnableAnalyzeSnapshot {
snapshot = e.ctx.GetStore().GetSnapshot(kv.NewVersion(e.snapshot))
snapshot.SetOption(kv.IsolationLevel, kv.SI)
} else {
snapshot = e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
snapshot.SetOption(kv.IsolationLevel, kv.RC)
}
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.SI)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
setOptionForTopSQL(e.ctx.GetSessionVars().StmtCtx, snapshot)
readReplicaType := e.ctx.GetSessionVars().GetReplicaRead()
Expand Down
7 changes: 6 additions & 1 deletion executor/analyze_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"context"
"math"
"sync/atomic"
"time"

Expand Down Expand Up @@ -142,9 +143,13 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges)
}
kvReqBuilder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
startTS := uint64(math.MaxUint64)
if e.ctx.GetSessionVars().EnableAnalyzeSnapshot {
startTS = e.snapshot
}
kvReq, err := kvReqBuilder.
SetAnalyzeRequest(e.analyzePB).
SetStartTS(e.snapshot).
SetStartTS(startTS).
SetKeepOrder(true).
SetConcurrency(e.concurrency).
Build()
Expand Down
168 changes: 122 additions & 46 deletions executor/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,53 +583,67 @@ func TestAnalyzeFullSamplingOnIndexWithVirtualColumnOrPrefixColumn(t *testing.T)
tk.MustQuery("show stats_topn where table_name = 'sampling_index_prefix_col' and column_name = 'idx'").Check(testkit.Rows("test sampling_index_prefix_col idx 1 a 3"))
}

func TestSnapshotAnalyze(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
func TestSnapshotAnalyzeAndMaxTSAnalyze(t *testing.T) {
for _, analyzeSnapshot := range []bool{true, false} {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, index index_a(a))")
is := tk.Session().(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema)
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tblInfo := tbl.Meta()
tid := tblInfo.ID
tk.MustExec("insert into t values(1),(1),(1)")
tk.MustExec("begin")
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
startTS1 := txn.StartTS()
tk.MustExec("commit")
tk.MustExec("insert into t values(2),(2),(2)")
tk.MustExec("begin")
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
startTS2 := txn.StartTS()
tk.MustExec("commit")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/injectAnalyzeSnapshot", fmt.Sprintf("return(%d)", startTS1)))
tk.MustExec("analyze table t")
rows := tk.MustQuery(fmt.Sprintf("select count, snapshot from mysql.stats_meta where table_id = %d", tid)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "3", rows[0][0])
s1Str := rows[0][1].(string)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/injectAnalyzeSnapshot", fmt.Sprintf("return(%d)", startTS2)))
tk.MustExec("analyze table t")
rows = tk.MustQuery(fmt.Sprintf("select count, snapshot from mysql.stats_meta where table_id = %d", tid)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "6", rows[0][0])
s2Str := rows[0][1].(string)
require.True(t, s1Str != s2Str)
tk.MustExec("set @@session.tidb_analyze_version = 2")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/injectAnalyzeSnapshot", fmt.Sprintf("return(%d)", startTS1)))
tk.MustExec("analyze table t")
rows = tk.MustQuery(fmt.Sprintf("select count, snapshot from mysql.stats_meta where table_id = %d", tid)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "6", rows[0][0])
s3Str := rows[0][1].(string)
require.Equal(t, s2Str, s3Str)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectAnalyzeSnapshot"))
tk.MustExec("use test")
if analyzeSnapshot {
tk.MustExec("set @@session.tidb_enable_analyze_snapshot = on")
} else {
tk.MustExec("set @@session.tidb_enable_analyze_snapshot = off")
}
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, index index_a(a))")
is := tk.Session().(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema)
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tblInfo := tbl.Meta()
tid := tblInfo.ID
tk.MustExec("insert into t values(1),(1),(1)")
tk.MustExec("begin")
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
startTS1 := txn.StartTS()
tk.MustExec("commit")
tk.MustExec("insert into t values(2),(2),(2)")
tk.MustExec("begin")
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
startTS2 := txn.StartTS()
tk.MustExec("commit")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/injectAnalyzeSnapshot", fmt.Sprintf("return(%d)", startTS1)))
tk.MustExec("analyze table t")
rows := tk.MustQuery(fmt.Sprintf("select count, snapshot from mysql.stats_meta where table_id = %d", tid)).Rows()
require.Len(t, rows, 1)
if analyzeSnapshot {
// Analyze cannot see the second insert if it reads the snapshot.
require.Equal(t, "3", rows[0][0])
} else {
// Analyze can see the second insert if it reads the latest data.
require.Equal(t, "6", rows[0][0])
}
s1Str := rows[0][1].(string)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/injectAnalyzeSnapshot", fmt.Sprintf("return(%d)", startTS2)))
tk.MustExec("analyze table t")
rows = tk.MustQuery(fmt.Sprintf("select count, snapshot from mysql.stats_meta where table_id = %d", tid)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "6", rows[0][0])
s2Str := rows[0][1].(string)
require.True(t, s1Str != s2Str)
tk.MustExec("set @@session.tidb_analyze_version = 2")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/injectAnalyzeSnapshot", fmt.Sprintf("return(%d)", startTS1)))
tk.MustExec("analyze table t")
rows = tk.MustQuery(fmt.Sprintf("select count, snapshot from mysql.stats_meta where table_id = %d", tid)).Rows()
require.Len(t, rows, 1)
require.Equal(t, "6", rows[0][0])
s3Str := rows[0][1].(string)
// The third analyze doesn't write results into mysql.stats_xxx because its snapshot is smaller than the second analyze.
require.Equal(t, s2Str, s3Str)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectAnalyzeSnapshot"))
}
}

func TestAdjustSampleRateNote(t *testing.T) {
Expand Down Expand Up @@ -3091,3 +3105,65 @@ PARTITION BY RANGE ( a ) (
tbl := h.GetTableStats(tableInfo)
require.Equal(t, int64(6), tbl.Columns[tableInfo.Columns[0].ID].Histogram.NDV)
}

// TestAutoAnalyzeAwareGlobalVariableChange verifies that HandleAutoAnalyze is
// aware of @@global.tidb_enable_analyze_snapshot being set to 1 and reads data
// from the (injected) snapshot, while the count / modify_count deltas that
// happen during the analyze are not lost.
func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) {
	const (
		fpAnalyzeSnapshot = "github.com/pingcap/tidb/executor/injectAnalyzeSnapshot"
		fpBaseCount       = "github.com/pingcap/tidb/executor/injectBaseCount"
		fpBaseModifyCount = "github.com/pingcap/tidb/executor/injectBaseModifyCount"
	)

	store, dom, clean := testkit.CreateMockStoreAndDomain(t)
	defer clean()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustQuery("select @@global.tidb_enable_analyze_snapshot").Check(testkit.Rows("0"))
	// We want to test that HandleAutoAnalyze is aware of setting @@global.tidb_enable_analyze_snapshot to 1 and reads data from snapshot.
	tk.MustExec("set @@global.tidb_enable_analyze_snapshot = 1")
	tk.MustExec("set @@global.tidb_analyze_version = 2")
	tk.MustExec("create table t(a int)")
	h := dom.StatsHandle()
	require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)
	tid := tbl.Meta().ID
	tk.MustExec("insert into t values(1),(2),(3)")
	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
	err = h.Update(dom.InfoSchema())
	require.NoError(t, err)
	tk.MustExec("analyze table t")
	tk.MustQuery(fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tid)).Check(testkit.Rows(
		"3 0",
	))

	// Capture the original auto-analyze settings and register their restoration
	// BEFORE mutating them, so a failure while changing them cannot leak the
	// modified package-level state into other tests.
	originalMinCnt := handle.AutoAnalyzeMinCnt
	originalRatio := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string)
	defer func() {
		handle.AutoAnalyzeMinCnt = originalMinCnt
		tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_ratio = %v", originalRatio))
	}()
	handle.AutoAnalyzeMinCnt = 0
	tk.MustExec("set global tidb_auto_analyze_ratio = 0.001")

	// Take a timestamp that precedes the second insert; it is injected below as
	// the analyze snapshot.
	tk.MustExec("begin")
	txn, err := tk.Session().Txn(false)
	require.NoError(t, err)
	startTS := txn.StartTS()
	tk.MustExec("commit")

	tk.MustExec("insert into t values(4),(5),(6)")
	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
	err = h.Update(dom.InfoSchema())
	require.NoError(t, err)

	// disableFailpoint reports (without aborting) a failure to disable a
	// failpoint. It is meant to be deferred right after a successful Enable so
	// that the failpoint is released even when a later assertion stops the test.
	disableFailpoint := func(name string) {
		if err := failpoint.Disable(name); err != nil {
			t.Errorf("disable failpoint %s: %v", name, err)
		}
	}

	// Simulate that the analyze would start before and finish after the second insert.
	require.NoError(t, failpoint.Enable(fpAnalyzeSnapshot, fmt.Sprintf("return(%d)", startTS)))
	defer disableFailpoint(fpAnalyzeSnapshot)
	require.NoError(t, failpoint.Enable(fpBaseCount, "return(3)"))
	defer disableFailpoint(fpBaseCount)
	require.NoError(t, failpoint.Enable(fpBaseModifyCount, "return(0)"))
	defer disableFailpoint(fpBaseModifyCount)

	require.True(t, h.HandleAutoAnalyze(dom.InfoSchema()))
	// Check the count / modify_count changes during the analyze are not lost.
	tk.MustQuery(fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tid)).Check(testkit.Rows(
		"6 3",
	))
	// Check the histogram is correct for the snapshot analyze.
	tk.MustQuery(fmt.Sprintf("select distinct_count from mysql.stats_histograms where table_id = %d", tid)).Check(testkit.Rows(
		"3",
	))
}
10 changes: 10 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,16 @@ func TestSetVar(t *testing.T) {
tk.MustQuery("show warnings").Check(testkit.RowsWithSep("|", "Warning|1292|Truncated incorrect tidb_cost_model_version value: '0'"))
tk.MustExec("set tidb_cost_model_version=2")
tk.MustQuery("select @@tidb_cost_model_version").Check(testkit.Rows("2"))

tk.MustQuery("select @@tidb_enable_analyze_snapshot").Check(testkit.Rows("0"))
tk.MustExec("set global tidb_enable_analyze_snapshot = 1")
tk.MustQuery("select @@global.tidb_enable_analyze_snapshot").Check(testkit.Rows("1"))
tk.MustExec("set global tidb_enable_analyze_snapshot = 0")
tk.MustQuery("select @@global.tidb_enable_analyze_snapshot").Check(testkit.Rows("0"))
tk.MustExec("set session tidb_enable_analyze_snapshot = 1")
tk.MustQuery("select @@session.tidb_enable_analyze_snapshot").Check(testkit.Rows("1"))
tk.MustExec("set session tidb_enable_analyze_snapshot = 0")
tk.MustQuery("select @@session.tidb_enable_analyze_snapshot").Check(testkit.Rows("0"))
}

func TestGetSetNoopVars(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1759,6 +1759,10 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu
if execOption.AnalyzeVer != 0 {
s.sessionVars.AnalyzeVersion = execOption.AnalyzeVer
}
prevAnalyzeSnapshot := s.sessionVars.EnableAnalyzeSnapshot
if execOption.AnalyzeSnapshot != nil {
s.sessionVars.EnableAnalyzeSnapshot = *execOption.AnalyzeSnapshot
}
prePruneMode := s.sessionVars.PartitionPruneMode.Load()
if len(execOption.PartitionPruneMode) > 0 {
s.sessionVars.PartitionPruneMode.Store(execOption.PartitionPruneMode)
Expand All @@ -1768,6 +1772,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu
prevTables := s.sessionVars.StmtCtx.Tables
return s, func() {
s.sessionVars.AnalyzeVersion = prevStatsVer
s.sessionVars.EnableAnalyzeSnapshot = prevAnalyzeSnapshot
if err := s.sessionVars.SetSystemVar(variable.TiDBSnapshot, ""); err != nil {
logutil.BgLogger().Error("set tidbSnapshot error", zap.Error(err))
}
Expand Down Expand Up @@ -1811,13 +1816,19 @@ func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, f
se.sessionVars.AnalyzeVersion = execOption.AnalyzeVer
}

prevAnalyzeSnapshot := se.sessionVars.EnableAnalyzeSnapshot
if execOption.AnalyzeSnapshot != nil {
se.sessionVars.EnableAnalyzeSnapshot = *execOption.AnalyzeSnapshot
}

prePruneMode := se.sessionVars.PartitionPruneMode.Load()
if len(execOption.PartitionPruneMode) > 0 {
se.sessionVars.PartitionPruneMode.Store(execOption.PartitionPruneMode)
}

return se, func() {
se.sessionVars.AnalyzeVersion = prevStatsVer
se.sessionVars.EnableAnalyzeSnapshot = prevAnalyzeSnapshot
if err := se.sessionVars.SetSystemVar(variable.TiDBSnapshot, ""); err != nil {
logutil.BgLogger().Error("set tidbSnapshot error", zap.Error(err))
}
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,11 @@ type SessionVars struct {
// MemoryDebugModeAlarmRatio indicates the allowable bias ratio of memory tracking accuracy check.
// When `(memory tracked by tidb) * (1+MemoryDebugModeAlarmRatio) < actual heapInUse`, an alarm log will be recorded.
MemoryDebugModeAlarmRatio int64

// EnableAnalyzeSnapshot indicates whether to read data on snapshot when collecting statistics.
// When it is false, ANALYZE reads the latest data.
// When it is true, ANALYZE reads data on the snapshot at the beginning of ANALYZE.
EnableAnalyzeSnapshot bool
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,10 @@ var defaultSysVars = []*SysVar{
s.MemoryDebugModeAlarmRatio = TidbOptInt64(val, 0)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableAnalyzeSnapshot, Value: BoolToOnOff(DefTiDBEnableAnalyzeSnapshot), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.EnableAnalyzeSnapshot = TiDBOptOn(val)
return nil
}},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,11 @@ const (
TiDBMemoryDebugModeMinHeapInUse = "tidb_memory_debug_mode_min_heap_inuse"
// TiDBMemoryDebugModeAlarmRatio is used to set the tidb memory debug mode bias ratio. Treat memory bias less than this ratio as noise.
TiDBMemoryDebugModeAlarmRatio = "tidb_memory_debug_mode_alarm_ratio"

// TiDBEnableAnalyzeSnapshot indicates whether to read data on snapshot when collecting statistics.
// When set to false, ANALYZE reads the latest data.
// When set to true, ANALYZE reads data on the snapshot at the beginning of ANALYZE.
TiDBEnableAnalyzeSnapshot = "tidb_enable_analyze_snapshot"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -975,6 +980,7 @@ const (
DefStreamCountWhenMaxThreadsNotSet = 8
DefTiFlashFineGrainedShuffleBatchSize = 8192
DefAdaptiveClosestReadThreshold = 4096
DefTiDBEnableAnalyzeSnapshot = false
)

// Process global variables.
Expand Down
3 changes: 2 additions & 1 deletion statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,12 @@ func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...in
})
}

func (h *Handle) execRestrictedSQLWithStatsVer(ctx context.Context, statsVer int, procTrackID uint64, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
func (h *Handle) execRestrictedSQLWithStatsVer(ctx context.Context, statsVer int, procTrackID uint64, analyzeSnapshot bool, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
optFuncs := []sqlexec.OptionFuncAlias{
execOptionForAnalyze[statsVer],
sqlexec.GetAnalyzeSnapshotOption(analyzeSnapshot),
sqlexec.GetPartitionPruneModeOption(string(h.CurrentPruneMode())),
sqlexec.ExecOptionUseCurSession,
sqlexec.ExecOptionWithSysProcTrack(procTrackID, h.sysProcTracker.Track, h.sysProcTracker.UnTrack),
Expand Down
Loading