statistics: handle lock and unlock operations correctly
fix

fix
Rustin170506 committed Oct 14, 2024
1 parent 94b2ac0 commit c909968
Showing 11 changed files with 277 additions and 29 deletions.
@@ -55,7 +55,7 @@ go_test(
"static_partitioned_table_analysis_job_test.go",
],
flaky = True,
shard_count = 44,
shard_count = 46,
deps = [
":priorityqueue",
"//pkg/ddl/notifier",
55 changes: 51 additions & 4 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
@@ -229,14 +230,19 @@ func (pq *AnalysisPriorityQueueV2) ProcessDMLChanges() {
// We need to fetch the next check version with offset before fetching new DML changes.
// Otherwise, we may miss some DML changes that happened during the process.
newMaxVersion := pq.statsHandle.GetNextCheckVersionWithOffset()
// Query locked tables once to minimize overhead.
// Outdated lock info is acceptable as we verify table lock status pre-analysis.
lockedTables, err := lockstats.QueryLockedTables(sctx)
if err != nil {
return err
}
values := pq.statsHandle.Values()
lastFetchTimestamp := pq.syncFields.lastDMLUpdateFetchTimestamp
for _, value := range values {
// We only process the tables that have been updated.
// So here we only need to process the tables whose version is greater than the last fetch timestamp.
if value.Version > lastFetchTimestamp {
// TODO: consider locked tables and partitions. Consider move this check to
err := pq.processTableStats(sctx, value, parameters)
err := pq.processTableStats(sctx, value, parameters, lockedTables)
if err != nil {
statslogutil.StatsLogger().Error(
"Failed to process table stats",
@@ -263,6 +269,7 @@ func (pq *AnalysisPriorityQueueV2) processTableStats(
sctx sessionctx.Context,
stats *statistics.Table,
parameters map[string]string,
lockedTables map[int64]struct{},
) error {
// Check if the table is eligible for analysis first to avoid unnecessary work.
if !stats.IsEligibleForAnalysis() {
@@ -292,8 +299,26 @@ func (pq *AnalysisPriorityQueueV2) processTableStats(
// This is acceptable for now, but in the future, we may consider separating the analysis job for each partition.
job, ok, _ := pq.syncFields.inner.GetByKey(stats.PhysicalID)
if !ok {
job = pq.tryCreateJob(is, stats, pruneMode, jobFactory)
job = pq.tryCreateJob(is, stats, pruneMode, jobFactory, lockedTables)
} else {
// If the table is locked, we do not analyze it.
// Safety: **Dynamic partitioned tables are handled in the tryCreateJob branch.**
// For non-partitioned tables, we can skip the table here.
// For static partitioned tables, we can skip the partition here.
// If the entire partitioned table is locked, we can skip the table here.
// The only case where some partitions are locked and the pruning mode is dynamic is correctly handled in the tryCreateJob branch.
if _, ok := lockedTables[stats.PhysicalID]; ok {
// Clean up the job if the table is locked.
err := pq.syncFields.inner.Delete(job)
if err != nil {
statslogutil.StatsLogger().Error(
"Failed to delete job from priority queue",
zap.Error(err),
zap.String("job", job.String()),
)
}
return nil
}
job = pq.tryUpdateJob(is, stats, job, jobFactory)
}
return pq.pushWithoutLock(job)
@@ -303,6 +328,7 @@ func (pq *AnalysisPriorityQueueV2) tryCreateJob(
stats *statistics.Table,
pruneMode variable.PartitionPruneMode,
jobFactory *AnalysisJobFactory,
lockedTables map[int64]struct{},
) (job AnalysisJob) {
if stats == nil {
return nil
@@ -327,6 +353,10 @@
}
partitionedTable := tableMeta.GetPartitionInfo()
if partitionedTable == nil {
// If the table is locked, we do not analyze it.
if _, ok := lockedTables[tableMeta.ID]; ok {
return nil
}
job = jobFactory.CreateNonPartitionedTableAnalysisJob(
schemaName.O,
tableMeta,
@@ -351,6 +381,10 @@
// TODO: add tests to verify this behavior.
return nil
}
// If the partition is locked, we do not analyze it.
if _, ok := lockedTables[partitionDef.ID]; ok {
return nil
}
job = jobFactory.CreateStaticPartitionAnalysisJob(
schemaName.O,
tableMeta,
Expand All @@ -359,7 +393,20 @@ func (pq *AnalysisPriorityQueueV2) tryCreateJob(
stats,
)
} else {
partitionStats := GetPartitionStats(pq.statsHandle, tableMeta, partitionDefs)
// If the table is locked, we do not analyze it.
// Note: the table meta is the parent table meta.
if _, ok := lockedTables[tableMeta.ID]; ok {
return nil
}

// Only analyze the partitions that have not been locked.
filteredPartitionDefs := make([]model.PartitionDefinition, 0, len(partitionDefs))
for _, def := range partitionDefs {
if _, ok := lockedTables[def.ID]; !ok {
filteredPartitionDefs = append(filteredPartitionDefs, def)
}
}
partitionStats := GetPartitionStats(pq.statsHandle, tableMeta, filteredPartitionDefs)
job = jobFactory.CreateDynamicPartitionedTableAnalysisJob(
schemaName.O,
tableMeta,
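For reference, here is a minimal, self-contained sketch of the filtering pattern the queue_v2.go hunks above introduce: the locked-table IDs are queried once per pass, and any partition whose ID appears in that set is dropped before a dynamic partitioned-table analysis job is built. The types below are simplified stand-ins, not the real model.PartitionDefinition or lockstats API.

```go
package main

import "fmt"

// partitionDef is a simplified stand-in for model.PartitionDefinition.
type partitionDef struct {
	ID   int64
	Name string
}

// filterLockedPartitions mirrors the loop added to tryCreateJob: it keeps only
// the partitions whose IDs are absent from the locked set, so locked partitions
// never contribute to a dynamic partitioned-table analysis job.
func filterLockedPartitions(defs []partitionDef, lockedTables map[int64]struct{}) []partitionDef {
	filtered := make([]partitionDef, 0, len(defs))
	for _, def := range defs {
		if _, locked := lockedTables[def.ID]; !locked {
			filtered = append(filtered, def)
		}
	}
	return filtered
}

func main() {
	locked := map[int64]struct{}{102: {}} // assume partition p1 (id 102) is locked
	defs := []partitionDef{{ID: 101, Name: "p0"}, {ID: 102, Name: "p1"}}
	for _, def := range filterLockedPartitions(defs, locked) {
		fmt.Printf("would analyze partition %s (id=%d)\n", def.Name, def.ID)
	}
	// Output: would analyze partition p0 (id=101)
}
```

Fetching the locked set once per ProcessDMLChanges pass keeps the overhead to a single query; as the diff's own comment notes, slightly stale lock information is acceptable because the lock status is verified again right before the analysis runs.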
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"go.uber.org/zap"
)
@@ -104,6 +105,7 @@ func (pq *AnalysisPriorityQueueV2) getAndDeleteJob(tableID int64) error {
// recreateAndPushJob is a helper function that recreates a job and pushes it to the queue.
func (pq *AnalysisPriorityQueueV2) recreateAndPushJob(
sctx sessionctx.Context,
lockedTables map[int64]struct{},
pruneMode variable.PartitionPruneMode,
stats *statistics.Table,
) error {
@@ -115,7 +117,7 @@ func (pq *AnalysisPriorityQueueV2) recreateAndPushJob(
}
jobFactory := NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
job := pq.tryCreateJob(is, stats, pruneMode, jobFactory)
job := pq.tryCreateJob(is, stats, pruneMode, jobFactory, lockedTables)
return pq.pushWithoutLock(job)
}

@@ -126,19 +128,23 @@ func (pq *AnalysisPriorityQueueV2) recreateAndPushJob(
func (pq *AnalysisPriorityQueueV2) recreateAndPushJobForTable(sctx sessionctx.Context, tableInfo *model.TableInfo) error {
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
partitionInfo := tableInfo.GetPartitionInfo()
lockedTables, err := lockstats.QueryLockedTables(sctx)
if err != nil {
return err
}
// For static partitioned tables, we need to recreate the job for each partition.
if partitionInfo != nil && pruneMode == variable.Static {
for _, def := range partitionInfo.Definitions {
partitionStats := pq.statsHandle.GetPartitionStatsForAutoAnalyze(tableInfo, def.ID)
err := pq.recreateAndPushJob(sctx, pruneMode, partitionStats)
err := pq.recreateAndPushJob(sctx, lockedTables, pruneMode, partitionStats)
if err != nil {
return err
}
}
return nil
}
stats := pq.statsHandle.GetTableStatsForAutoAnalyze(tableInfo)
return pq.recreateAndPushJob(sctx, pruneMode, stats)
return pq.recreateAndPushJob(sctx, lockedTables, pruneMode, stats)
}

func (pq *AnalysisPriorityQueueV2) handleAddIndexEvent(
@@ -158,12 +164,16 @@ func (pq *AnalysisPriorityQueueV2) handleAddIndexEvent(
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
partitionInfo := tableInfo.GetPartitionInfo()
lockedTables, err := lockstats.QueryLockedTables(sctx)
if err != nil {
return err
}
if pruneMode == variable.Static && partitionInfo != nil {
// For static partitioned tables, we need to recreate the job for each partition.
for _, def := range partitionInfo.Definitions {
partitionID := def.ID
partitionStats := pq.statsHandle.GetPartitionStatsForAutoAnalyze(tableInfo, partitionID)
job := pq.tryCreateJob(is, partitionStats, pruneMode, jobFactory)
job := pq.tryCreateJob(is, partitionStats, pruneMode, jobFactory, lockedTables)
return pq.pushWithoutLock(job)
}
return nil
Expand All @@ -172,7 +182,7 @@ func (pq *AnalysisPriorityQueueV2) handleAddIndexEvent(
// For normal tables and dynamic partitioned tables, we only need to recreate the job for the table.
stats := pq.statsHandle.GetTableStatsForAutoAnalyze(tableInfo)
// Directly create a new job for the newly added index.
job := pq.tryCreateJob(is, stats, pruneMode, jobFactory)
job := pq.tryCreateJob(is, stats, pruneMode, jobFactory, lockedTables)
return pq.pushWithoutLock(job)
}

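The handler hunks above thread the same lockedTables set through job recreation, while processTableStats (in the earlier file) now deletes the stale job when a table that already sits in the queue turns out to be locked. A toy sketch of that delete-on-lock branch, using in-memory stand-ins for the queue and job types (none of these names come from the real package):

```go
package main

import "fmt"

// job is a minimal stand-in for a queued AnalysisJob.
type job struct {
	tableID int64
}

// queue is a toy stand-in for the priority queue's inner heap, keyed by table ID.
type queue struct {
	jobs map[int64]*job
}

func (q *queue) delete(id int64) { delete(q.jobs, id) }
func (q *queue) push(j *job)     { q.jobs[j.tableID] = j }

// onDMLChange mirrors the branch added to processTableStats: when a table that
// already has a queued job is found in the locked set, the stale job is removed
// instead of being refreshed and re-pushed.
func onDMLChange(q *queue, tableID int64, lockedTables map[int64]struct{}) {
	if existing, ok := q.jobs[tableID]; ok {
		if _, locked := lockedTables[tableID]; locked {
			q.delete(existing.tableID) // clean up the job for the locked table
			return
		}
		q.push(existing) // otherwise the job would be updated and re-pushed
	}
}

func main() {
	q := &queue{jobs: map[int64]*job{42: {tableID: 42}}}
	onDMLChange(q, 42, map[int64]struct{}{42: {}}) // table 42 is locked
	fmt.Println("jobs left in queue:", len(q.jobs)) // prints 0
}
```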
121 changes: 121 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2_test.go
@@ -376,3 +376,124 @@ func TestRequeueFailedJobs(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, l)
}

func TestProcessDMLChangesWithLockedTables(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
handle := dom.StatsHandle()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
tk.MustExec("insert into t1 values (1)")
tk.MustExec("insert into t2 values (1)")
statistics.AutoAnalyzeMinCnt = 0
defer func() {
statistics.AutoAnalyzeMinCnt = 1000
}()

ctx := context.Background()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

pq := priorityqueue.NewAnalysisPriorityQueueV2(handle)
defer pq.Close()
require.NoError(t, pq.Initialize())

schema := pmodel.NewCIStr("test")
tbl1, err := dom.InfoSchema().TableByName(ctx, schema, pmodel.NewCIStr("t1"))
require.NoError(t, err)
tbl2, err := dom.InfoSchema().TableByName(ctx, schema, pmodel.NewCIStr("t2"))
require.NoError(t, err)

// Check current jobs.
job, err := pq.Peek()
require.NoError(t, err)
require.Equal(t, tbl1.Meta().ID, job.GetTableID())

// Lock t1.
tk.MustExec("lock stats t1")
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Process the DML changes.
pq.ProcessDMLChanges()

// Check if the jobs have been updated.
job, err = pq.Peek()
require.NoError(t, err)
require.Equal(t, tbl2.Meta().ID, job.GetTableID())

// Unlock t1.
tk.MustExec("unlock stats t1")
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Process the DML changes.
pq.ProcessDMLChanges()

// Check if the jobs have been updated.
l, err := pq.Len()
require.NoError(t, err)
require.Equal(t, 2, l)
}

func TestProcessDMLChangesWithLockedPartitionsWithStaticPruneMode(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
handle := dom.StatsHandle()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))")
tk.MustExec("insert into t1 values (1)")
tk.MustExec("set global tidb_partition_prune_mode = 'static'")
statistics.AutoAnalyzeMinCnt = 0
defer func() {
statistics.AutoAnalyzeMinCnt = 1000
}()

ctx := context.Background()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
tk.MustExec("analyze table t1")
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
schema := pmodel.NewCIStr("test")
tbl, err := dom.InfoSchema().TableByName(ctx, schema, pmodel.NewCIStr("t1"))
require.NoError(t, err)

// Insert more rows into partition p0.
tk.MustExec("insert into t1 partition (p0) values (2), (3), (4), (5), (6), (7), (8), (9)")
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

pq := priorityqueue.NewAnalysisPriorityQueueV2(handle)
defer pq.Close()
require.NoError(t, pq.Initialize())

// Check current jobs.
job, err := pq.Peek()
require.NoError(t, err)
pid := tbl.Meta().Partition.Definitions[0].ID
require.Equal(t, pid, job.GetTableID())

// Lock partition p0.
tk.MustExec("lock stats t1 partition p0")
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Process the DML changes.
pq.ProcessDMLChanges()

// No jobs should be in the queue.
l, err := pq.Len()
require.NoError(t, err)
require.Equal(t, 0, l)

// Unlock partition p0.
tk.MustExec("unlock stats t1 partition (p0)")
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Process the DML changes.
pq.ProcessDMLChanges()

// Check if the jobs have been updated.
job, err = pq.Peek()
require.NoError(t, err)
pid = tbl.Meta().Partition.Definitions[0].ID
require.Equal(t, pid, job.GetTableID())
}
@@ -59,7 +59,7 @@ func TestLockAndUnlockPartitionStats(t *testing.T) {
"Warning 1105 skip analyze locked table: test.t partition (p0)",
))
partitionStats1 := handle.GetPartitionStats(tbl, p0Id)
require.Equal(t, partitionStats, partitionStats1)
require.Equal(t, partitionStats.RealtimeCount, partitionStats1.RealtimeCount)
require.Equal(t, int64(0), partitionStats1.RealtimeCount)

tk.MustExec("unlock stats t partition p0")
2 changes: 2 additions & 0 deletions pkg/statistics/handle/lockstats/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/util",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_zap//:zap",
],
)
@@ -46,6 +47,7 @@ go_test(
"//pkg/util/mock",
"//pkg/util/sqlexec/mock",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_goleak//:goleak",
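The BUILD targets for pkg/statistics/handle/lockstats above add a dependency on @com_github_pingcap_failpoint, which indicates the package now uses pingcap/failpoint for test-only fault injection. A generic sketch of that pattern follows, under the assumption that it is used to force an error path; the failpoint name and helper are hypothetical, and the Inject marker stays inert until the failpoint rewriter is enabled for the package.

```go
package lockstatsdemo

import (
	"errors"

	"github.com/pingcap/failpoint"
)

// queryLockedTablesDemo is a hypothetical helper standing in for a real
// lockstats query; the failpoint lets a test force the error path without
// touching storage.
func queryLockedTablesDemo() (map[int64]struct{}, error) {
	var injectedErr error
	failpoint.Inject("injectLockedTablesError", func() {
		// Runs only when the failpoint is enabled, e.g.:
		//   failpoint.Enable("<import path of this package>/injectLockedTablesError", "return(true)")
		injectedErr = errors.New("injected locked-tables error")
	})
	if injectedErr != nil {
		return nil, injectedErr
	}
	return map[int64]struct{}{}, nil
}
```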