diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index d70484c84727a..61ae643793c03 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -1005,6 +1005,9 @@ func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
 		LowerBound: lowerBound,
 		UpperBound: upperBound,
 	}
+	failpoint.Inject("mockGetFirstAndLastKey", func() {
+		failpoint.Return(lowerBound, upperBound, nil)
+	})
 
 	iter := e.newKVIter(context.Background(), opt)
 	//nolint: errcheck
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 626c27a819769..8950f8a6a3a7e 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1202,6 +1202,12 @@ func (local *Backend) generateAndSendJob(
 			}
 
 			failpoint.Inject("beforeGenerateJob", nil)
+			failpoint.Inject("sendDummyJob", func(_ failpoint.Value) {
+				// this is used to trigger worker failure, used together
+				// with WriteToTiKVNotEnoughDiskSpace
+				jobToWorkerCh <- &regionJob{}
+				time.Sleep(5 * time.Second)
+			})
 			jobs, err := local.generateJobForRange(egCtx, p.Data, p.Range, regionSplitSize, regionSplitKeys)
 			if err != nil {
 				if common.IsContextCanceledError(err) {
@@ -1680,29 +1686,30 @@ func (local *Backend) doImport(ctx context.Context, engine common.Engine, region
 
 	failpoint.Label("afterStartWorker")
 
-	err := local.prepareAndSendJob(
-		workerCtx,
-		engine,
-		regionRanges,
-		regionSplitSize,
-		regionSplitKeys,
-		jobToWorkerCh,
-		&jobWg,
-	)
-	if err != nil {
-		firstErr.Set(err)
+	workGroup.Go(func() error {
+		err := local.prepareAndSendJob(
+			workerCtx,
+			engine,
+			regionRanges,
+			regionSplitSize,
+			regionSplitKeys,
+			jobToWorkerCh,
+			&jobWg,
+		)
+		if err != nil {
+			return err
+		}
+
+		jobWg.Wait()
 		workerCancel()
-		err2 := workGroup.Wait()
-		if !common.IsContextCanceledError(err2) {
-			log.FromContext(ctx).Error("worker meets error", zap.Error(err2))
+		return nil
+	})
+	if err := workGroup.Wait(); err != nil {
+		if !common.IsContextCanceledError(err) {
+			log.FromContext(ctx).Error("do import meets error", zap.Error(err))
 		}
-		return firstErr.Get()
+		firstErr.Set(err)
 	}
-
-	jobWg.Wait()
-	workerCancel()
-	firstErr.Set(workGroup.Wait())
-	firstErr.Set(ctx.Err())
 	return firstErr.Get()
 }
 
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 3751a95b2371a..93afa912de596 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -2226,6 +2226,43 @@ func TestCtxCancelIsIgnored(t *testing.T) {
 	require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
 }
 
+func TestWorkerFailedWhenGeneratingJobs(t *testing.T) {
+	backup := maxRetryBackoffSecond
+	maxRetryBackoffSecond = 1
+	t.Cleanup(func() {
+		maxRetryBackoffSecond = backup
+	})
+
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey", "return()")
+	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace", "return()")
+	t.Cleanup(func() {
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/sendDummyJob")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockGetFirstAndLastKey")
+		_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/WriteToTiKVNotEnoughDiskSpace")
+	})
+
+	initRanges := []common.Range{
+		{Start: []byte{'c'}, End: []byte{'d'}},
+	}
+
+	ctx := context.Background()
+	l := &Backend{
+		BackendConfig: BackendConfig{
+			WorkerConcurrency: 1,
+		},
+		splitCli: initTestSplitClient(
+			[][]byte{{1}, {11}},
+			panicSplitRegionClient{},
+		),
+	}
+	e := &Engine{}
+	err := l.doImport(ctx, e, initRanges, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
+	require.ErrorContains(t, err, "the remaining storage capacity of TiKV")
+}
+
 func TestExternalEngine(t *testing.T) {
 	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipSplitAndScatter", "return()")
 	_ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/skipStartWorker", "return()")
diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go
index 80d0980b05e75..0ee5f9b9a6fca 100644
--- a/br/pkg/lightning/backend/local/localhelper_test.go
+++ b/br/pkg/lightning/backend/local/localhelper_test.go
@@ -252,6 +252,11 @@ func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pd
 func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
+
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+
 	if c.hook != nil {
 		key, endKey, limit = c.hook.BeforeScanRegions(ctx, key, endKey, limit)
 	}