store: remove stores that have no region before balance (#52314) (#53573)

close #52313
ti-chi-bot authored May 28, 2024
1 parent ffc8ca3 commit 30c2ca0
Showing 3 changed files with 127 additions and 35 deletions.
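
At a high level, the fix makes buildBatchCopTasksCore collect the TiFlash stores that actually host at least one region of the query, and then restricts alive-store probing and task balancing to that set instead of touching every TiFlash store known to the region cache. Below is a minimal, self-contained Go sketch of the filtering step; the Store type and the main function are stand-ins for illustration only, while the real helper in this diff operates on *tikv.Store.

package main

import "fmt"

// Store is a minimal stand-in for tikv.Store; only the ID matters here.
type Store struct{ id uint64 }

func (s *Store) StoreID() uint64 { return s.id }

// getAllUsedTiFlashStores keeps only the TiFlash stores that host at least one
// region of the current query, mirroring the helper this commit adds to
// batch_coprocessor.go.
func getAllUsedTiFlashStores(allTiFlashStores []*Store, used map[uint64]struct{}) []*Store {
	res := make([]*Store, 0, len(used))
	for _, store := range allTiFlashStores {
		if _, ok := used[store.StoreID()]; ok {
			res = append(res, store)
		}
	}
	return res
}

func main() {
	all := []*Store{{id: 1}, {id: 2}, {id: 3}}
	used := map[uint64]struct{}{2: {}, 3: {}}
	for _, s := range getAllUsedTiFlashStores(all, used) {
		fmt.Println(s.StoreID()) // prints 2 and 3; store 1 serves no region and is dropped
	}
}

In the diff itself, the filtered set is what filterAliveStores probes inside getAliveStoresAndStoreIDs, and the balancer later receives only the union of stores referenced by the built batch tasks.
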
3 changes: 2 additions & 1 deletion pkg/store/copr/BUILD.bazel
@@ -82,7 +82,7 @@ go_test(
embed = [":copr"],
flaky = True,
race = "on",
shard_count = 29,
shard_count = 30,
deps = [
"//pkg/kv",
"//pkg/store/driver/backoff",
@@ -92,6 +92,7 @@ go_test(
"//pkg/util/trxevents",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
117 changes: 85 additions & 32 deletions pkg/store/copr/batch_coprocessor.go
@@ -304,7 +304,7 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
//
// The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely).
// If balanceWithContinuity is true, the second balance strategy is enable.
func balanceBatchCopTask(ctx context.Context, aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
func balanceBatchCopTask(aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
@@ -819,12 +819,24 @@ func filterAllStoresAccordingToTiFlashReplicaRead(allStores []uint64, aliveStore
return
}

func getAllUsedTiFlashStores(allTiFlashStores []*tikv.Store, allUsedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
allUsedTiFlashStores := make([]*tikv.Store, 0, len(allUsedTiFlashStoresMap))
for _, store := range allTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
if ok {
allUsedTiFlashStores = append(allUsedTiFlashStores, store)
}
}
return allUsedTiFlashStores
}

// getAliveStoresAndStoreIDs gets alive TiFlash stores and their IDs.
// If tiflashReplicaReadPolicy is not all_replicas, it will also return the IDs of the alive TiFlash stores in TiDB zone.
func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, allUsedTiFlashStoresMap map[uint64]struct{}, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
aliveStores = new(aliveStoresBundle)
allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
aliveStores.storesInAllZones = filterAliveStores(ctx, allTiFlashStores, ttl, store)
allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
aliveStores.storesInAllZones = filterAliveStores(ctx, allUsedTiFlashStores, ttl, store)

if !tiflashReplicaReadPolicy.IsAllReplicas() {
aliveStores.storeIDsInTiDBZone = make(map[uint64]struct{}, len(aliveStores.storesInAllZones))
@@ -849,11 +861,28 @@ func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time
// 1. tiflash_replica_read policy
// 2. whether the store is alive
// After filtering, it will build the RegionInfo.
func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
func filterAccessibleStoresAndBuildRegionInfo(
cache *RegionCache,
allStores []uint64,
bo *Backoffer,
task *copTask,
rpcCtx *tikv.RPCContext,
aliveStores *aliveStoresBundle,
tiflashReplicaReadPolicy tiflash.ReplicaRead,
regionInfoNeedsReloadOnSendFail []RegionInfo,
regionsInOtherZones []uint64,
maxRemoteReadCountAllowed int,
tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
needCrossZoneAccess := false
allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
allStores, needCrossZoneAccess = filterAllStoresAccordingToTiFlashReplicaRead(allStores, aliveStores, tiflashReplicaReadPolicy)
regionInfo = RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}

regionInfo = RegionInfo{
Region: task.region,
Meta: rpcCtx.Meta,
Ranges: task.ranges,
AllStores: allStores,
PartitionIndex: task.partitionIndex}

if needCrossZoneAccess {
regionsInOtherZones = append(regionsInOtherZones, task.region.GetID())
regionInfoNeedsReloadOnSendFail = append(regionInfoNeedsReloadOnSendFail, regionInfo)
@@ -862,7 +891,9 @@ func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer,
for i := 0; i < 3 && i < len(regionsInOtherZones); i++ {
regionIDErrMsg += fmt.Sprintf("%d, ", regionsInOtherZones[i])
}
err = errors.Errorf("no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc", len(regionsInOtherZones), tidbZone, regionIDErrMsg)
err = errors.Errorf(
"no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc",
len(regionsInOtherZones), tidbZone, regionIDErrMsg)
// We need to reload the region cache here to avoid the failure throughout the region cache refresh TTL.
cache.OnSendFailForBatchRegions(bo, rpcCtx.Store, regionInfoNeedsReloadOnSendFail, true, err)
return regionInfo, nil, nil, err
@@ -895,10 +926,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
if !isTiDBLabelZoneSet {
tiflashReplicaReadPolicy = tiflash.AllReplicas
}
aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, ttl, store, tiflashReplicaReadPolicy, tidbZone)
if tiflashReplicaReadPolicy.IsClosestReplicas() {
maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
}

for {
var tasks []*copTask
rangesLen = 0
@@ -919,17 +947,16 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
}
}

var batchTasks []*batchCopTask
var regionIDsInOtherZones []uint64
var regionInfosNeedReloadOnSendFail []RegionInfo
storeTaskMap := make(map[string]*batchCopTask)
rpcCtxs := make([]*tikv.RPCContext, 0, len(tasks))
usedTiFlashStores := make([][]uint64, 0, len(tasks))
usedTiFlashStoresMap := make(map[uint64]struct{}, 0)
needRetry := false
storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
if err != nil {
return nil, errors.Trace(err)
}

// When rpcCtx is nil, it's not only attributed to the miss region, but also
// some TiFlash stores crash and can't be recovered.
// That is not an error that can be easily recovered, so we regard this error
@@ -941,36 +968,62 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
// Then `splitRegion` will reloads these regions.
continue
}

allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
for _, storeID := range allStores {
usedTiFlashStoresMap[storeID] = struct{}{}
}
rpcCtxs = append(rpcCtxs, rpcCtx)
usedTiFlashStores = append(usedTiFlashStores, allStores)
}

if needRetry {
// As mentioned above, nil rpcCtx is always attributed to failed stores.
// It's equal to long poll the store but get no response. Here we'd better use
// TiFlash error to trigger the TiKV fallback mechanism.
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}

aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, usedTiFlashStoresMap, ttl, store, tiflashReplicaReadPolicy, tidbZone)
if tiflashReplicaReadPolicy.IsClosestReplicas() {
if len(aliveStores.storeIDsInTiDBZone) == 0 {
return nil, errors.Errorf("There is no region in tidb zone(%s)", tidbZone)
}
maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
}

var batchTasks []*batchCopTask
var regionIDsInOtherZones []uint64
var regionInfosNeedReloadOnSendFail []RegionInfo
storeTaskMap := make(map[string]*batchCopTask)
storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
for idx, task := range tasks {
var err error
var regionInfo RegionInfo
regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, bo, task, rpcCtx, aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, usedTiFlashStores[idx], bo, task, rpcCtxs[idx], aliveStores, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
if err != nil {
return nil, err
}
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
if batchCop, ok := storeTaskMap[rpcCtxs[idx].Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, regionInfo)
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
storeAddr: rpcCtxs[idx].Addr,
cmdType: cmdType,
ctx: rpcCtx,
ctx: rpcCtxs[idx],
regionInfos: []RegionInfo{regionInfo},
}
storeTaskMap[rpcCtx.Addr] = batchTask
storeTaskMap[rpcCtxs[idx].Addr] = batchTask
}
for _, storeID := range regionInfo.AllStores {
storeIDsUnionSetForAllTasks[storeID] = struct{}{}
}
}
if needRetry {
// As mentioned above, nil rpcCtx is always attributed to failed stores.
// It's equal to long poll the store but get no response. Here we'd better use
// TiFlash error to trigger the TiKV fallback mechanism.
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}

if len(regionIDsInOtherZones) != 0 {
warningMsg := fmt.Sprintf("total %d region(s) can not be accessed by TiFlash in the zone [%s]:", len(regionIDsInOtherZones), tidbZone)
regionIDErrMsg := ""
@@ -998,7 +1051,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
storesUnionSetForAllTasks = append(storesUnionSetForAllTasks, store)
}
}
batchTasks = balanceBatchCopTask(bo.GetCtx(), storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks = balanceBatchCopTask(storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount)
balanceElapsed := time.Since(balanceStart)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
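
The reordering in buildBatchCopTasksCore above can be read as two passes: a first pass resolves each task's RPC context and accumulates the IDs of stores that actually serve the query's regions (retrying the whole round if any region currently has no reachable TiFlash peer), and only then are the alive stores, the closest-replicas quota, and the per-store batch tasks computed from that set. Below is a rough sketch of the store-collection part of that first pass, using simplified stand-in types rather than the real copr/tikv API; task, rpcOK, and collectUsedStores are illustrative names.

package main

import "fmt"

// task is a simplified stand-in for a copTask together with its resolved RPC context.
type task struct {
	region    string
	rpcOK     bool     // whether GetTiFlashRPCContext returned a usable context
	allStores []uint64 // valid TiFlash store IDs for this region
}

// collectUsedStores mirrors the store-collection part of the first loop in the
// reworked buildBatchCopTasksCore: it records the union of store IDs used by the
// query and flags a retry when any region has no reachable TiFlash peer.
func collectUsedStores(tasks []task) (used map[uint64]struct{}, needRetry bool) {
	used = make(map[uint64]struct{})
	for _, t := range tasks {
		if !t.rpcOK {
			needRetry = true
			continue
		}
		for _, id := range t.allStores {
			used[id] = struct{}{}
		}
	}
	return used, needRetry
}

func main() {
	tasks := []task{
		{region: "r1", rpcOK: true, allStores: []uint64{2, 3}},
		{region: "r2", rpcOK: true, allStores: []uint64{3}},
	}
	used, retry := collectUsedStores(tasks)
	fmt.Println(len(used), retry) // prints "2 false": only stores 2 and 3 feed the alive check and the balancer
}

In the real code the retry path also backs off via bo.Backoff before the ranges are split again, and the closest-replicas policy now returns an error early when no alive TiFlash store exists in the TiDB zone.
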
42 changes: 40 additions & 2 deletions pkg/store/copr/batch_coprocessor_test.go
@@ -23,12 +23,15 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/store/driver/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stathat/consistent"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
)

@@ -125,13 +128,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, 0)
nilResult := balanceBatchCopTask(nil, nilTaskSet, false, 0)
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, 0)
emptyResult := balanceBatchCopTask(nil, emptyTaskSet, false, 0)
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}
@@ -282,3 +285,38 @@ func TestTopoFetcherBackoff(t *testing.T) {
require.GreaterOrEqual(t, dura, 30*time.Second)
require.LessOrEqual(t, dura, 50*time.Second)
}

func TestGetAllUsedTiFlashStores(t *testing.T) {
mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()

label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}

cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})

allUsedTiFlashStoresMap := make(map[uint64]struct{})
allUsedTiFlashStoresMap[2] = struct{}{}
allUsedTiFlashStoresMap[3] = struct{}{}
allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
require.Equal(t, 3, len(allTiFlashStores))
allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
for _, store := range allUsedTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
require.True(t, ok)
}
}
