From 30c2ca084c0d6e53d0af526fe9a693cbb7aad0e7 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 28 May 2024 11:01:20 +0800 Subject: [PATCH] store: remove stores that have no region before balance (#52314) (#53573) close pingcap/tidb#52313 --- pkg/store/copr/BUILD.bazel | 3 +- pkg/store/copr/batch_coprocessor.go | 117 ++++++++++++++++------- pkg/store/copr/batch_coprocessor_test.go | 42 +++++++- 3 files changed, 127 insertions(+), 35 deletions(-) diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index e342644c14a12..69a0b376f8709 100644 --- a/pkg/store/copr/BUILD.bazel +++ b/pkg/store/copr/BUILD.bazel @@ -82,7 +82,7 @@ go_test( embed = [":copr"], flaky = True, race = "on", - shard_count = 29, + shard_count = 30, deps = [ "//pkg/kv", "//pkg/store/driver/backoff", @@ -92,6 +92,7 @@ go_test( "//pkg/util/trxevents", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/coprocessor", + "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/mpp", "@com_github_stathat_consistent//:consistent", "@com_github_stretchr_testify//require", diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go index 752be5f16d9e4..d7151ade6371a 100644 --- a/pkg/store/copr/batch_coprocessor.go +++ b/pkg/store/copr/batch_coprocessor.go @@ -304,7 +304,7 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca // // The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely). // If balanceWithContinuity is true, the second balance strategy is enable. -func balanceBatchCopTask(ctx context.Context, aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask { +func balanceBatchCopTask(aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask { if len(originalTasks) == 0 { log.Info("Batch cop task balancer got an empty task set.") return originalTasks @@ -819,12 +819,24 @@ func filterAllStoresAccordingToTiFlashReplicaRead(allStores []uint64, aliveStore return } +func getAllUsedTiFlashStores(allTiFlashStores []*tikv.Store, allUsedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store { + allUsedTiFlashStores := make([]*tikv.Store, 0, len(allUsedTiFlashStoresMap)) + for _, store := range allTiFlashStores { + _, ok := allUsedTiFlashStoresMap[store.StoreID()] + if ok { + allUsedTiFlashStores = append(allUsedTiFlashStores, store) + } + } + return allUsedTiFlashStores +} + // getAliveStoresAndStoreIDs gets alive TiFlash stores and their IDs. // If tiflashReplicaReadPolicy is not all_replicas, it will also return the IDs of the alive TiFlash stores in TiDB zone. 
-func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) { +func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, allUsedTiFlashStoresMap map[uint64]struct{}, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) { aliveStores = new(aliveStoresBundle) allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode) - aliveStores.storesInAllZones = filterAliveStores(ctx, allTiFlashStores, ttl, store) + allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap) + aliveStores.storesInAllZones = filterAliveStores(ctx, allUsedTiFlashStores, ttl, store) if !tiflashReplicaReadPolicy.IsAllReplicas() { aliveStores.storeIDsInTiDBZone = make(map[uint64]struct{}, len(aliveStores.storesInAllZones)) @@ -849,11 +861,28 @@ func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time // 1. tiflash_replica_read policy // 2. whether the store is alive // After filtering, it will build the RegionInfo. -func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) { +func filterAccessibleStoresAndBuildRegionInfo( + cache *RegionCache, + allStores []uint64, + bo *Backoffer, + task *copTask, + rpcCtx *tikv.RPCContext, + aliveStores *aliveStoresBundle, + tiflashReplicaReadPolicy tiflash.ReplicaRead, + regionInfoNeedsReloadOnSendFail []RegionInfo, + regionsInOtherZones []uint64, + maxRemoteReadCountAllowed int, + tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) { needCrossZoneAccess := false - allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode) allStores, needCrossZoneAccess = filterAllStoresAccordingToTiFlashReplicaRead(allStores, aliveStores, tiflashReplicaReadPolicy) - regionInfo = RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex} + + regionInfo = RegionInfo{ + Region: task.region, + Meta: rpcCtx.Meta, + Ranges: task.ranges, + AllStores: allStores, + PartitionIndex: task.partitionIndex} + if needCrossZoneAccess { regionsInOtherZones = append(regionsInOtherZones, task.region.GetID()) regionInfoNeedsReloadOnSendFail = append(regionInfoNeedsReloadOnSendFail, regionInfo) @@ -862,7 +891,9 @@ func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer, for i := 0; i < 3 && i < len(regionsInOtherZones); i++ { regionIDErrMsg += fmt.Sprintf("%d, ", regionsInOtherZones[i]) } - err = errors.Errorf("no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc", len(regionsInOtherZones), tidbZone, regionIDErrMsg) + err = errors.Errorf( + "no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc", + len(regionsInOtherZones), tidbZone, regionIDErrMsg) // We need to reload the region cache here to avoid the failure throughout the region cache refresh TTL. 
cache.OnSendFailForBatchRegions(bo, rpcCtx.Store, regionInfoNeedsReloadOnSendFail, true, err) return regionInfo, nil, nil, err @@ -895,10 +926,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach if !isTiDBLabelZoneSet { tiflashReplicaReadPolicy = tiflash.AllReplicas } - aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, ttl, store, tiflashReplicaReadPolicy, tidbZone) - if tiflashReplicaReadPolicy.IsClosestReplicas() { - maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas - } + for { var tasks []*copTask rangesLen = 0 @@ -919,17 +947,16 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach } } - var batchTasks []*batchCopTask - var regionIDsInOtherZones []uint64 - var regionInfosNeedReloadOnSendFail []RegionInfo - storeTaskMap := make(map[string]*batchCopTask) + rpcCtxs := make([]*tikv.RPCContext, 0, len(tasks)) + usedTiFlashStores := make([][]uint64, 0, len(tasks)) + usedTiFlashStoresMap := make(map[uint64]struct{}, 0) needRetry := false - storeIDsUnionSetForAllTasks := make(map[uint64]struct{}) for _, task := range tasks { rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode) if err != nil { return nil, errors.Trace(err) } + // When rpcCtx is nil, it's not only attributed to the miss region, but also // some TiFlash stores crash and can't be recovered. // That is not an error that can be easily recovered, so we regard this error @@ -941,36 +968,62 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach // Then `splitRegion` will reloads these regions. continue } + + allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode) + for _, storeID := range allStores { + usedTiFlashStoresMap[storeID] = struct{}{} + } + rpcCtxs = append(rpcCtxs, rpcCtx) + usedTiFlashStores = append(usedTiFlashStores, allStores) + } + + if needRetry { + // As mentioned above, nil rpcCtx is always attributed to failed stores. + // It's equal to long poll the store but get no response. Here we'd better use + // TiFlash error to trigger the TiKV fallback mechanism. 
+ err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) + if err != nil { + return nil, errors.Trace(err) + } + continue + } + + aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, usedTiFlashStoresMap, ttl, store, tiflashReplicaReadPolicy, tidbZone) + if tiflashReplicaReadPolicy.IsClosestReplicas() { + if len(aliveStores.storeIDsInTiDBZone) == 0 { + return nil, errors.Errorf("There is no region in tidb zone(%s)", tidbZone) + } + maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas + } + + var batchTasks []*batchCopTask + var regionIDsInOtherZones []uint64 + var regionInfosNeedReloadOnSendFail []RegionInfo + storeTaskMap := make(map[string]*batchCopTask) + storeIDsUnionSetForAllTasks := make(map[uint64]struct{}) + for idx, task := range tasks { + var err error var regionInfo RegionInfo - regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, bo, task, rpcCtx, aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone) + regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, usedTiFlashStores[idx], bo, task, rpcCtxs[idx], aliveStores, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone) if err != nil { return nil, err } - if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok { + if batchCop, ok := storeTaskMap[rpcCtxs[idx].Addr]; ok { batchCop.regionInfos = append(batchCop.regionInfos, regionInfo) } else { batchTask := &batchCopTask{ - storeAddr: rpcCtx.Addr, + storeAddr: rpcCtxs[idx].Addr, cmdType: cmdType, - ctx: rpcCtx, + ctx: rpcCtxs[idx], regionInfos: []RegionInfo{regionInfo}, } - storeTaskMap[rpcCtx.Addr] = batchTask + storeTaskMap[rpcCtxs[idx].Addr] = batchTask } for _, storeID := range regionInfo.AllStores { storeIDsUnionSetForAllTasks[storeID] = struct{}{} } } - if needRetry { - // As mentioned above, nil rpcCtx is always attributed to failed stores. - // It's equal to long poll the store but get no response. Here we'd better use - // TiFlash error to trigger the TiKV fallback mechanism. 
- err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) - if err != nil { - return nil, errors.Trace(err) - } - continue - } + if len(regionIDsInOtherZones) != 0 { warningMsg := fmt.Sprintf("total %d region(s) can not be accessed by TiFlash in the zone [%s]:", len(regionIDsInOtherZones), tidbZone) regionIDErrMsg := "" @@ -998,7 +1051,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach storesUnionSetForAllTasks = append(storesUnionSetForAllTasks, store) } } - batchTasks = balanceBatchCopTask(bo.GetCtx(), storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks = balanceBatchCopTask(storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount) balanceElapsed := time.Since(balanceStart) if log.GetLevel() <= zap.DebugLevel { msg := "After region balance:" diff --git a/pkg/store/copr/batch_coprocessor_test.go b/pkg/store/copr/batch_coprocessor_test.go index e94d2c17effe9..02c4122903a58 100644 --- a/pkg/store/copr/batch_coprocessor_test.go +++ b/pkg/store/copr/batch_coprocessor_test.go @@ -23,12 +23,15 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/store/driver/backoff" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/stathat/consistent" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" "go.uber.org/zap" ) @@ -125,13 +128,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) { func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) { { var nilTaskSet []*batchCopTask - nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, 0) + nilResult := balanceBatchCopTask(nil, nilTaskSet, false, 0) require.True(t, nilResult == nil) } { emptyTaskSet := make([]*batchCopTask, 0) - emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, 0) + emptyResult := balanceBatchCopTask(nil, emptyTaskSet, false, 0) require.True(t, emptyResult != nil) require.True(t, len(emptyResult) == 0) } @@ -282,3 +285,38 @@ func TestTopoFetcherBackoff(t *testing.T) { require.GreaterOrEqual(t, dura, 30*time.Second) require.LessOrEqual(t, dura, 50*time.Second) } + +func TestGetAllUsedTiFlashStores(t *testing.T) { + mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(t, err) + defer func() { + pdClient.Close() + err = mockClient.Close() + require.NoError(t, err) + }() + + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) + defer pdCli.Close() + + cache := NewRegionCache(tikv.NewRegionCache(pdCli)) + defer cache.Close() + + label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash} + label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute} + + cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2}) + cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2}) + cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2}) + + allUsedTiFlashStoresMap := make(map[uint64]struct{}) + allUsedTiFlashStoresMap[2] = struct{}{} + allUsedTiFlashStoresMap[3] = struct{}{} + allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode) + require.Equal(t, 3, 
len(allTiFlashStores)) + allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap) + require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores)) + for _, store := range allUsedTiFlashStores { + _, ok := allUsedTiFlashStoresMap[store.StoreID()] + require.True(t, ok) + } +}
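
The heart of this change: buildBatchCopTasksCore now makes a first pass over the cop tasks to union the IDs of the TiFlash stores that can actually serve each task's region, and every later step (liveness probing, closest-replicas accounting, balancing) is restricted to that set via the new getAllUsedTiFlashStores helper, so stores holding none of the requested regions no longer receive batch cop tasks. Below is a minimal, dependency-free sketch of that filtering step; toyStore, collectUsedStoreIDs and keepUsedStores are illustrative stand-ins for *tikv.Store, the per-task GetAllValidTiFlashStores results and getAllUsedTiFlashStores, not names from the patch.

package main

import "fmt"

// toyStore stands in for *tikv.Store; only the ID and address matter here.
type toyStore struct {
	id   uint64
	addr string
}

func (s *toyStore) StoreID() uint64 { return s.id }

// collectUsedStoreIDs mirrors the first pass the patch adds to
// buildBatchCopTasksCore: union the store IDs that can serve each task's
// region, so only stores hosting at least one requested region are kept.
func collectUsedStoreIDs(storeIDsPerTask [][]uint64) map[uint64]struct{} {
	used := make(map[uint64]struct{})
	for _, ids := range storeIDsPerTask {
		for _, id := range ids {
			used[id] = struct{}{}
		}
	}
	return used
}

// keepUsedStores mirrors getAllUsedTiFlashStores: keep only the TiFlash
// stores whose ID appears in the used set; the rest are never probed for
// liveness and never receive a balanced batch cop task.
func keepUsedStores(all []*toyStore, used map[uint64]struct{}) []*toyStore {
	kept := make([]*toyStore, 0, len(used))
	for _, s := range all {
		if _, ok := used[s.StoreID()]; ok {
			kept = append(kept, s)
		}
	}
	return kept
}

func main() {
	all := []*toyStore{{1, "tiflash-1"}, {2, "tiflash-2"}, {3, "tiflash-3"}, {4, "tiflash-4"}}
	// Two tasks; their regions have peers only on stores 2 and 3, so stores
	// 1 and 4 drop out before balancing.
	used := collectUsedStoreIDs([][]uint64{{2, 3}, {3}})
	for _, s := range keepUsedStores(all, used) {
		fmt.Println(s.StoreID(), s.addr) // prints stores 2 and 3 only
	}
}

This is also why balanceBatchCopTask no longer needs a context argument and why the test in batch_coprocessor_test.go registers three TiFlash stores but expects only the two listed in allUsedTiFlashStoresMap to survive the filter.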
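
A second consequence of computing alive stores from the used-store set: the closest-replicas remote-read budget is now derived after that filtering, and the patch adds an early error when no alive store in the TiDB zone holds any requested region. A rough sketch of that guard, assuming an illustrative per-node limit and a hypothetical zone name ("zone-a"); the real constant is tiflash.MaxRemoteReadCountPerNodeForClosestReplicas in pkg/util/tiflash.

package main

import "fmt"

// maxRemoteReadCountPerNode is illustrative only; the actual value comes from
// tiflash.MaxRemoteReadCountPerNodeForClosestReplicas.
const maxRemoteReadCountPerNode = 1

// remoteReadBudget sketches the closest_replicas guard added by the patch:
// if none of the used, alive stores sit in the TiDB zone, fail fast instead
// of balancing tasks onto stores that cannot serve any requested region
// locally; otherwise scale the remote-read budget by the local store count.
func remoteReadBudget(aliveStoreIDsInTiDBZone map[uint64]struct{}, tidbZone string) (int, error) {
	if len(aliveStoreIDsInTiDBZone) == 0 {
		return 0, fmt.Errorf("There is no region in tidb zone(%s)", tidbZone)
	}
	return len(aliveStoreIDsInTiDBZone) * maxRemoteReadCountPerNode, nil
}

func main() {
	if _, err := remoteReadBudget(nil, "zone-a"); err != nil {
		fmt.Println(err) // There is no region in tidb zone(zone-a)
	}
	budget, _ := remoteReadBudget(map[uint64]struct{}{1: {}, 2: {}}, "zone-a")
	fmt.Println(budget) // 2
}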