Skip to content

Commit

Permalink
local backend: fix worker err overriden by job generation err (#48185)
Browse files Browse the repository at this point in the history
close #47992
  • Loading branch information
D3Hunter authored Nov 1, 2023
1 parent 1ba524c commit c652a92
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 20 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,9 @@ func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
LowerBound: lowerBound,
UpperBound: upperBound,
}
failpoint.Inject("mockGetFirstAndLastKey", func() {
failpoint.Return(lowerBound, upperBound, nil)
})

iter := e.newKVIter(context.Background(), opt)
//nolint: errcheck
Expand Down
47 changes: 27 additions & 20 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,12 @@ func (local *Backend) generateAndSendJob(
}

failpoint.Inject("beforeGenerateJob", nil)
failpoint.Inject("sendDummyJob", func(_ failpoint.Value) {
// this is used to trigger worker failure, used together
// with WriteToTiKVNotEnoughDiskSpace
jobToWorkerCh <- &regionJob{}
time.Sleep(5 * time.Second)
})
jobs, err := local.generateJobForRange(egCtx, p.Data, p.Range, regionSplitSize, regionSplitKeys)
if err != nil {
if common.IsContextCanceledError(err) {
Expand Down Expand Up @@ -1679,29 +1685,30 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region

failpoint.Label("afterStartWorker")

err := local.prepareAndSendJob(
workerCtx,
engine,
regionRanges,
regionSplitSize,
regionSplitKeys,
jobToWorkerCh,
&jobWg,
)
if err != nil {
firstErr.Set(err)
workGroup.Go(func() error {
err := local.prepareAndSendJob(
workerCtx,
engine,
regionRanges,
regionSplitSize,
regionSplitKeys,
jobToWorkerCh,
&jobWg,
)
if err != nil {
return err
}

jobWg.Wait()
workerCancel()
err2 := workGroup.Wait()
if !common.IsContextCanceledError(err2) {
log.FromContext(ctx).Error("worker meets error", zap.Error(err2))
return nil
})
if err := workGroup.Wait(); err != nil {
if !common.IsContextCanceledError(err) {
log.FromContext(ctx).Error("do import meets error", zap.Error(err))
}
return firstErr.Get()
firstErr.Set(err)
}

jobWg.Wait()
workerCancel()
firstErr.Set(workGroup.Wait())
firstErr.Set(ctx.Err())
return firstErr.Get()
}

Expand Down
37 changes: 37 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2226,6 +2226,43 @@ func TestCtxCancelIsIgnored(t *testing.T) {
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}

func TestWorkerFailedWhenGeneratingJobs(t *testing.T) {
backup := maxRetryBackoffSecond
maxRetryBackoffSecond = 1
t.Cleanup(func() {
maxRetryBackoffSecond = backup
})

_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
t.Cleanup(func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey")
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
})

initRanges := []common.Range{
{Start: []byte{'c'}, End: []byte{'d'}},
}

ctx := context.Background()
l := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 1,
},
splitCli: initTestSplitClient(
[][]byte{{1}, {11}},
panicSplitRegionClient{},
),
}
e := &Engine{}
err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
}

func TestExternalEngine(t *testing.T) {
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker", "return()")
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd
func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()

if err := ctx.Err(); err != nil {
return nil, err
}

if c.hook != nil {
key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit)
}
Expand Down

0 comments on commit c652a92

Please sign in to comment.