
lightning/backend: run batch split region in parallel for local backend #761

Merged
14 commits merged on Mar 13, 2021
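This PR replaces the sequential per-region batch-split loop in SplitAndScatterRegionByRanges with a fixed-size pool of worker goroutines fed through a channel, coordinated by errgroup, with a mutex guarding the slices the workers share. As a reading aid, here is a minimal, self-contained sketch of that fan-out pattern; `process`, `runInParallel`, and the int work items are illustrative placeholders, not code from this PR:

```go
package main

import (
	"context"
	"fmt"
	"runtime"

	"golang.org/x/sync/errgroup"
)

// process stands in for the per-region batch-split work; it is an illustrative placeholder.
func process(ctx context.Context, item int) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		fmt.Println("processed", item)
		return nil
	}
}

// runInParallel fans items out to a bounded number of workers, mirroring the
// channel + errgroup structure used in SplitAndScatterRegionByRanges.
func runInParallel(ctx context.Context, items []int) error {
	// bound the number of workers by GOMAXPROCS, like the size computation in the diff
	size := len(items)
	if procs := runtime.GOMAXPROCS(0); procs < size {
		size = procs
	}
	ch := make(chan int, size)
	eg, workerCtx := errgroup.WithContext(ctx)

	for i := 0; i < size; i++ {
		eg.Go(func() error {
			for item := range ch {
				if err := process(workerCtx, item); err != nil {
					// returning an error cancels workerCtx, which stops the send loop below
					return err
				}
			}
			return nil
		})
	}

sendLoop:
	for _, item := range items {
		select {
		case ch <- item:
		case <-ctx.Done():
			close(ch)
			return ctx.Err()
		case <-workerCtx.Done():
			// a worker already failed; stop sending and let eg.Wait report its error
			break sendLoop
		}
	}
	close(ch)
	return eg.Wait()
}

func main() {
	if err := runInParallel(context.Background(), []int{1, 2, 3, 4, 5}); err != nil {
		fmt.Println("error:", err)
	}
}
```

The actual change, shown in the diff below, additionally distinguishes cancellation of the caller's context from a worker failure, collects keys to retry, and accumulates non-cancellation errors with multierr.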
161 changes: 109 additions & 52 deletions pkg/lightning/backend/localhelper.go
@@ -18,17 +18,22 @@ import (
"context"
"encoding/hex"
"regexp"
"runtime"
"sort"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/log"
split "github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/utils"
@@ -69,6 +74,7 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
log.ZapRedactBinary("maxKey", maxKey),
zap.Int("retry", i),
)
err = nil
if i > 0 {
select {
case <-time.After(waitTime):
@@ -123,54 +129,103 @@ func (local *local) SplitAndScatterRegionByRanges(ctx context.Context, ranges []
} else {
splitKeyMap = getSplitKeysByRanges(ranges, regions)
}
for regionID, keys := range splitKeyMap {
var newRegions []*split.RegionInfo
region := regionMap[regionID]
sort.Slice(keys, func(i, j int) bool {
return bytes.Compare(keys[i], keys[j]) < 0
})
splitRegion := region
for j := 0; j < (len(keys)+maxBatchSplitKeys-1)/maxBatchSplitKeys; j++ {
start := j * maxBatchSplitKeys
end := utils.MinInt((j+1)*maxBatchSplitKeys, len(keys))
splitRegionStart := codec.EncodeBytes([]byte{}, keys[start])
splitRegionEnd := codec.EncodeBytes([]byte{}, keys[end-1])
if bytes.Compare(splitRegionStart, splitRegion.Region.StartKey) < 0 || !beforeEnd(splitRegionEnd, splitRegion.Region.EndKey) {
log.L().Fatal("no valid key in region",
log.ZapRedactBinary("startKey", splitRegionStart), log.ZapRedactBinary("endKey", splitRegionEnd),
log.ZapRedactBinary("regionStart", splitRegion.Region.StartKey), log.ZapRedactBinary("regionEnd", splitRegion.Region.EndKey),
log.ZapRedactReflect("region", splitRegion))
}
splitRegion, newRegions, err = local.BatchSplitRegions(ctx, splitRegion, keys[start:end])
if err != nil {
if strings.Contains(err.Error(), "no valid key") {
for _, key := range keys {
log.L().Warn("no valid key",
log.ZapRedactBinary("startKey", region.Region.StartKey),
log.ZapRedactBinary("endKey", region.Region.EndKey),
log.ZapRedactBinary("key", codec.EncodeBytes([]byte{}, key)))
}
return errors.Trace(err)
}
log.L().Warn("split regions", log.ShortError(err), zap.Int("retry time", j+1),
zap.Uint64("region_id", regionID))
retryKeys = append(retryKeys, keys[start:]...)
break
} else {
log.L().Info("batch split region", zap.Uint64("region_id", splitRegion.Region.Id),
zap.Int("keys", end-start), zap.Binary("firstKey", keys[start]),
zap.Binary("end", keys[end-1]))
				sort.Slice(newRegions, func(i, j int) bool {
					return bytes.Compare(newRegions[i].Region.StartKey, newRegions[j].Region.StartKey) < 0
				})
				scatterRegions = append(scatterRegions, newRegions...)
				// the region with the max start key is the region that needs to be split further.
				if bytes.Compare(splitRegion.Region.StartKey, newRegions[len(newRegions)-1].Region.StartKey) < 0 {
					splitRegion = newRegions[len(newRegions)-1]
				}
			}
		}
	}

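// splitInfo is one unit of work for a split worker: a region and the keys to split it at.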
type splitInfo struct {
region *split.RegionInfo
keys [][]byte
}

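// syncLock guards the state shared by the split workers below: scatterRegions, retryKeys, and the accumulated err.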
var syncLock sync.Mutex
// TODO: make this size configurable
size := utils.MinInt(len(splitKeyMap), runtime.GOMAXPROCS(0))
ch := make(chan *splitInfo, size)
eg, splitCtx := errgroup.WithContext(ctx)

for splitWorker := 0; splitWorker < size; splitWorker++ {
eg.Go(func() error {
for sp := range ch {
var newRegions []*split.RegionInfo
var err1 error
region := sp.region
keys := sp.keys
sort.Slice(keys, func(i, j int) bool {
return bytes.Compare(keys[i], keys[j]) < 0
})
splitRegion := region
for j := 0; j < (len(keys)+maxBatchSplitKeys-1)/maxBatchSplitKeys; j++ {
start := j * maxBatchSplitKeys
end := utils.MinInt((j+1)*maxBatchSplitKeys, len(keys))
splitRegionStart := codec.EncodeBytes([]byte{}, keys[start])
splitRegionEnd := codec.EncodeBytes([]byte{}, keys[end-1])
if bytes.Compare(splitRegionStart, splitRegion.Region.StartKey) < 0 || !beforeEnd(splitRegionEnd, splitRegion.Region.EndKey) {
log.L().Fatal("no valid key in region",
log.ZapRedactBinary("startKey", splitRegionStart), log.ZapRedactBinary("endKey", splitRegionEnd),
log.ZapRedactBinary("regionStart", splitRegion.Region.StartKey), log.ZapRedactBinary("regionEnd", splitRegion.Region.EndKey),
log.ZapRedactReflect("region", splitRegion))
}
splitRegion, newRegions, err1 = local.BatchSplitRegions(splitCtx, splitRegion, keys[start:end])
if err1 != nil {
if strings.Contains(err1.Error(), "no valid key") {
for _, key := range keys {
log.L().Warn("no valid key",
log.ZapRedactBinary("startKey", region.Region.StartKey),
log.ZapRedactBinary("endKey", region.Region.EndKey),
log.ZapRedactBinary("key", codec.EncodeBytes([]byte{}, key)))
}
return err1
} else if common.IsContextCanceledError(err1) {
// do not retry on context.Canceled error
return err1
}
log.L().Warn("split regions", log.ShortError(err1), zap.Int("retry time", j+1),
zap.Uint64("region_id", region.Region.Id))

syncLock.Lock()
retryKeys = append(retryKeys, keys[start:]...)
// record the error in the shared err so that, if we exceed the retry limit, the function returns it
if !common.IsContextCanceledError(err1) {
err = multierr.Append(err, err1)
}
syncLock.Unlock()
break
} else {
log.L().Info("batch split region", zap.Uint64("region_id", splitRegion.Region.Id),
zap.Int("keys", end-start), zap.Binary("firstKey", keys[start]),
zap.Binary("end", keys[end-1]))
sort.Slice(newRegions, func(i, j int) bool {
return bytes.Compare(newRegions[i].Region.StartKey, newRegions[j].Region.StartKey) < 0
})
syncLock.Lock()
scatterRegions = append(scatterRegions, newRegions...)
syncLock.Unlock()
// the region with the max start key is the region that needs to be split further.
if bytes.Compare(splitRegion.Region.StartKey, newRegions[len(newRegions)-1].Region.StartKey) < 0 {
splitRegion = newRegions[len(newRegions)-1]
}
}
}
}
return nil
})
}
sendLoop:
for regionID, keys := range splitKeyMap {
select {
case ch <- &splitInfo{region: regionMap[regionID], keys: keys}:
case <-ctx.Done():
// the outer context is canceled, so we can return directly
close(ch)
return ctx.Err()
case <-splitCtx.Done():
// hit a critical error, stop sending more tasks
break sendLoop
}
}
close(ch)
if splitError := eg.Wait(); splitError != nil {
return splitError
}

if len(retryKeys) == 0 {
break
} else {
@@ -234,6 +289,9 @@ func paginateScanRegion(
break
}
}
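	// keep the returned regions sorted by their start keys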
sort.Slice(regions, func(i, j int) bool {
return bytes.Compare(regions[i].Region.StartKey, regions[j].Region.StartKey) < 0
})
return regions, nil
}

@@ -384,20 +442,19 @@ func needSplit(key []byte, regions []*split.RegionInfo) *split.RegionInfo {
}
splitKey := codec.EncodeBytes([]byte{}, key)

	for _, region := range regions {
		// If splitKey is the boundary of the region
		if bytes.Equal(splitKey, region.Region.GetStartKey()) {
			return nil
		}
		// If splitKey is in a region
		if bytes.Compare(splitKey, region.Region.GetStartKey()) > 0 && beforeEnd(splitKey, region.Region.GetEndKey()) {
			log.L().Debug("need split",
				zap.Binary("splitKey", key),
				zap.Binary("encodedKey", splitKey),
				zap.Binary("region start", region.Region.GetStartKey()),
				zap.Binary("region end", region.Region.GetEndKey()),
			)
			return region
		}
	}
	return nil

	idx := sort.Search(len(regions), func(i int) bool {
		return beforeEnd(splitKey, regions[i].Region.EndKey)
	})
	if idx < len(regions) {
		// If splitKey is in a region
		if bytes.Compare(splitKey, regions[idx].Region.GetStartKey()) > 0 && beforeEnd(splitKey, regions[idx].Region.GetEndKey()) {
			log.L().Debug("need split",
				zap.Binary("splitKey", key),
				zap.Binary("encodedKey", splitKey),
				zap.Binary("region start", regions[idx].Region.GetStartKey()),
				zap.Binary("region end", regions[idx].Region.GetEndKey()),
			)
			return regions[idx]
		}
	}
	return nil
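needSplit now locates the region covering splitKey with a binary search instead of scanning every region, which is also why paginateScanRegion above sorts its result by start key. Below is a small self-contained sketch of the same sort.Search idiom; the simplified `region` type, `beforeEnd`, and `findRegion` here are illustrative stand-ins, not the BR definitions:

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// region is a simplified stand-in for split.RegionInfo; only the key range matters here.
type region struct {
	startKey, endKey []byte // an empty endKey means the range is unbounded on the right
}

// beforeEnd reports whether key sorts before end, treating an empty end as +infinity.
func beforeEnd(key, end []byte) bool {
	return bytes.Compare(key, end) < 0 || len(end) == 0
}

// findRegion binary-searches for the first region whose end key is after key.
// It assumes regions are sorted by start key, as the sort added to
// paginateScanRegion guarantees for callers such as needSplit.
func findRegion(regions []region, key []byte) int {
	return sort.Search(len(regions), func(i int) bool {
		return beforeEnd(key, regions[i].endKey)
	})
}

func main() {
	regions := []region{
		{startKey: []byte("a"), endKey: []byte("g")},
		{startKey: []byte("g"), endKey: []byte("p")},
		{startKey: []byte("p"), endKey: nil},
	}
	idx := findRegion(regions, []byte("k"))
	fmt.Println(idx, string(regions[idx].startKey)) // prints: 1 g
}
```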