Skip to content

Commit

Permalink
store/copr: add a param "limit" to region cache's SplitRegionRanges (
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Jan 29, 2023
1 parent 8e96350 commit f842cd9
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 18 deletions.
8 changes: 4 additions & 4 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
// splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
// to speed up backfilling data in table with disperse handle.
// The `t` should be a non-partitioned table or a partition.
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key) ([]kv.KeyRange, error) {
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key, limit int) ([]kv.KeyRange, error) {
logutil.BgLogger().Info("[ddl] split table range from PD",
zap.Int64("physicalTableID", t.GetPhysicalID()),
zap.String("start key", hex.EncodeToString(startKey)),
Expand All @@ -504,7 +504,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
maxSleep := 10000 // ms
bo := backoff.NewBackofferWithVars(context.Background(), maxSleep, nil)
rc := copr.NewRegionCache(s.GetRegionCache())
ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange})
ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}, limit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -981,7 +981,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
}

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTb
isFirstOps := true
bJobs := make([]*BackfillJob, 0, genTaskBatch)
for {
kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey)
kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey, genTaskBatch)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer,

for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -677,7 +677,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
rangesLen = 0
for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
21 changes: 13 additions & 8 deletions store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,46 +381,51 @@ func TestSplitRegionRanges(t *testing.T) {

bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil)

ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"))
ranges, err := cache.SplitRegionRanges(bo, BuildKeyRanges("a", "c"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "a", "c")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("h", "y"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 3)
rangeEqual(t, ranges, "h", "n", "n", "t", "t", "y")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "z"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 2)
rangeEqual(t, ranges, "s", "t", "t", "z")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("s", "s"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "s", "s")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "t"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "t", "t")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("t", "u"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "t", "u")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("u", "z"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 1)
rangeEqual(t, ranges, "u", "z")

// min --> max
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"))
ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), UnspecifiedLimit)
require.NoError(t, err)
require.Len(t, ranges, 4)
rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t", "t", "z")

ranges, err = cache.SplitRegionRanges(bo, BuildKeyRanges("a", "z"), 3)
require.NoError(t, err)
require.Len(t, ranges, 3)
rangeEqual(t, ranges, "a", "g", "g", "n", "n", "t")
}

func TestRebuild(t *testing.T) {
Expand Down
14 changes: 10 additions & 4 deletions store/copr/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func NewRegionCache(rc *tikv.RegionCache) *RegionCache {
}

// SplitRegionRanges gets the split ranges from pd region.
func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange) ([]kv.KeyRange, error) {
func (c *RegionCache) SplitRegionRanges(bo *Backoffer, keyRanges []kv.KeyRange, limit int) ([]kv.KeyRange, error) {
ranges := NewKeyRanges(keyRanges)

locations, err := c.SplitKeyRangesByLocations(bo, ranges)
locations, err := c.SplitKeyRangesByLocations(bo, ranges, limit)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
Expand Down Expand Up @@ -122,10 +122,16 @@ func (l *LocationKeyRanges) splitKeyRangesByBuckets() []*LocationKeyRanges {
return res
}

// UnspecifiedLimit means no limit.
const UnspecifiedLimit = -1

// SplitKeyRangesByLocations splits the KeyRanges by logical info in the cache.
func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
res := make([]*LocationKeyRanges, 0)
for ranges.Len() > 0 {
if limit != UnspecifiedLimit && len(res) >= limit {
break
}
loc, err := c.LocateKey(bo.TiKVBackoffer(), ranges.At(0).StartKey)
if err != nil {
return res, derr.ToTiDBErr(err)
Expand Down Expand Up @@ -176,7 +182,7 @@ func (c *RegionCache) SplitKeyRangesByLocations(bo *Backoffer, ranges *KeyRanges
//
// TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled.
func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
locs, err := c.SplitKeyRangesByLocations(bo, ranges)
locs, err := c.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
Expand Down

0 comments on commit f842cd9

Please sign in to comment.