store: remove stores that have no region before balance (#52966)
ref #52313
xzhangxian1008 authored May 27, 2024
1 parent 0bbeb15 commit c391b52
Showing 3 changed files with 100 additions and 29 deletions.
3 changes: 2 additions & 1 deletion pkg/store/copr/BUILD.bazel
@@ -82,7 +82,7 @@ go_test(
embed = [":copr"],
flaky = True,
race = "on",
shard_count = 29,
shard_count = 30,
deps = [
"//pkg/kv",
"//pkg/store/driver/backoff",
@@ -92,6 +92,7 @@ go_test(
"//pkg/util/trxevents",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
88 changes: 60 additions & 28 deletions pkg/store/copr/batch_coprocessor.go
@@ -820,12 +820,24 @@ func filterAllStoresAccordingToTiFlashReplicaRead(allStores []uint64, aliveStore
return
}

func getAllUsedTiFlashStores(allTiFlashStores []*tikv.Store, allUsedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
allUsedTiFlashStores := make([]*tikv.Store, 0, len(allUsedTiFlashStoresMap))
for _, store := range allTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
if ok {
allUsedTiFlashStores = append(allUsedTiFlashStores, store)
}
}
return allUsedTiFlashStores
}

// getAliveStoresAndStoreIDs gets alive TiFlash stores and their IDs.
// If tiflashReplicaReadPolicy is not all_replicas, it will also return the IDs of the alive TiFlash stores in TiDB zone.
func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, allUsedTiFlashStoresMap map[uint64]struct{}, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
aliveStores = new(aliveStoresBundle)
allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
aliveStores.storesInAllZones = filterAliveStores(ctx, allTiFlashStores, ttl, store)
allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
aliveStores.storesInAllZones = filterAliveStores(ctx, allUsedTiFlashStores, ttl, store)

if !tiflashReplicaReadPolicy.IsAllReplicas() {
aliveStores.storeIDsInTiDBZone = make(map[uint64]struct{}, len(aliveStores.storesInAllZones))
@@ -850,9 +862,8 @@ func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time
// 1. tiflash_replica_read policy
// 2. whether the store is alive
// After filtering, it will build the RegionInfo.
func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, allStores []uint64, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
needCrossZoneAccess := false
allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
allStores, needCrossZoneAccess = filterAllStoresAccordingToTiFlashReplicaRead(allStores, aliveStores, tiflashReplicaReadPolicy)
regionInfo = RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}
if needCrossZoneAccess {
@@ -896,10 +907,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
if !isTiDBLabelZoneSet {
tiflashReplicaReadPolicy = tiflash.AllReplicas
}
aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, ttl, store, tiflashReplicaReadPolicy, tidbZone)
if tiflashReplicaReadPolicy.IsClosestReplicas() {
maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
}

for {
var tasks []*copTask
rangesLen = 0
@@ -920,12 +928,10 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
}
}

var batchTasks []*batchCopTask
var regionIDsInOtherZones []uint64
var regionInfosNeedReloadOnSendFail []RegionInfo
storeTaskMap := make(map[string]*batchCopTask)
rpcCtxs := make([]*tikv.RPCContext, 0, len(tasks))
usedTiFlashStores := make([][]uint64, 0, len(tasks))
usedTiFlashStoresMap := make(map[uint64]struct{}, 0)
needRetry := false
storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
if err != nil {
@@ -942,36 +948,62 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
// Then `splitRegion` will reload these regions.
continue
}

allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
for _, storeID := range allStores {
usedTiFlashStoresMap[storeID] = struct{}{}
}
rpcCtxs = append(rpcCtxs, rpcCtx)
usedTiFlashStores = append(usedTiFlashStores, allStores)
}

if needRetry {
// As mentioned above, a nil rpcCtx is always attributed to failed stores.
// It's equivalent to long-polling the store and getting no response. Here we'd better use
// a TiFlash error to trigger the TiKV fallback mechanism.
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}

aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, usedTiFlashStoresMap, ttl, store, tiflashReplicaReadPolicy, tidbZone)
if tiflashReplicaReadPolicy.IsClosestReplicas() {
if len(aliveStores.storeIDsInTiDBZone) == 0 {
return nil, errors.Errorf("There is no region in tidb zone(%s)", tidbZone)
}
maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
}

var batchTasks []*batchCopTask
var regionIDsInOtherZones []uint64
var regionInfosNeedReloadOnSendFail []RegionInfo
storeTaskMap := make(map[string]*batchCopTask)
storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
for idx, task := range tasks {
var err error
var regionInfo RegionInfo
regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, bo, task, rpcCtx, aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, usedTiFlashStores[idx], bo, task, rpcCtxs[idx], aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
if err != nil {
return nil, err
}
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
if batchCop, ok := storeTaskMap[rpcCtxs[idx].Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, regionInfo)
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
storeAddr: rpcCtxs[idx].Addr,
cmdType: cmdType,
ctx: rpcCtx,
ctx: rpcCtxs[idx],
regionInfos: []RegionInfo{regionInfo},
}
storeTaskMap[rpcCtx.Addr] = batchTask
storeTaskMap[rpcCtxs[idx].Addr] = batchTask
}
for _, storeID := range regionInfo.AllStores {
storeIDsUnionSetForAllTasks[storeID] = struct{}{}
}
}
if needRetry {
// As mentioned above, a nil rpcCtx is always attributed to failed stores.
// It's equivalent to long-polling the store and getting no response. Here we'd better use
// a TiFlash error to trigger the TiKV fallback mechanism.
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}

if len(regionIDsInOtherZones) != 0 {
warningMsg := fmt.Sprintf("total %d region(s) can not be accessed by TiFlash in the zone [%s]:", len(regionIDsInOtherZones), tidbZone)
regionIDErrMsg := ""
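For readers skimming the diff: the substance of the change is that buildBatchCopTasksCore now first walks the tasks and records, in usedTiFlashStoresMap, every TiFlash store ID returned by GetAllValidTiFlashStores, and only then calls getAliveStoresAndStoreIDs, which narrows the cluster-wide store list to that used subset (via getAllUsedTiFlashStores) before running the liveness probe. Below is a minimal, self-contained Go sketch of that two-pass flow; Store, Task, isAlive, and usedStores are simplified stand-ins invented for illustration, not the real tikv.Store, copTask, or filterAliveStores APIs.

package main

import "fmt"

// Simplified stand-ins for tikv.Store and copTask; the real types live in
// client-go and pkg/store/copr.
type Store struct{ ID uint64 }
type Task struct{ CandidateStoreIDs []uint64 }

// isAlive is a placeholder for the real liveness probe (filterAliveStores),
// which sends a health RPC to every candidate store.
func isAlive(s *Store) bool { return s.ID != 0 }

// usedStores mirrors the idea of getAllUsedTiFlashStores: keep only the
// stores whose IDs appear in at least one task, so stores that serve no
// region for this query are never probed.
func usedStores(all []*Store, used map[uint64]struct{}) []*Store {
	res := make([]*Store, 0, len(used))
	for _, s := range all {
		if _, ok := used[s.ID]; ok {
			res = append(res, s)
		}
	}
	return res
}

func main() {
	allTiFlashStores := []*Store{{ID: 1}, {ID: 2}, {ID: 3}, {ID: 4}}
	tasks := []Task{{CandidateStoreIDs: []uint64{2, 3}}, {CandidateStoreIDs: []uint64{3}}}

	// Pass 1: collect every store ID referenced by any task.
	usedIDs := make(map[uint64]struct{})
	for _, t := range tasks {
		for _, id := range t.CandidateStoreIDs {
			usedIDs[id] = struct{}{}
		}
	}

	// Pass 2: probe liveness only for the used stores.
	alive := make([]*Store, 0, len(usedIDs))
	for _, s := range usedStores(allTiFlashStores, usedIDs) {
		if isAlive(s) {
			alive = append(alive, s)
		}
	}
	fmt.Printf("probed %d of %d stores, %d alive\n", len(usedIDs), len(allTiFlashStores), len(alive))
}

Under these assumptions, stores 1 and 4 back no region for the query, so they are never probed; the new TestGetAllUsedTiFlashStores below exercises the same filtering against a real RegionCache.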
38 changes: 38 additions & 0 deletions pkg/store/copr/batch_coprocessor_test.go
@@ -23,12 +23,15 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/store/driver/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stathat/consistent"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
)

@@ -282,3 +285,38 @@ func TestTopoFetcherBackoff(t *testing.T) {
require.GreaterOrEqual(t, dura, 30*time.Second)
require.LessOrEqual(t, dura, 50*time.Second)
}

func TestGetAllUsedTiFlashStores(t *testing.T) {
mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()

label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}

cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})

allUsedTiFlashStoresMap := make(map[uint64]struct{})
allUsedTiFlashStoresMap[2] = struct{}{}
allUsedTiFlashStoresMap[3] = struct{}{}
allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
require.Equal(t, 3, len(allTiFlashStores))
allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
for _, store := range allUsedTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
require.True(t, ok)
}
}
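TestGetAllUsedTiFlashStores covers the case where a strict subset of the cached stores is in use. A natural companion case, sketched below with the same mock setup, is an empty used-store map, for which getAllUsedTiFlashStores should return an empty slice. This is a hypothetical extra test written for illustration; it is not part of the commit.

func TestGetAllUsedTiFlashStoresEmpty(t *testing.T) {
	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
	require.NoError(t, err)
	defer func() {
		pdClient.Close()
		require.NoError(t, mockClient.Close())
	}()

	pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
	defer pdCli.Close()

	cache := NewRegionCache(tikv.NewRegionCache(pdCli))
	defer cache.Close()

	label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
	label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}
	cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 0, []*metapb.StoreLabel{&label1, &label2})

	allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
	require.Equal(t, 1, len(allTiFlashStores))

	// No store is referenced by any task, so the filtered list should be empty.
	allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, map[uint64]struct{}{})
	require.Equal(t, 0, len(allUsedTiFlashStores))
}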
