diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 399939fcd8e15..7dc5b083ab6ce 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -952,6 +952,9 @@ func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by LowerBound: lowerBound, UpperBound: upperBound, } + failpoint.Inject("mockGetFirstAndLastKey", func() { + failpoint.Return(lowerBound, upperBound, nil) + }) iter := e.newKVIter(context.Background(), opt) //nolint: errcheck diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index c585f6891a4c8..df2e20310b43b 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1147,6 +1147,12 @@ func (local *Backend) generateAndSendJob( } failpoint.Inject("beforeGenerateJob", nil) + failpoint.Inject("sendDummyJob", func(_ failpoint.Value) { + // this is used to trigger worker failure, used together + // with WriteToTiKVNotEnoughDiskSpace + jobToWorkerCh <- ®ionJob{} + time.Sleep(5 * time.Second) + }) jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys) if err != nil { if common.IsContextCanceledError(err) { @@ -1548,29 +1554,30 @@ func (local *Backend) doImport(ctx context.Context, engine *Engine, regionRanges }) } - err := local.prepareAndSendJob( - workerCtx, - engine, - regionRanges, - regionSplitSize, - regionSplitKeys, - jobToWorkerCh, - &jobWg, - ) - if err != nil { - firstErr.Set(err) + workGroup.Go(func() error { + err := local.prepareAndSendJob( + workerCtx, + engine, + regionRanges, + regionSplitSize, + regionSplitKeys, + jobToWorkerCh, + &jobWg, + ) + if err != nil { + return err + } + + jobWg.Wait() workerCancel() - err2 := workGroup.Wait() - if !common.IsContextCanceledError(err2) { - log.FromContext(ctx).Error("worker meets error", zap.Error(err2)) + return nil + }) + if err := workGroup.Wait(); err != nil { + if !common.IsContextCanceledError(err) { + log.FromContext(ctx).Error("do import meets error", zap.Error(err)) } - return firstErr.Get() + firstErr.Set(err) } - - jobWg.Wait() - workerCancel() - firstErr.Set(workGroup.Wait()) - firstErr.Set(ctx.Err()) return firstErr.Get() } diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 902dd906fa040..c54ceac27ef99 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -2085,3 +2085,40 @@ func TestCtxCancelIsIgnored(t *testing.T) { err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) require.ErrorContains(t, err, "the remaining storage capacity of TiKV") } + +func TestWorkerFailedWhenGeneratingJobs(t *testing.T) { + backup := maxRetryBackoffSecond + maxRetryBackoffSecond = 1 + t.Cleanup(func() { + maxRetryBackoffSecond = backup + }) + + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()") + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()") + t.Cleanup(func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey") + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace") + }) + + initRanges := []Range{ + {start: []byte{'c'}, end: []byte{'d'}}, + } + + ctx := context.Background() + l := &Backend{ + BackendConfig: BackendConfig{ + WorkerConcurrency: 1, + }, + splitCli: initTestSplitClient( + [][]byte{{1}, {11}}, + panicSplitRegionClient{}, + ), + } + e := &Engine{} + err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + require.ErrorContains(t, err, "the remaining storage capacity of TiKV") +} diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index d677e9c1dc7ba..cc57565ded319 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -249,6 +249,10 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd } func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + if c.hook != nil { key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit) }