DDL: update TiFlash replica progress after available (pingcap#37766)
hehechen authored Sep 22, 2022
1 parent e4cb6a7 commit 319b320
Showing 8 changed files with 501 additions and 105 deletions.
244 changes: 182 additions & 62 deletions ddl/ddl_tiflash_api.go
@@ -20,6 +20,7 @@ package ddl

import (
"bytes"
"container/list"
"context"
"encoding/json"
"fmt"
@@ -39,6 +40,8 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
@@ -112,11 +115,18 @@ func NewPollTiFlashBackoffContext(MinThreshold, MaxThreshold TiFlashTick, Capaci

// TiFlashManagementContext is the context for TiFlash Replica Management
type TiFlashManagementContext struct {
TiFlashStores map[int64]helper.StoreStat
HandlePdCounter uint64
UpdateTiFlashStoreCounter uint64
UpdateMap map[int64]bool
Backoff *PollTiFlashBackoffContext
TiFlashStores map[int64]helper.StoreStat
PollCounter uint64
ProgressCache map[int64]string
Backoff *PollTiFlashBackoffContext
// Tables waiting for a progress update after they become available.
UpdatingProgressTables *list.List
}

// AvailableTableID is the ID info of an available table that is waiting for its TiFlash replica progress to be updated.
type AvailableTableID struct {
ID int64
IsPartition bool
}
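A note on the new queue shape (a minimal sketch, not part of the diff): UpdatingProgressTables is a plain container/list FIFO holding AvailableTableID values, and consumers recover the struct with a type assertion on element.Value. The table IDs below are hypothetical:

package main

import (
    "container/list"
    "fmt"
)

// AvailableTableID mirrors the struct introduced in this diff.
type AvailableTableID struct {
    ID          int64
    IsPartition bool
}

func main() {
    q := list.New()
    // Producer side: tables that just became available are queued for
    // later progress refreshes.
    q.PushFront(AvailableTableID{ID: 101, IsPartition: false})
    q.PushFront(AvailableTableID{ID: 102, IsPartition: true})

    // Consumer side: element.Value is an interface{}, so assert it back.
    for e := q.Front(); e != nil; e = e.Next() {
        t := e.Value.(AvailableTableID)
        fmt.Printf("pending progress refresh: id=%d partition=%v\n", t.ID, t.IsPartition)
    }
}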

// Tick will first check and then increase the Counter.
@@ -200,11 +210,11 @@ func NewTiFlashManagementContext() (*TiFlashManagementContext, error) {
return nil, err
}
return &TiFlashManagementContext{
HandlePdCounter: 0,
UpdateTiFlashStoreCounter: 0,
TiFlashStores: make(map[int64]helper.StoreStat),
UpdateMap: make(map[int64]bool),
Backoff: c,
PollCounter: 0,
TiFlashStores: make(map[int64]helper.StoreStat),
ProgressCache: make(map[int64]string),
Backoff: c,
UpdatingProgressTables: list.New(),
}, nil
}

@@ -223,6 +233,10 @@ var (
PollTiFlashBackoffCapacity int = 1000
// PollTiFlashBackoffRate is growth rate of exponential backoff threshold.
PollTiFlashBackoffRate TiFlashTick = 1.5
// RefreshProgressMaxTableCount is the max number of tables whose progress is refreshed per poll after they become available.
RefreshProgressMaxTableCount uint64 = 1000
// PollCleanProgressCacheInterval is the interval (PollTiFlashInterval * PollCleanProgressCacheInterval) at which the progress cache is cleaned, to avoid data races across a DDL owner switchover
PollCleanProgressCacheInterval uint64 = 300
)
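For scale, the cache-cleaning cadence is expressed in poll ticks, not wall time. Assuming the upstream default PollTiFlashInterval of 2 seconds (an assumption here, though the 10-minute comment later in this diff is consistent with it), 300 ticks works out to 10 minutes:

package main

import (
    "fmt"
    "time"
)

func main() {
    pollTiFlashInterval := 2 * time.Second // assumed upstream default
    pollCleanProgressCacheInterval := uint64(300)
    // Effective wall-clock interval between ProgressCache resets.
    fmt.Println(pollTiFlashInterval * time.Duration(pollCleanProgressCacheInterval)) // 10m0s
}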

func getTiflashHTTPAddr(host string, statusAddr string) (string, error) {
@@ -346,37 +360,141 @@ func updateTiFlashStores(pollTiFlashContext *TiFlashManagementContext) error {
return nil
}

func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) (bool, error) {
allReplicaReady := true
defer func() {
pollTiFlashContext.HandlePdCounter++
pollTiFlashContext.HandlePdCounter %= PullTiFlashPdTick.Load()
}()
func getTiFlashPeerWithoutLagCount(pollTiFlashContext *TiFlashManagementContext, tableID int64) (int, error) {
// storeIDs -> regionID; PD will not create two peers of the same region on one store
var flashPeerCount int
for _, store := range pollTiFlashContext.TiFlashStores {
regionReplica := make(map[int64]int)
err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica)
if err != nil {
logutil.BgLogger().Error("Fail to get peer status from TiFlash.",
zap.Int64("tableID", tableID))
return 0, err
}
flashPeerCount += len(regionReplica)
}
return flashPeerCount, nil
}
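The summation above works because each store reports at most one peer per region, so adding the per-store map sizes yields the table's total TiFlash peer count. A toy illustration (hypothetical region IDs, sketch only):

package main

import "fmt"

func main() {
    // Per TiFlash store: regionID -> peer count reported for the table.
    // PD never places two peers of the same region on one store.
    storeRegions := []map[int64]int{
        {100: 1, 101: 1}, // store A holds peers of regions 100 and 101
        {100: 1, 102: 1}, // store B holds peers of regions 100 and 102
    }
    flashPeerCount := 0
    for _, regionReplica := range storeRegions {
        flashPeerCount += len(regionReplica)
    }
    fmt.Println(flashPeerCount) // 4 peers across both stores
}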

// getTiFlashTableSyncProgress returns a truncated string to avoid float64 comparison.
func getTiFlashTableSyncProgress(pollTiFlashContext *TiFlashManagementContext, tableID int64, replicaCount uint64) (string, error) {
var stats helper.PDRegionStats
if err := infosync.GetTiFlashPDRegionRecordStats(context.Background(), tableID, &stats); err != nil {
logutil.BgLogger().Error("Fail to get region stats from PD.",
zap.Int64("tableID", tableID))
return "0", errors.Trace(err)
}
regionCount := stats.Count

tiflashPeerCount, err := getTiFlashPeerWithoutLagCount(pollTiFlashContext, tableID)
if err != nil {
logutil.BgLogger().Error("Fail to get peer count from TiFlash.",
zap.Int64("tableID", tableID))
return "0", errors.Trace(err)
}
progress := float64(tiflashPeerCount) / float64(regionCount*int(replicaCount))
if progress > 1 { // when PD is doing balance
logutil.BgLogger().Debug("TiFlash peer count > pd peer count, maybe doing balance.",
zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount))
progress = 1
}
return types.TruncateFloatToString(progress, 2), nil
}
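Representing progress as a truncated string sidesteps float64 equality when the value is compared against ProgressCache, and it makes the later availability test a one-byte check. A sketch with a stand-in for types.TruncateFloatToString (the real helper's exact formatting may differ):

package main

import (
    "fmt"
    "math"
)

// truncate2 is a stand-in for types.TruncateFloatToString(progress, 2):
// truncate (not round) to two decimals, then render a stable string.
func truncate2(f float64) string {
    return fmt.Sprintf("%v", math.Trunc(f*100)/100)
}

func main() {
    tiflashPeerCount, regionCount, replicaCount := 7, 4, 2
    progress := float64(tiflashPeerCount) / float64(regionCount*replicaCount)
    if progress > 1 { // PD may temporarily report extra peers while balancing
        progress = 1
    }
    s := truncate2(progress)
    fmt.Println(s)           // 0.87 (7/8 = 0.875, truncated)
    fmt.Println(s[0] == '1') // false; the availability check used later in this diff
}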

func pollAvailableTableProgress(schemas infoschema.InfoSchema, ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) {
pollMaxCount := RefreshProgressMaxTableCount
failpoint.Inject("PollAvailableTableProgressMaxCount", func(val failpoint.Value) {
pollMaxCount = uint64(val.(int))
})
for element := pollTiFlashContext.UpdatingProgressTables.Front(); element != nil && pollMaxCount > 0; pollMaxCount-- {
availableTableID := element.Value.(AvailableTableID)
var table table.Table
if availableTableID.IsPartition {
table, _, _ = schemas.FindTableByPartitionID(availableTableID.ID)
if table == nil {
logutil.BgLogger().Info("get table by partition failed, may be dropped or truncated",
zap.Int64("partitionID", availableTableID.ID),
)
pollTiFlashContext.UpdatingProgressTables.Remove(element)
element = element.Next()
continue
}
} else {
var ok bool
table, ok = schemas.TableByID(availableTableID.ID)
if !ok {
logutil.BgLogger().Info("get table id failed, may be dropped or truncated",
zap.Int64("tableID", availableTableID.ID),
)
pollTiFlashContext.UpdatingProgressTables.Remove(element)
element = element.Next()
continue
}
}

tableInfo := table.Meta()
if tableInfo.TiFlashReplica == nil {
logutil.BgLogger().Info("table has no TiFlash replica",
zap.Int64("tableID or partitionID", availableTableID.ID),
zap.Bool("IsPartition", availableTableID.IsPartition),
)
pollTiFlashContext.UpdatingProgressTables.Remove(element)
element = element.Next()
continue
}
progress, err := getTiFlashTableSyncProgress(pollTiFlashContext, availableTableID.ID, tableInfo.TiFlashReplica.Count)
if err != nil {
logutil.BgLogger().Error("get tiflash sync progress failed",
zap.Error(err),
zap.Int64("tableID", availableTableID.ID),
zap.Bool("IsPartition", availableTableID.IsPartition),
)
continue
}
if pollTiFlashContext.ProgressCache[availableTableID.ID] != progress {
err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), availableTableID.ID, progress)
if err != nil {
logutil.BgLogger().Error("updating TiFlash replica process failed",
zap.Error(err),
zap.Int64("tableID or partitionID", availableTableID.ID),
zap.Bool("IsPartition", availableTableID.IsPartition),
zap.String("progress", progress),
)
continue
}
pollTiFlashContext.ProgressCache[availableTableID.ID] = progress
}
pollTiFlashContext.UpdatingProgressTables.Remove(element)
element = element.Next()
}
}
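One stdlib subtlety relevant to the removals in the loop above: container/list.Remove nils out the removed element's next and prev pointers, so calling element.Next() after Remove(element) returns nil and the pass ends at that element. The conventional idiom captures the successor before removing, as in this minimal sketch:

package main

import (
    "container/list"
    "fmt"
)

func main() {
    l := list.New()
    for i := 1; i <= 3; i++ {
        l.PushBack(i)
    }
    for e := l.Front(); e != nil; {
        next := e.Next() // capture before Remove; Remove sets e.next to nil
        fmt.Println(e.Value.(int))
        l.Remove(e)
        e = next
    }
    fmt.Println(l.Len()) // 0: all three elements were visited and removed
}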

updateTiFlash := pollTiFlashContext.UpdateTiFlashStoreCounter%UpdateTiFlashStoreTick.Load() == 0
if updateTiFlash {
func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) error {
if pollTiFlashContext.PollCounter%UpdateTiFlashStoreTick.Load() == 0 {
if err := updateTiFlashStores(pollTiFlashContext); err != nil {
// If we fail to get stores from PD, retry every time.
pollTiFlashContext.UpdateTiFlashStoreCounter = 0
return false, err
pollTiFlashContext.PollCounter = 0
return err
}
}
pollTiFlashContext.UpdateTiFlashStoreCounter += 1
pollTiFlashContext.UpdateTiFlashStoreCounter %= UpdateTiFlashStoreTick.Load()

// The following loop updates TiFlash store's status address.
for _, store := range pollTiFlashContext.TiFlashStores {
s := store
if err := d.UpdateTiFlashHTTPAddress(&s); err != nil {
}
failpoint.Inject("PollTiFlashReplicaStatusCleanProgressCache", func() {
pollTiFlashContext.PollCounter = PollCleanProgressCacheInterval
})
// Clean the progress cache every 10 minutes to avoid data races.
if pollTiFlashContext.PollCounter > 0 && pollTiFlashContext.PollCounter%PollCleanProgressCacheInterval == 0 {
pollTiFlashContext.ProgressCache = make(map[int64]string)
}
pollTiFlashContext.PollCounter++

// Start to process every table.
schema := d.GetInfoSchemaWithInterceptor(ctx)
if schema == nil {
return false, errors.New("Schema is nil")
return errors.New("Schema is nil")
}

pollAvailableTableProgress(schema, ctx, pollTiFlashContext)

var tableList = make([]TiFlashReplicaStatus, 0)

// Collect TiFlash Replica info, for every table.
@@ -388,6 +506,11 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
}
}

needPushPending := false
if pollTiFlashContext.UpdatingProgressTables.Len() == 0 {
needPushPending = true
}

for _, tb := range tableList {
// For every region in each table, if it has one replica, we reckon it ready.
// These requests could be batched as an optimization.
@@ -397,53 +520,44 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
})
// We only check unavailable tables here, so this doesn't include the blocked add-partition case.
if !available {
allReplicaReady = false
enabled, inqueue, _ := pollTiFlashContext.Backoff.Tick(tb.ID)
if inqueue && !enabled {
logutil.BgLogger().Info("Escape checking available status due to backoff", zap.Int64("tableId", tb.ID))
continue
}

// We don't need to set an accelerated schedule for this table, since that is already done in DDL, when
// 1. Add partition
// 2. Set TiFlash replica

// Compute the data sync progress by requesting TiFlash.
regionReplica := make(map[int64]int)
for _, store := range pollTiFlashContext.TiFlashStores {
err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tb.ID, &regionReplica)
progress, err := getTiFlashTableSyncProgress(pollTiFlashContext, tb.ID, tb.Count)
if err != nil {
logutil.BgLogger().Error("get tiflash sync progress failed",
zap.Error(err),
zap.Int64("tableID", tb.ID),
)
continue
}
if pollTiFlashContext.ProgressCache[tb.ID] != progress {
err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), tb.ID, progress)
if err != nil {
return allReplicaReady, errors.Trace(err)
logutil.BgLogger().Error("updating TiFlash replica process failed",
zap.Error(err),
zap.Int64("tableID", tb.ID),
zap.String("progress", progress),
)
continue
}
pollTiFlashContext.ProgressCache[tb.ID] = progress
}

logutil.BgLogger().Debug("CollectTiFlashStatus", zap.Any("regionReplica", regionReplica), zap.Int64("tableID", tb.ID))

var stats helper.PDRegionStats
if err := infosync.GetTiFlashPDRegionRecordStats(context.Background(), tb.ID, &stats); err != nil {
return allReplicaReady, err
}
regionCount := stats.Count
flashRegionCount := len(regionReplica)
avail := regionCount == flashRegionCount
avail := progress[0] == '1'
failpoint.Inject("PollTiFlashReplicaStatusReplaceCurAvailableValue", func(val failpoint.Value) {
avail = val.(bool)
})

if !avail {
logutil.BgLogger().Info("Tiflash replica is not available", zap.Int64("tableID", tb.ID), zap.Uint64("region need", uint64(regionCount)), zap.Uint64("region have", uint64(flashRegionCount)))
logutil.BgLogger().Info("Tiflash replica is not available", zap.Int64("tableID", tb.ID), zap.String("progress", progress))
pollTiFlashContext.Backoff.Put(tb.ID)
err := infosync.UpdateTiFlashTableSyncProgress(context.Background(), tb.ID, float64(flashRegionCount)/float64(regionCount))
if err != nil {
return false, err
}
} else {
logutil.BgLogger().Info("Tiflash replica is available", zap.Int64("tableID", tb.ID), zap.Uint64("region need", uint64(regionCount)))
logutil.BgLogger().Info("Tiflash replica is available", zap.Int64("tableID", tb.ID), zap.String("progress", progress))
pollTiFlashContext.Backoff.Remove(tb.ID)
err := infosync.DeleteTiFlashTableSyncProgress(tb.ID)
if err != nil {
return false, err
}
}
failpoint.Inject("skipUpdateTableReplicaInfoInLoop", func() {
failpoint.Continue()
@@ -457,10 +571,14 @@ func (d *ddl) pollTiFlashReplicaStatus(ctx sessionctx.Context, pollTiFlashContex
logutil.BgLogger().Error("updating TiFlash replica status err", zap.Error(err), zap.Int64("tableID", tb.ID), zap.Bool("isPartition", tb.IsPartition))
}
}
} else {
if needPushPending {
pollTiFlashContext.UpdatingProgressTables.PushFront(AvailableTableID{tb.ID, tb.IsPartition})
}
}
}

return allReplicaReady, nil
return nil
}
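Seen end to end, the renamed refreshTiFlashTicker drives everything off a single PollCounter: refresh the store list every UpdateTiFlashStoreTick polls, reset ProgressCache every PollCleanProgressCacheInterval polls, then drain the available-table queue and scan the unavailable tables. A compact sketch of that single-counter, multi-cadence gating (UpdateTiFlashStoreTick's default is assumed to be 60 here purely for illustration):

package main

import "fmt"

func main() {
    const updateStoreTick = 60 // assumed value, for illustration only
    const cleanCacheTick = 300 // PollCleanProgressCacheInterval
    for counter := uint64(0); counter <= 300; counter++ {
        if counter%updateStoreTick == 0 {
            fmt.Println(counter, "refresh TiFlash store list from PD")
        }
        if counter > 0 && counter%cleanCacheTick == 0 {
            fmt.Println(counter, "reset ProgressCache") // mirrors the counter > 0 guard above
        }
    }
}

The needPushPending flag plays a related role: available tables are appended to UpdatingProgressTables only when the queue has fully drained, so a slow drain cannot enqueue the same table twice.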

func getDropOrTruncateTableTiflash(ctx sessionctx.Context, currentSchema infoschema.InfoSchema, tikvHelper *helper.Helper, replicaInfos *[]TiFlashReplicaStatus) error {
@@ -583,10 +701,10 @@ func (d *ddl) PollTiFlashRoutine() {
}
if d.IsTiFlashPollEnabled() {
if d.sessPool == nil {
logutil.BgLogger().Error("failed to get sessionPool for pollTiFlashReplicaStatus")
logutil.BgLogger().Error("failed to get sessionPool for refreshTiFlashTicker")
return
}
failpoint.Inject("BeforePollTiFlashReplicaStatusLoop", func() {
failpoint.Inject("BeforeRefreshTiFlashTickeLoop", func() {
failpoint.Continue()
})

@@ -604,15 +722,17 @@ func (d *ddl) PollTiFlashRoutine() {
sctx, err := d.sessPool.get()
if err == nil {
if d.ownerManager.IsOwner() {
_, err := d.pollTiFlashReplicaStatus(sctx, pollTiflashContext)
err := d.refreshTiFlashTicker(sctx, pollTiflashContext)
if err != nil {
switch err.(type) {
case *infosync.MockTiFlashError:
// If we have not set up MockTiFlash instance, for those tests without TiFlash, just suppress.
default:
logutil.BgLogger().Warn("pollTiFlashReplicaStatus returns error", zap.Error(err))
logutil.BgLogger().Warn("refreshTiFlashTicker returns error", zap.Error(err))
}
}
} else {
pollTiflashContext.ProgressCache = make(map[int64]string)
}
d.sessPool.put(sctx)
} else {
