
Commit

store: remove stores that have no region before balance (#52787)
ref #52313
xzhangxian1008 authored May 27, 2024
1 parent e98aba5 commit b02af31
Showing 2 changed files with 59 additions and 11 deletions.
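
In short, the commit threads a set of "used" TiFlash store IDs — collected in buildBatchCopTasksCore while the batch cop tasks are assembled — into balanceBatchCopTask, so that only stores actually hosting at least one region of the tasks are probed and considered during balancing, rather than every TiFlash store in the cluster. A minimal, self-contained sketch of that filtering idea follows; the store type and function below are simplified stand-ins for illustration, not the tikv client types used in the diff.

package main

import "fmt"

// store is a simplified stand-in for tikv.Store: only the ID matters here.
type store struct{ id uint64 }

// filterUsedStores keeps only the stores whose ID appears in usedIDs,
// mirroring what getUsedStores does against the region cache.
func filterUsedStores(all []store, usedIDs map[uint64]struct{}) []store {
	used := make([]store, 0, len(usedIDs))
	for _, s := range all {
		if _, ok := usedIDs[s.id]; ok {
			used = append(used, s)
		}
	}
	return used
}

func main() {
	allTiFlashStores := []store{{1}, {2}, {3}}
	// IDs collected while building the batch cop tasks: one entry per store
	// that actually hosts a region of some task.
	usedIDs := map[uint64]struct{}{2: {}, 3: {}}
	fmt.Println(filterUsedStores(allTiFlashStores, usedIDs)) // [{2} {3}]
}
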
34 changes: 25 additions & 9 deletions store/copr/batch_coprocessor.go
@@ -280,6 +280,19 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
return res, score
}

func getUsedStores(cache *RegionCache, usedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
// decide the available stores
stores := cache.RegionCache.GetTiFlashStores()
usedStores := make([]*tikv.Store, 0)
for _, store := range stores {
_, ok := usedTiFlashStoresMap[store.StoreID()]
if ok {
usedStores = append(usedStores, store)
}
}
return usedStores
}

// balanceBatchCopTask balances the regions between available stores, the basic rule is
// 1. the first region of each original batch cop task belongs to its original store because some
// meta data(like the rpc context) in batchCopTask is related to it
@@ -292,7 +305,7 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
//
// The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely).
// If balanceWithContinuity is true, the second balance strategy is enabled.
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, usedTiFlashStoresMap map[uint64]struct{}, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
@@ -321,16 +334,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
storeTaskMap[taskStoreID] = batchTask
}
} else {
usedStores := getUsedStores(cache, usedTiFlashStoresMap)
logutil.BgLogger().Info("detecting available mpp stores")
// decide the available stores
stores := cache.RegionCache.GetTiFlashStores()
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
for i := range stores {
wg.Add(len(usedStores))
for i := range usedStores {
go func(idx int) {
defer wg.Done()
s := stores[idx]
s := usedStores[idx]

// check if store is failed already.
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
@@ -548,6 +560,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
var batchTasks []*batchCopTask

storeTaskMap := make(map[string]*batchCopTask)
usedTiFlashStoresMap := make(map[uint64]struct{})
needRetry := false
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP)
@@ -577,6 +590,10 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
}
storeTaskMap[rpcCtx.Addr] = batchTask
}

for _, store := range allStores {
usedTiFlashStoresMap[store] = struct{}{}
}
}
if needRetry {
// As mentioned above, nil rpcCtx is always attributed to failed stores.
@@ -600,7 +617,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
logutil.BgLogger().Debug(msg)
}
balanceStart := time.Now()
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, usedTiFlashStoresMap, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
balanceElapsed := time.Since(balanceStart)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
@@ -871,8 +888,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backo
Priority: priorityToPB(b.req.Priority),
NotFillCache: b.req.NotFillCache,
RecordTimeStat: true,
RecordScanStat: true,
TaskId: b.req.TaskID,
RecordScanStat: true, TaskId: b.req.TaskID,
})
if b.req.ResourceGroupTagger != nil {
b.req.ResourceGroupTagger(req)
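
For reference, the doc comment on balanceBatchCopTask above describes the two balance rules: the first region of every original task stays with its original store (its rpc context is tied to that store), and the remaining regions are redistributed to even out the per-store region count, optionally preferring continuous ranges. A rough, standalone sketch of the count-only rule — ignoring continuity, TTL probing, and rpc contexts, with hypothetical names and region/store IDs:

package main

import "fmt"

// balanceByCount pins the first region of every task to its original store,
// then greedily hands each remaining region to the currently least-loaded store.
func balanceByCount(tasks map[string][]string) map[string][]string {
	out := make(map[string][]string, len(tasks))
	counts := make(map[string]int, len(tasks))
	var rest []string
	for store, regions := range tasks {
		out[store] = nil
		if len(regions) > 0 {
			out[store] = []string{regions[0]} // rule 1: the first region keeps its original store
			counts[store] = 1
			rest = append(rest, regions[1:]...)
		}
	}
	// rule 2 (count only): each leftover region goes to whichever store has the fewest regions so far.
	for _, region := range rest {
		best := ""
		for store := range out {
			if best == "" || counts[store] < counts[best] {
				best = store
			}
		}
		out[best] = append(out[best], region)
		counts[best]++
	}
	return out
}

func main() {
	fmt.Println(balanceByCount(map[string][]string{
		"tiflash-1": {"r1", "r2", "r3", "r4", "r5"},
		"tiflash-2": {"r6"},
	}))
}
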
36 changes: 34 additions & 2 deletions store/copr/batch_coprocessor_test.go
@@ -23,7 +23,9 @@ import (

"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
)

// StoreID: [1, storeCount]
@@ -119,13 +121,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, time.Second, false, 0)
nilResult := balanceBatchCopTask(nil, nil, nil, nilTaskSet, false, time.Second, false, 0)
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, time.Second, false, 0)
emptyResult := balanceBatchCopTask(nil, nil, nil, emptyTaskSet, false, time.Second, false, 0)
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}
@@ -150,3 +152,33 @@ func TestDeepCopyStoreTaskMap(t *testing.T) {
require.Equal(t, 2, len(task.regionInfos))
}
}

func TestGetUsedStores(t *testing.T) {
mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

pdCli := &tikv.CodecPDClient{Client: pdClient}
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()

cache.SetRegionCacheStore(1, tikvrpc.TiFlash, 0, nil)
cache.SetRegionCacheStore(2, tikvrpc.TiFlash, 0, nil)
cache.SetRegionCacheStore(3, tikvrpc.TiFlash, 0, nil)

allUsedTiFlashStoresMap := make(map[uint64]struct{})
allUsedTiFlashStoresMap[2] = struct{}{}
allUsedTiFlashStoresMap[3] = struct{}{}
allUsedTiFlashStores := getUsedStores(cache, allUsedTiFlashStoresMap)
require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
for _, store := range allUsedTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
require.True(t, ok)
}
}
