Skip to content

Commit

Permalink
lightning: new regionJob from needRescan has zero retry counter (#43810)
Browse files Browse the repository at this point in the history
close #43682
  • Loading branch information
lance6716 authored May 17, 2023
1 parent c0d2446 commit 87f7c85
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,7 @@ func (local *Backend) generateAndSendJob(
return eg.Wait()
}

// fakeRegionJobs is used in test , the injected job can be found by (startKey, endKey).
// fakeRegionJobs is used in test, the injected job can be found by (startKey, endKey).
var fakeRegionJobs map[[2]string]struct {
jobs []*regionJob
err error
Expand Down Expand Up @@ -1280,7 +1280,7 @@ func (local *Backend) startWorker(
// 1 "needRescan" job becomes len(jobs) "regionScanned" jobs.
jobWg.Add(len(jobs) - 1)
for _, j := range jobs {
j.retryCount = job.retryCount
j.lastRetryableErr = job.lastRetryableErr
jobOutCh <- j
}
}
Expand Down
66 changes: 66 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1962,3 +1962,69 @@ func TestDoImport(t *testing.T) {
}
}
}

// TestRegionJobResetRetryCounter verifies that when a "needRescan" job is
// re-split into new regionScanned jobs, the new jobs inherit the original
// job's retry counter instead of starting from zero (regression test for
// pingcap/tidb#43682). The injected jobs below already sit at
// maxWriteAndIngestRetryTimes, so the import only succeeds if the counter
// is carried over and no extra retries are granted.
func TestRegionJobResetRetryCounter(t *testing.T) {
	origBackoff := maxRetryBackoffSecond
	maxRetryBackoffSecond = 1
	t.Cleanup(func() {
		maxRetryBackoffSecond = origBackoff
	})

	// Enable both failpoints up front and make sure they are disabled
	// again when the test finishes.
	failpoints := []string{
		"github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter",
		"github.com/pingcap/tidb/br/pkg/lightning/backend/local/fakeRegionJobs",
	}
	for _, fp := range failpoints {
		_ = failpoint.Enable(fp, "return()")
	}
	t.Cleanup(func() {
		for _, fp := range failpoints {
			_ = failpoint.Disable(fp)
		}
	})

	// test that job need rescan when ingest

	initRanges := []Range{
		{start: []byte{'c'}, end: []byte{'d'}},
	}
	fakeRegionJobs = map[[2]string]struct {
		jobs []*regionJob
		err  error
	}{
		// First pass over ["c","d"): one job asks for a rescan, the other
		// succeeds. Both are already at the retry ceiling.
		{"c", "d"}: {
			jobs: []*regionJob{
				{
					keyRange:   Range{start: []byte{'c'}, end: []byte{'c', '2'}},
					engine:     &Engine{},
					injected:   getNeedRescanWhenIngestBehaviour(),
					retryCount: maxWriteAndIngestRetryTimes,
				},
				{
					keyRange:   Range{start: []byte{'c', '2'}, end: []byte{'d'}},
					engine:     &Engine{},
					injected:   getSuccessInjectedBehaviour(),
					retryCount: maxWriteAndIngestRetryTimes,
				},
			},
		},
		// Rescan result for ["c","c2"): a fresh job that succeeds.
		{"c", "c2"}: {
			jobs: []*regionJob{
				{
					keyRange: Range{start: []byte{'c'}, end: []byte{'c', '2'}},
					engine:   &Engine{},
					injected: getSuccessInjectedBehaviour(),
				},
			},
		},
	}

	ctx := context.Background()
	backend := &Backend{
		BackendConfig: BackendConfig{
			WorkerConcurrency: 2,
		},
	}
	eng := &Engine{}
	err := backend.doImport(ctx, eng, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
	require.NoError(t, err)
	// Every injected behaviour must have been consumed exactly once.
	for _, injected := range fakeRegionJobs {
		for _, job := range injected.jobs {
			require.Len(t, job.injected, 0)
		}
	}
}
8 changes: 4 additions & 4 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func TestSplitPoint(t *testing.T) {
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: split.Value{Size: 100, Number: 100}})
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: split.Value{Size: 200, Number: 200}})
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}})
client := NewFakeSplitClient()
client := newFakeSplitClient()
client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "f"))
client.AppendRegion(keyWithTablePrefix(tableID, "f"), keyWithTablePrefix(tableID, "h"))
client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j"))
Expand Down Expand Up @@ -828,7 +828,7 @@ func TestSplitPoint2(t *testing.T) {
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: split.Value{Size: 300, Number: 300}})
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: split.Value{Size: 200, Number: 200}})
splitHelper.Merge(split.Valued{Key: split.Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: split.Value{Size: 200, Number: 200}})
client := NewFakeSplitClient()
client := newFakeSplitClient()
client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "g"))
client.AppendRegion(keyWithTablePrefix(tableID, "g"), keyWithTablePrefix(tableID, getCharFromNumber("g", 0)))
for i := 0; i < 256; i++ {
Expand Down Expand Up @@ -879,7 +879,7 @@ type fakeSplitClient struct {
regions []*split.RegionInfo
}

func NewFakeSplitClient() *fakeSplitClient {
func newFakeSplitClient() *fakeSplitClient {
return &fakeSplitClient{
regions: make([]*split.RegionInfo, 0),
}
Expand Down Expand Up @@ -1012,7 +1012,7 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) {
}
mockIter := &mockLogIter{}
ctx := context.Background()
logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, NewFakeSplitClient(), 144*1024*1024, 1440000)
logIter := restore.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, newFakeSplitClient(), 144*1024*1024, 1440000)
next := 0
for r := logIter.TryNext(ctx); !r.Finished; r = logIter.TryNext(ctx) {
require.NoError(t, r.Err)
Expand Down

0 comments on commit 87f7c85

Please sign in to comment.