statistics: optimize stats delta dumping with batch processing
Signed-off-by: Rustin170506 <techregister@pm.me>
Rustin170506 committed Jan 8, 2025
1 parent d966219 commit e2678d1
Showing 2 changed files with 155 additions and 89 deletions.
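The heart of the change: instead of dumping one table's delta per transaction, DumpStatsDeltaToKV now sorts the table IDs and flushes them in fixed-size batches (dumpDeltaBatchSize = 100_000), one transaction per batch. Below is a minimal Go sketch of that batching pattern, not the committed code; dumpInBatches and processBatch are hypothetical names standing in for the real loop shown in the diff.

// Editor's sketch (not part of the commit): the batching pattern used by
// DumpStatsDeltaToKV. processBatch stands in for the per-batch transaction.
package main

import (
	"fmt"
	"slices"
)

const dumpDeltaBatchSize = 100_000

func dumpInBatches(deltaMap map[int64]int64, processBatch func(ids []int64) error) error {
	// Sort table IDs so every dumper writes rows in the same order,
	// which avoids lock-order deadlocks between concurrent dumps.
	ids := make([]int64, 0, len(deltaMap))
	for id := range deltaMap {
		ids = append(ids, id)
	}
	slices.Sort(ids)

	// Flush at most dumpDeltaBatchSize tables per transaction.
	for i := 0; i < len(ids); i += dumpDeltaBatchSize {
		end := i + dumpDeltaBatchSize
		if end > len(ids) {
			end = len(ids)
		}
		if err := processBatch(ids[i:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	deltas := map[int64]int64{101: 3, 102: -1, 103: 7}
	_ = dumpInBatches(deltas, func(ids []int64) error {
		fmt.Println("flushing batch:", ids)
		return nil
	})
}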
1 change: 1 addition & 0 deletions pkg/statistics/handle/storage/update.go
@@ -74,6 +74,7 @@ func NewDeltaUpdate(tableID int64, delta variable.TableDelta, isLocked bool) *De

// UpdateStatsMeta updates the stats meta for multiple tables.
// It uses the INSERT INTO ... ON DUPLICATE KEY UPDATE syntax to fill the missing records.
// Note: Make sure to call this function in a transaction.
func UpdateStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
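For context on the note added to UpdateStatsMeta above: an INSERT ... ON DUPLICATE KEY UPDATE statement lets a single round trip both create missing stats-meta rows and fold new deltas into existing ones, which is what makes one batched call per transaction possible. The snippet below is a rough illustration of that statement shape, assuming the mysql.stats_meta table and the version/table_id/modify_count/count values this diff passes around; the SQL actually built by UpdateStatsMeta may differ.

// Editor's illustration, not the repository's SQL: the general shape of a
// multi-row upsert against mysql.stats_meta. `%?` is the placeholder style
// accepted by TiDB's internal SQL executor.
package example

const exampleStatsMetaUpsert = `
INSERT INTO mysql.stats_meta (version, table_id, modify_count, count)
VALUES (%?, %?, %?, %?), (%?, %?, %?, %?)
ON DUPLICATE KEY UPDATE
    version      = VALUES(version),
    modify_count = modify_count + VALUES(modify_count),
    count        = count + VALUES(count)`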
243 changes: 154 additions & 89 deletions pkg/statistics/handle/usage/session_stats_collect.go
@@ -29,12 +29,14 @@ import (
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"go.uber.org/zap"
)

var (
@@ -82,6 +84,11 @@ func (s *statsUsageImpl) needDumpStatsDelta(is infoschema.InfoSchema, dumpAll bo
return false
}

const (
dumpDeltaBatchSize = 100_000
tooSlowThreshold = time.Minute
)

// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then dumps every table held in the map to KV.
// If the mode is `DumpDelta`, it only dumps the delta info of tables whose `Modify Count / Table Count` exceeds a ratio.
func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
@@ -91,96 +98,167 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
dur := time.Since(start)
metrics.StatsDeltaUpdateHistogram.Observe(dur.Seconds())
}()

s.SweepSessionStatsList()
deltaMap := s.SessionTableDelta().GetDeltaAndReset()
defer func() {
s.SessionTableDelta().Merge(deltaMap)
}()
if time.Since(start) > tooSlowThreshold {
statslogutil.SingletonStatsSamplerLogger().Info("Sweeping session list is too slow",
zap.Int("tableCount", len(deltaMap)),
zap.Duration("duration", time.Since(start)))
}

return utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
currentTime := time.Now()
for id, item := range deltaMap {
if !s.needDumpStatsDelta(is, dumpAll, id, item, currentTime) {
continue
// Sort table IDs to ensure a consistent dump order and avoid potential deadlocks.
tableIDs := make([]int64, 0, len(deltaMap))
for id := range deltaMap {
tableIDs = append(tableIDs, id)
}
slices.Sort(tableIDs)

// Dump stats delta in batches.
for i := 0; i < len(tableIDs); i += dumpDeltaBatchSize {
end := i + dumpDeltaBatchSize
if end > len(tableIDs) {
end = len(tableIDs)
}

batchTableIDs := tableIDs[i:end]
var (
statsVersion uint64
batchUpdates []*storage.DeltaUpdate
)
batchStart := time.Now()
err := utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
batchUpdates = make([]*storage.DeltaUpdate, 0, len(batchTableIDs))
// Collect all updates in the batch.
for _, id := range batchTableIDs {
item := deltaMap[id]
if !s.needDumpStatsDelta(is, dumpAll, id, item, batchStart) {
continue
}
batchUpdates = append(batchUpdates, storage.NewDeltaUpdate(id, item, false))
}
if time.Since(batchStart) > tooSlowThreshold {
statslogutil.SingletonStatsSamplerLogger().Info("Collecting batch updates is too slow",
zap.Int("tableCount", len(batchUpdates)),
zap.Duration("duration", time.Since(batchStart)))
}
updated, err := s.dumpTableStatCountToKV(is, id, item)

if len(batchUpdates) == 0 {
return nil
}

// Process all updates in the batch with a single transaction.
// Note: batchUpdates may be modified in dumpStatsDeltaToKV.
startTs, err := s.dumpStatsDeltaToKV(is, sctx, batchUpdates)
if err != nil {
return errors.Trace(err)
}
if updated {
UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count)
delete(deltaMap, id)
} else {
m := deltaMap[id]
deltaMap[id] = m
statsVersion = startTs

intest.Assert(len(batchUpdates) >= len(batchTableIDs), "batchUpdates can only be appended")
// Update deltaMap after the batch is successfully dumped.
for _, update := range batchUpdates {
UpdateTableDeltaMap(deltaMap, update.TableID, -update.Delta.Delta, -update.Delta.Count)
delete(deltaMap, update.TableID)
}
}
return nil
})
}

// dumpTableStatCountToKV dumps a single table's delta to KV and updates the stats version.
// For a partitioned table, we will update its global-stats as well.
func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableID int64, delta variable.TableDelta) (updated bool, err error) {
statsVersion := uint64(0)
isLocked := false
defer func() {
// Only record the historical stats meta when the table is not locked because all stats meta are stored in the locked table.
if err == nil && statsVersion != 0 && !isLocked {
failpoint.Inject("panic-when-record-historical-stats-meta", func() {
panic("panic when record historical stats meta")
})
s.statsHandle.RecordHistoricalStatsMeta(physicalTableID, statsVersion, "flush stats", false)
}
}()
if delta.Count == 0 {
return true, nil
}
if time.Since(batchStart) > tooSlowThreshold {
statslogutil.SingletonStatsSamplerLogger().Info("Dumping batch updates is too slow",
zap.Int("tableCount", len(batchUpdates)),
zap.Duration("duration", time.Since(batchStart)))
}

err = utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
statsVersion, err = utilstats.GetStartTS(sctx)
return nil
}, utilstats.FlagWrapTxn)
if err != nil {
return errors.Trace(err)
}

tbl, _, _ := is.FindTableByPartitionID(physicalTableID)
// Check if the table and its partitions are locked.
tidAndPid := make([]int64, 0, 2)
if tbl != nil {
tidAndPid = append(tidAndPid, tbl.Meta().ID)
// Record historical stats meta for all tables one by one.
// FIXME: Although this feature is currently disabled, it would be beneficial to implement it in batches for efficiency.
for _, update := range batchUpdates {
if !update.IsLocked {
failpoint.Inject("panic-when-record-historical-stats-meta", func() {
panic("panic when record historical stats meta")
})
s.statsHandle.RecordHistoricalStatsMeta(update.TableID, statsVersion, "flush stats", false)
}
}
tidAndPid = append(tidAndPid, physicalTableID)
lockedTables, err := s.statsHandle.GetLockedTables(tidAndPid...)
if err != nil {
return err
}

return nil
}

// dumpStatsDeltaToKV processes and writes multiple table stats count deltas to KV storage in batches.
// Note: The `updates` parameter may be modified during the execution of this function.
//
// 1. Handles partitioned tables:
// - For partitioned tables, the function ensures that the global statistics are updated appropriately
// in addition to the individual partition statistics.
//
// 2. Stashes lock information:
// - Records lock information for each table or partition.
func (s *statsUsageImpl) dumpStatsDeltaToKV(
is infoschema.InfoSchema,
sctx sessionctx.Context,
updates []*storage.DeltaUpdate,
) (statsVersion uint64, err error) {
if len(updates) == 0 {
return 0, nil
}
statsVersion, err = utilstats.GetStartTS(sctx)
if err != nil {
return 0, errors.Trace(err)
}

// Collect all table IDs that need lock checking.
allTableIDs := make([]int64, 0, len(updates))
for _, update := range updates {
// No need to update if the delta is zero.
if update.Delta.Count == 0 {
continue
}
// Add physical table ID.
allTableIDs = append(allTableIDs, update.TableID)
// Add parent table ID if it's a partition table.
if tbl, _, _ := is.FindTableByPartitionID(update.TableID); tbl != nil {
allTableIDs = append(allTableIDs, tbl.Meta().ID)
}
}

// Batch get lock status for all tables.
lockedTables, err := s.statsHandle.GetLockedTables(allTableIDs...)
if err != nil {
return 0, errors.Trace(err)
}

// Prepare batch updates
for _, update := range updates {
// No need to update if the delta is zero.
if update.Delta.Count == 0 {
continue
}

var affectedRows uint64
// If it's a partitioned table and its global-stats exists,
// update its count and modify_count as well.
if tbl != nil {
// We need to check if the table and the partition are locked.
tbl, _, _ := is.FindTableByPartitionID(update.TableID)
if tbl != nil { // It's a partition table.
tableID := tbl.Meta().ID
isTableLocked := false
isPartitionLocked := false
tableID := tbl.Meta().ID

if _, ok := lockedTables[tableID]; ok {
isTableLocked = true
}
if _, ok := lockedTables[physicalTableID]; ok {
if _, ok := lockedTables[update.TableID]; ok {
isPartitionLocked = true
}

tableOrPartitionLocked := isTableLocked || isPartitionLocked
isLocked = tableOrPartitionLocked
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(physicalTableID, delta, tableOrPartitionLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
update.IsLocked = tableOrPartitionLocked

// If the partition is locked, we don't need to update the global-stats.
// We will update its global-stats when the partition is unlocked.
// 1. If table is locked and partition is locked, we only stash the delta in the partition's lock info.
@@ -193,40 +271,27 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
// 4. If table is not locked and partition is not locked, we update the global-stats.
// To sum up, we only need to update the global-stats when the table and the partition are not locked.
if !isTableLocked && !isPartitionLocked {
// If it's a partitioned table and its global-stats exists, update its count and modify_count as well.
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(tableID, delta, isTableLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
updates = append(updates, &storage.DeltaUpdate{
TableID: tableID,
Delta: update.Delta,
IsLocked: isTableLocked,
})
}
} else {
// This is a non-partitioned table.
// Check if it's locked.
isTableLocked := false
if _, ok := lockedTables[physicalTableID]; ok {
if _, ok := lockedTables[update.TableID]; ok {
isTableLocked = true
}
isLocked = isTableLocked
if err = storage.UpdateStatsMeta(
utilstats.StatsCtx,
sctx,
statsVersion,
storage.NewDeltaUpdate(physicalTableID, delta, isTableLocked),
); err != nil {
return err
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
update.IsLocked = isTableLocked
}
}

// Batch update stats meta.
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, updates...); err != nil {
return 0, errors.Trace(err)
}

updated = affectedRows > 0
return nil
}, utilstats.FlagWrapTxn)
return
return statsVersion, nil
}

// DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV.
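A closing note on the lock handling in dumpStatsDeltaToKV: the four enumerated cases reduce to two rules. A partition's own delta row is marked locked whenever either the partition or its parent table is locked, and a second update for the parent's global stats is appended only when neither is locked. The sketch below shows that decision with hypothetical names (lockDecision, decide); it mirrors the diff's comments rather than calling any repository APIs.

// Editor's sketch: the lock rules applied to a partition and its parent
// table, per the four cases documented in the diff above.
package main

import "fmt"

type lockDecision struct {
	// Whether the partition's delta row is written as locked
	// (stashed in the lock info instead of the public stats meta).
	partitionRowLocked bool
	// Whether an extra update for the parent table's global stats
	// should be emitted for this partition's delta.
	updateGlobalStats bool
}

func decide(tableLocked, partitionLocked bool) lockDecision {
	return lockDecision{
		// The partition row counts as locked if either level is locked.
		partitionRowLocked: tableLocked || partitionLocked,
		// Global stats are only updated when neither level is locked;
		// otherwise the delta is stashed and applied on unlock.
		updateGlobalStats: !tableLocked && !partitionLocked,
	}
}

func main() {
	for _, c := range []struct{ table, part bool }{
		{true, true}, {true, false}, {false, true}, {false, false},
	} {
		fmt.Printf("table locked=%v partition locked=%v -> %+v\n",
			c.table, c.part, decide(c.table, c.part))
	}
}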

