Skip to content

Commit

Permalink
br: handle region leader miss (#52822)
Browse files Browse the repository at this point in the history
close #50501, close #51124
  • Loading branch information
Leavrth authored Apr 24, 2024
1 parent 6eac861 commit 0805e85
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 8 deletions.
3 changes: 2 additions & 1 deletion br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,8 @@ func (importer *FileImporter) ingestSSTs(
) (*import_sstpb.IngestResponse, error) {
leader := regionInfo.Leader
if leader == nil {
leader = regionInfo.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", regionInfo.Region.Id)
}
reqCtx := &kvrpcpb.Context{
RegionId: regionInfo.Region.GetId(),
Expand Down
11 changes: 7 additions & 4 deletions br/pkg/restore/import_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func initTestClient(isRawKv bool) *TestClient {
}
regions[i] = &split.RegionInfo{
Leader: &metapb.Peer{
Id: i,
Id: i,
StoreId: 1,
},
Region: &metapb.Region{
Id: i,
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestEpochNotMatch(t *testing.T) {
{Id: 43},
},
},
Leader: &metapb.Peer{Id: 43},
Leader: &metapb.Peer{Id: 43, StoreId: 1},
}
newRegion := pdtypes.NewRegionInfo(info.Region, info.Leader)
mergeRegion := func() {
Expand Down Expand Up @@ -340,7 +341,8 @@ func TestRegionSplit(t *testing.T) {
EndKey: codec.EncodeBytes(nil, []byte("aayy")),
},
Leader: &metapb.Peer{
Id: 43,
Id: 43,
StoreId: 1,
},
},
{
Expand All @@ -350,7 +352,8 @@ func TestRegionSplit(t *testing.T) {
EndKey: target.Region.EndKey,
},
Leader: &metapb.Peer{
Id: 45,
Id: 45,
StoreId: 1,
},
},
}
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ func (c *MockPDClientForSplit) setRegions(boundaries [][]byte) []*metapb.Region
StartKey: boundaries[i-1],
EndKey: boundaries[i],
}
p := &metapb.Peer{
Id: c.lastRegionID,
StoreId: 1,
}
c.Regions.SetRegion(&pdtypes.Region{
Meta: r,
Meta: r,
Leader: p,
})
ret = append(ret, r)
}
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,23 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
}

cur := regions[0]
if cur.Leader == nil {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", cur.Region.Id)
}
if cur.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", cur.Region.Id)
}
for _, r := range regions[1:] {
if r.Leader == nil {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", r.Region.Id)
}
if r.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", r.Region.Id)
}
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's endKey not equal to next region %d's startKey, endKey: %s, startKey: %s, region epoch: %s %s",
Expand Down
76 changes: 76 additions & 0 deletions br/pkg/restore/split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,13 +504,21 @@ func TestPaginateScanRegion(t *testing.T) {
StartKey: []byte{1},
EndKey: []byte{2},
},
Leader: &metapb.Peer{
Id: 1,
StoreId: 1,
},
})
mockPDClient.Regions.SetRegion(&pdtypes.Region{
Meta: &metapb.Region{
Id: 4,
StartKey: []byte{4},
EndKey: []byte{5},
},
Leader: &metapb.Peer{
Id: 4,
StoreId: 1,
},
})

_, err = PaginateScanRegion(ctx, mockClient, []byte{1}, []byte{5}, 3)
Expand All @@ -525,13 +533,21 @@ func TestPaginateScanRegion(t *testing.T) {
StartKey: []byte{2},
EndKey: []byte{3},
},
Leader: &metapb.Peer{
Id: 2,
StoreId: 1,
},
},
{
Meta: &metapb.Region{
Id: 3,
StartKey: []byte{3},
EndKey: []byte{4},
},
Leader: &metapb.Peer{
Id: 3,
StoreId: 1,
},
},
}
mockPDClient.scanRegions.beforeHook = func() {
Expand Down Expand Up @@ -590,6 +606,10 @@ func TestRegionConsistency(t *testing.T) {
"region 6's endKey not equal to next region 8's startKey(.*?)",
[]*RegionInfo{
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 1,
},
Region: &metapb.Region{
Id: 6,
StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
Expand All @@ -598,6 +618,10 @@ func TestRegionConsistency(t *testing.T) {
},
},
{
Leader: &metapb.Peer{
Id: 8,
StoreId: 1,
},
Region: &metapb.Region{
Id: 8,
StartKey: codec.EncodeBytes([]byte{}, []byte("e")),
Expand All @@ -606,6 +630,58 @@ func TestRegionConsistency(t *testing.T) {
},
},
},
{
codec.EncodeBytes([]byte{}, []byte("c")),
codec.EncodeBytes([]byte{}, []byte("e")),
"region 6's leader is nil(.*?)",
[]*RegionInfo{
{
Region: &metapb.Region{
Id: 6,
StartKey: codec.EncodeBytes([]byte{}, []byte("c")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
RegionEpoch: nil,
},
},
{
Region: &metapb.Region{
Id: 8,
StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
EndKey: codec.EncodeBytes([]byte{}, []byte("e")),
},
},
},
},
{
codec.EncodeBytes([]byte{}, []byte("c")),
codec.EncodeBytes([]byte{}, []byte("e")),
"region 6's leader's store id is 0(.*?)",
[]*RegionInfo{
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 0,
},
Region: &metapb.Region{
Id: 6,
StartKey: codec.EncodeBytes([]byte{}, []byte("c")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
RegionEpoch: nil,
},
},
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 0,
},
Region: &metapb.Region{
Id: 8,
StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
EndKey: codec.EncodeBytes([]byte{}, []byte("e")),
},
},
},
},
}
for _, ca := range cases {
err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions)
Expand Down
1 change: 1 addition & 0 deletions pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/checksum",
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/membuf",
"//br/pkg/pdutil",
Expand Down
4 changes: 3 additions & 1 deletion pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/pkg/distsql"
Expand Down Expand Up @@ -312,7 +313,8 @@ func getDupDetectClient(
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
leader = region.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", region.Region.Id)
}
importClient, err := importClientFactory.Create(ctx, leader.GetStoreId())
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -624,7 +625,8 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe

leader := j.region.Leader
if leader == nil {
leader = j.region.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", j.region.Region.Id)
}

cli, err := clientFactory.Create(ctx, leader.StoreId)
Expand Down

0 comments on commit 0805e85

Please sign in to comment.