
Commit: cherry pick pingcap#32612 to release-5.3

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
pingyu authored and ti-srebot committed Mar 3, 2022
1 parent 459917c commit 0e8d8d1
Showing 13 changed files with 283 additions and 67 deletions.
18 changes: 13 additions & 5 deletions br/pkg/backup/client.go
@@ -506,7 +506,7 @@ func (bc *Client) BackupRange(
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
- req.RateLimit, req.Concurrency, results, progressCallBack)
+ req.RateLimit, req.Concurrency, req.IsRawKv, req.CipherInfo, results, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -550,10 +550,12 @@ func (bc *Client) BackupRange(
return nil
}

- func (bc *Client) findRegionLeader(ctx context.Context, key []byte) (*metapb.Peer, error) {
+ func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV, so the key must be encoded
// in order to find the correct region.
- key = codec.EncodeBytes([]byte{}, key)
+ if !isRawKv {
+ key = codec.EncodeBytes([]byte{}, key)
+ }
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
@@ -584,6 +586,8 @@ func (bc *Client) fineGrainedBackup(
compressLevel int32,
rateLimit uint64,
concurrency uint32,
+ isRawKv bool,
+ cipherInfo *backuppb.CipherInfo,
rangeTree rtree.RangeTree,
progressCallBack func(ProgressUnit),
) error {
@@ -634,7 +638,7 @@ func (bc *Client) fineGrainedBackup(
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
- compressType, compressLevel, rateLimit, concurrency, respCh)
+ compressType, compressLevel, rateLimit, concurrency, isRawKv, cipherInfo, respCh)
if err != nil {
errCh <- err
return
@@ -779,9 +783,11 @@ func (bc *Client) handleFineGrained(
compressionLevel int32,
rateLimit uint64,
concurrency uint32,
+ isRawKv bool,
+ cipherInfo *backuppb.CipherInfo,
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
- leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
+ leader, pderr := bc.findRegionLeader(ctx, rg.StartKey, isRawKv)
if pderr != nil {
return 0, errors.Trace(pderr)
}
@@ -796,8 +802,10 @@
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
+ IsRawKv: isRawKv,
CompressionType: compressType,
CompressionLevel: compressionLevel,
+ CipherInfo: cipherInfo,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
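
Note on the change above: for raw-KV backups the region-lookup key must not be passed through codec.EncodeBytes, because raw keys are stored unencoded in TiKV, while transactional keys are stored in the encoded memcomparable format. A minimal standalone Go sketch of that pattern follows; it is illustrative only, and encodeMvccKey is a hypothetical stand-in for the real codec.EncodeBytes, not BR code.

package main

import "fmt"

// encodeMvccKey is a hypothetical stand-in for codec.EncodeBytes: it wraps a
// key in the padded format used for transactional (MVCC) data. The encoding
// here is a placeholder, not the real memcomparable encoding.
func encodeMvccKey(key []byte) []byte {
	return append(append([]byte{'z'}, key...), 0xFF)
}

// lookupKey returns the key that should be sent to PD when locating a region:
// raw-KV keys are used as-is, transactional keys must be encoded first.
func lookupKey(key []byte, isRawKv bool) []byte {
	if !isRawKv {
		return encodeMvccKey(key)
	}
	return key
}

func main() {
	fmt.Printf("raw lookup key: %q\n", lookupKey([]byte("r_0001"), true))
	fmt.Printf("txn lookup key: %q\n", lookupKey([]byte("t_0001"), false))
}
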
17 changes: 17 additions & 0 deletions br/pkg/backup/push.go
@@ -11,6 +11,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
@@ -116,6 +117,7 @@ func (push *pushDown) pushBackup(
close(push.respCh)
}()

+ regionErrorIngestedOnce := false
for {
select {
case respAndStore, ok := <-push.respCh:
@@ -139,6 +141,21 @@
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
if !regionErrorIngestedOnce {
msg := val.(string)
+ logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
+ resp.Error = &backuppb.Error{
+ // Msg: msg,
+ Detail: &backuppb.Error_RegionError{
+ RegionError: &errorpb.Error{
+ Message: msg,
+ },
+ },
+ }
+ }
+ regionErrorIngestedOnce = true
+ })
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.Put(
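
Note on the new failpoint above: it rewrites the first push-down response into a region error so the fine-grained retry path can be exercised. A minimal sketch of how a test might enable it follows, assuming the standard github.com/pingcap/failpoint Enable/Disable API and assuming the failpoint path is formed from the package import path plus the failpoint name; neither detail is taken from this commit.

package backup_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestBackupRetriesOnInjectedRegionError(t *testing.T) {
	// Assumed failpoint path: <package import path>/<failpoint name>.
	fp := "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error"
	// A return("...") term makes failpoint.Inject receive the string as val.
	require.NoError(t, failpoint.Enable(fp, `return("injected region error")`))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()

	// ... run a backup here; the first response is rewritten to carry a
	// RegionError, so the affected range should be retried by fineGrainedBackup.
}
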
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -916,7 +916,7 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed")
}
- splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig())
+ splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)

shouldCreate := true
if cfg.Checkpoint.Enable {
5 changes: 3 additions & 2 deletions br/pkg/restore/client.go
@@ -102,6 +102,7 @@ func NewRestoreClient(
store kv.Storage,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
+ isRawKv bool,
) (*Client, error) {
db, err := NewDB(g, store)
if err != nil {
@@ -120,7 +121,7 @@

return &Client{
pdClient: pdClient,
- toolClient: NewSplitClient(pdClient, tlsConf),
+ toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
db: db,
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
@@ -205,7 +206,7 @@ func (rc *Client) InitBackupMeta(
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

- metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
+ metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv, rc.rateLimit)
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
26 changes: 26 additions & 0 deletions br/pkg/restore/client_test.go
@@ -30,6 +30,7 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{
Timeout: 10 * time.Second,
}

+ <<<<<<< HEAD
type testRestoreClientSuite struct {
mock *mock.Cluster
}
@@ -43,6 +44,12 @@ func (s *testRestoreClientSuite) SetUpTest(c *C) {
func (s *testRestoreClientSuite) TearDownTest(c *C) {
testleak.AfterTest(c)()
}
+ =======
+ func TestCreateTables(t *testing.T) {
+ m := mc
+ client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false)
+ require.NoError(t, err)
+ >>>>>>> 4e69c0705... br: Fix backup rawkv failure (#32612)

func (s *testRestoreClientSuite) TestCreateTables(c *C) {
c.Assert(s.mock.Start(), IsNil)
@@ -101,21 +108,35 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {
}
}

+ <<<<<<< HEAD
func (s *testRestoreClientSuite) TestIsOnline(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()

client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg)
c.Assert(err, IsNil)
+ =======
+ func TestIsOnline(t *testing.T) {
+ m := mc
+ client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false)
+ require.NoError(t, err)
+ >>>>>>> 4e69c0705... br: Fix backup rawkv failure (#32612)

c.Assert(client.IsOnline(), IsFalse)
client.EnableOnline()
c.Assert(client.IsOnline(), IsTrue)
}

+ <<<<<<< HEAD
func (s *testRestoreClientSuite) TestPreCheckTableClusterIndex(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()
+ =======
+ func TestPreCheckTableClusterIndex(t *testing.T) {
+ m := mc
+ client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false)
+ require.NoError(t, err)
+ >>>>>>> 4e69c0705... br: Fix backup rawkv failure (#32612)

client, err := restore.NewRestoreClient(gluetidb.New(), s.mock.PDClient, s.mock.Storage, nil, defaultKeepaliveCfg)
c.Assert(err, IsNil)
@@ -211,8 +232,13 @@ func (s *testRestoreClientSuite) TestPreCheckTableTiFlashReplicas(c *C) {

client, err := restore.NewRestoreClient(gluetidb.New(), fakePDClient{
stores: mockStores,
+ <<<<<<< HEAD
}, s.mock.Storage, nil, defaultKeepaliveCfg)
c.Assert(err, IsNil)
+ =======
+ }, m.Storage, nil, defaultKeepaliveCfg, false)
+ require.NoError(t, err)
+ >>>>>>> 4e69c0705... br: Fix backup rawkv failure (#32612)

tables := make([]*metautil.Table, 4)
for i := 0; i < len(tables); i++ {
2 changes: 1 addition & 1 deletion br/pkg/restore/pipeline_items.go
@@ -289,7 +289,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
// hence the checksum would fail.
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
- err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
+ err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh, false)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
15 changes: 11 additions & 4 deletions br/pkg/restore/split.go
@@ -72,6 +72,7 @@ func (rs *RegionSplitter) Split(
ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
+ isRawKv bool,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
@@ -106,7 +107,7 @@ SplitRegions:
}
return errors.Trace(errScan)
}
- splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions)
+ splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions, isRawKv)
regionMap := make(map[uint64]*RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
@@ -451,14 +452,14 @@ func (b *scanRegionBackoffer) Attempt() int {

// getSplitKeys checks if the regions should be split by the end key of
// the ranges, groups the split keys by region id.
- func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo) map[uint64][][]byte {
+ func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo, isRawKv bool) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte)
checkKeys := make([][]byte, 0)
for _, rg := range ranges {
checkKeys = append(checkKeys, rg.EndKey)
}
for _, key := range checkKeys {
- if region := NeedSplit(key, regions); region != nil {
+ if region := NeedSplit(key, regions, isRawKv); region != nil {
splitKeys, ok := splitKeyMap[region.Region.GetId()]
if !ok {
splitKeys = make([][]byte, 0, 1)
@@ -474,12 +475,18 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*R
}

// NeedSplit checks whether a key is necessary to split, if true returns the split region.
- func NeedSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo {
+ func NeedSplit(splitKey []byte, regions []*RegionInfo, isRawKv bool) *RegionInfo {
// If splitKey is the max key.
if len(splitKey) == 0 {
return nil
}
+ <<<<<<< HEAD
splitKey = codec.EncodeBytes(splitKey)
+ =======
+ if !isRawKv {
+ splitKey = codec.EncodeBytes(nil, splitKey)
+ }
+ >>>>>>> 4e69c0705... br: Fix backup rawkv failure (#32612)
for _, region := range regions {
// If splitKey is the boundary of the region
if bytes.Equal(splitKey, region.Region.GetStartKey()) {
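
Note on NeedSplit above: the candidate split key is only run through codec.EncodeBytes for transactional restores before it is compared against region boundaries; raw-KV keys are compared as-is. A standalone sketch of that kind of boundary check follows, using simplified stand-in types (regionInfo, encodeMvccKey); it is illustrative only and not BR's implementation.

package main

import (
	"bytes"
	"fmt"
)

// regionInfo is a simplified stand-in for BR's RegionInfo.
type regionInfo struct {
	StartKey, EndKey []byte
}

// encodeMvccKey is a stand-in for codec.EncodeBytes (placeholder encoding only).
func encodeMvccKey(key []byte) []byte {
	return append([]byte{'z'}, key...)
}

// needSplit returns the region that should be split at splitKey, or nil if the
// key is the max key or already falls on a region boundary.
func needSplit(splitKey []byte, regions []regionInfo, isRawKv bool) *regionInfo {
	if len(splitKey) == 0 {
		return nil
	}
	if !isRawKv {
		splitKey = encodeMvccKey(splitKey)
	}
	for i := range regions {
		r := &regions[i]
		if bytes.Equal(splitKey, r.StartKey) {
			return nil // already a boundary, no split needed
		}
		// Strictly inside (StartKey, EndKey): this region must be split.
		if bytes.Compare(splitKey, r.StartKey) > 0 &&
			(len(r.EndKey) == 0 || bytes.Compare(splitKey, r.EndKey) < 0) {
			return r
		}
	}
	return nil
}

func main() {
	regions := []regionInfo{{StartKey: []byte("a"), EndKey: []byte("m")}, {StartKey: []byte("m")}}
	fmt.Println(needSplit([]byte("h"), regions, true) != nil) // true: "h" lies inside [a, m)
	fmt.Println(needSplit([]byte("m"), regions, true) != nil) // false: "m" is already a boundary
}
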
9 changes: 7 additions & 2 deletions br/pkg/restore/split_client.go
@@ -87,14 +87,17 @@ type pdClient struct {
// this may mislead the scatter.
needScatterVal bool
needScatterInit sync.Once

+ isRawKv bool
}

// NewSplitClient returns a client used by RegionSplitter.
- func NewSplitClient(client pd.Client, tlsConf *tls.Config) SplitClient {
+ func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient {
cli := &pdClient{
client: client,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
+ isRawKv: isRawKv,
}
return cli
}
@@ -237,6 +240,7 @@ func splitRegionWithFailpoint(
peer *metapb.Peer,
client tikvpb.TikvClient,
keys [][]byte,
+ isRawKv bool,
) (*kvrpcpb.SplitRegionResponse, error) {
failpoint.Inject("not-leader-error", func(injectNewLeader failpoint.Value) {
log.Debug("failpoint not-leader-error injected.")
@@ -267,6 +271,7 @@
Peer: peer,
},
SplitKeys: keys,
+ IsRawKv: isRawKv,
})
}

@@ -302,7 +307,7 @@ func (c *pdClient) sendSplitRegionRequest(
}
defer conn.Close()
client := tikvpb.NewTikvClient(conn)
- resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys)
+ resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys, c.isRawKv)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
(The remaining 5 changed files are not rendered in this capture.)
