From b02af31e101f5ef4943780db9cf64215ee1819a4 Mon Sep 17 00:00:00 2001
From: xzhangxian1008
Date: Mon, 27 May 2024 16:53:50 +0800
Subject: [PATCH] store: remove stores that have no region before balance (#52787)

ref pingcap/tidb#52313
---
 store/copr/batch_coprocessor.go      | 34 +++++++++++++++++++-------
 store/copr/batch_coprocessor_test.go | 36 ++++++++++++++++++++++++++--
 2 files changed, 59 insertions(+), 11 deletions(-)

diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go
index 3384eccfadaee..85f2e983851f0 100644
--- a/store/copr/batch_coprocessor.go
+++ b/store/copr/batch_coprocessor.go
@@ -280,6 +280,19 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
 	return res, score
 }
 
+func getUsedStores(cache *RegionCache, usedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
+	// decide the available stores
+	stores := cache.RegionCache.GetTiFlashStores()
+	usedStores := make([]*tikv.Store, 0)
+	for _, store := range stores {
+		_, ok := usedTiFlashStoresMap[store.StoreID()]
+		if ok {
+			usedStores = append(usedStores, store)
+		}
+	}
+	return usedStores
+}
+
 // balanceBatchCopTask balance the regions between available stores, the basic rule is
 // 1. the first region of each original batch cop task belongs to its original store because some
 //    meta data(like the rpc context) in batchCopTask is related to it
@@ -292,7 +305,7 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
 //
 // The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely).
 // If balanceWithContinuity is true, the second balance strategy is enable.
-func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
+func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, usedTiFlashStoresMap map[uint64]struct{}, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
 	if len(originalTasks) == 0 {
 		log.Info("Batch cop task balancer got an empty task set.")
 		return originalTasks
@@ -321,16 +334,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
 			storeTaskMap[taskStoreID] = batchTask
 		}
 	} else {
+		usedStores := getUsedStores(cache, usedTiFlashStoresMap)
 		logutil.BgLogger().Info("detecting available mpp stores")
-		// decide the available stores
-		stores := cache.RegionCache.GetTiFlashStores()
 		var wg sync.WaitGroup
 		var mu sync.Mutex
-		wg.Add(len(stores))
-		for i := range stores {
+		wg.Add(len(usedStores))
+		for i := range usedStores {
 			go func(idx int) {
 				defer wg.Done()
-				s := stores[idx]
+				s := usedStores[idx]
 
 				// check if store is failed already.
 				ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
@@ -548,6 +560,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 
 	var batchTasks []*batchCopTask
 	storeTaskMap := make(map[string]*batchCopTask)
+	usedTiFlashStoresMap := make(map[uint64]struct{})
 	needRetry := false
 	for _, task := range tasks {
 		rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP)
@@ -577,6 +590,10 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 			}
 			storeTaskMap[rpcCtx.Addr] = batchTask
 		}
+
+		for _, store := range allStores {
+			usedTiFlashStoresMap[store] = struct{}{}
+		}
 	}
 	if needRetry {
 		// As mentioned above, nil rpcCtx is always attributed to failed stores.
@@ -600,7 +617,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 		logutil.BgLogger().Debug(msg)
 	}
 	balanceStart := time.Now()
-	batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+	batchTasks = balanceBatchCopTask(bo.GetCtx(), store, usedTiFlashStoresMap, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
 	balanceElapsed := time.Since(balanceStart)
 	if log.GetLevel() <= zap.DebugLevel {
 		msg := "After region balance:"
@@ -871,8 +888,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backo
 		Priority:       priorityToPB(b.req.Priority),
 		NotFillCache:   b.req.NotFillCache,
 		RecordTimeStat: true,
-		RecordScanStat: true,
-		TaskId:         b.req.TaskID,
+		RecordScanStat: true, TaskId: b.req.TaskID,
 	})
 	if b.req.ResourceGroupTagger != nil {
 		b.req.ResourceGroupTagger(req)
diff --git a/store/copr/batch_coprocessor_test.go b/store/copr/batch_coprocessor_test.go
index 49626a2df5506..cd1e46b47120c 100644
--- a/store/copr/batch_coprocessor_test.go
+++ b/store/copr/batch_coprocessor_test.go
@@ -23,7 +23,9 @@ import (
 
 	"github.com/pingcap/tidb/kv"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
+	"github.com/tikv/client-go/v2/tikvrpc"
 )
 
 // StoreID: [1, storeCount]
@@ -119,13 +121,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
 func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
 	{
 		var nilTaskSet []*batchCopTask
-		nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, time.Second, false, 0)
+		nilResult := balanceBatchCopTask(nil, nil, nil, nilTaskSet, false, time.Second, false, 0)
 		require.True(t, nilResult == nil)
 	}
 
 	{
 		emptyTaskSet := make([]*batchCopTask, 0)
-		emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, time.Second, false, 0)
+		emptyResult := balanceBatchCopTask(nil, nil, nil, emptyTaskSet, false, time.Second, false, 0)
 		require.True(t, emptyResult != nil)
 		require.True(t, len(emptyResult) == 0)
 	}
@@ -150,3 +152,33 @@ func TestDeepCopyStoreTaskMap(t *testing.T) {
 		require.Equal(t, 2, len(task.regionInfos))
 	}
 }
+
+func TestGetUsedStores(t *testing.T) {
+	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
+	pdCli := &tikv.CodecPDClient{Client: pdClient}
+	defer pdCli.Close()
+
+	cache := NewRegionCache(tikv.NewRegionCache(pdCli))
+	defer cache.Close()
+
+	cache.SetRegionCacheStore(1, tikvrpc.TiFlash, 0, nil)
+	cache.SetRegionCacheStore(2, tikvrpc.TiFlash, 0, nil)
+	cache.SetRegionCacheStore(3, tikvrpc.TiFlash, 0, nil)
+
+	allUsedTiFlashStoresMap := make(map[uint64]struct{})
+	allUsedTiFlashStoresMap[2] = struct{}{}
+	allUsedTiFlashStoresMap[3] = struct{}{}
+	allUsedTiFlashStores := getUsedStores(cache, allUsedTiFlashStoresMap)
+	require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
+	for _, store := range allUsedTiFlashStores {
+		_, ok := allUsedTiFlashStoresMap[store.StoreID()]
+		require.True(t, ok)
+	}
+}
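
For readers following the change outside the TiDB tree, the sketch below illustrates the filtering idea that getUsedStores applies before the MPP store probe and the balance step: only stores whose IDs appear in the per-query used-store set are kept, so TiFlash nodes that own no region for the query are neither probed nor assigned regions. This is a minimal stand-alone sketch; the Store type, IDs, and addresses are hypothetical stand-ins for illustration, not the client-go API.

// filter_sketch.go: stand-alone illustration of the store-filtering idea in this patch.
// Store is a made-up stand-in, not tikv.Store from client-go.
package main

import "fmt"

// Store is a hypothetical descriptor of a TiFlash store.
type Store struct {
	ID   uint64
	Addr string
}

// filterUsedStores keeps only the stores whose IDs appear in usedIDs, mirroring
// what getUsedStores does with the region cache's TiFlash store list: stores
// without any region for the current query are dropped before balancing.
func filterUsedStores(all []Store, usedIDs map[uint64]struct{}) []Store {
	used := make([]Store, 0, len(usedIDs))
	for _, s := range all {
		if _, ok := usedIDs[s.ID]; ok {
			used = append(used, s)
		}
	}
	return used
}

func main() {
	all := []Store{{1, "tiflash-1:3930"}, {2, "tiflash-2:3930"}, {3, "tiflash-3:3930"}}
	// Suppose only stores 2 and 3 hold regions touched by the query.
	usedIDs := map[uint64]struct{}{2: {}, 3: {}}
	for _, s := range filterUsedStores(all, usedIDs) {
		fmt.Printf("probe and balance on store %d (%s)\n", s.ID, s.Addr)
	}
}

Filtering before wg.Add(len(usedStores)) in the patch also keeps the liveness-probe goroutines bounded by the stores the query can actually use.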