From 065d495c4176a9f30cb1e512dea081f5d9731e71 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 9 Nov 2023 10:20:12 +0800 Subject: [PATCH] ddl: add import failed test for adding index (#48271) (#48280) close pingcap/tidb#47967 --- br/pkg/lightning/backend/local/region_job.go | 5 ++ pkg/ddl/backfilling_scheduler.go | 4 +- pkg/ddl/ingest/engine.go | 79 +++++++++++-------- pkg/ddl/ingest/engine_mgr.go | 7 ++ .../realtikvtest/addindextest4/ingest_test.go | 24 ++++++ 5 files changed, 82 insertions(+), 37 deletions(-) diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go index b328fc351d25c..26a25f7895a2a 100644 --- a/br/pkg/lightning/backend/local/region_job.go +++ b/br/pkg/lightning/backend/local/region_job.go @@ -307,6 +307,11 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { return annotateErr(err, peer, "when open write stream") } + failpoint.Inject("mockWritePeerErr", func() { + err = errors.Errorf("mock write peer error") + failpoint.Return(annotateErr(err, peer, "when open write stream")) + }) + // Bind uuid for this write request if err = wstream.Send(req); err != nil { return annotateErr(err, peer, "when send meta") diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 02c48a532ee12..d2a981b3a4243 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -257,13 +257,13 @@ func (b *txnBackfillScheduler) close(force bool) { if b.closed { return } + b.closed = true close(b.taskCh) if force { closeBackfillWorkers(b.workers) } b.wg.Wait() close(b.resultCh) - b.closed = true } type ingestBackfillScheduler struct { @@ -333,6 +333,7 @@ func (b *ingestBackfillScheduler) close(force bool) { if b.closed { return } + b.closed = true close(b.taskCh) if b.copReqSenderPool != nil { b.copReqSenderPool.close(force) @@ -357,7 +358,6 @@ func (b *ingestBackfillScheduler) close(force bool) { jobID := b.reorgInfo.ID b.backendCtx.ResetWorkers(jobID) } - b.closed = true } func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) { diff --git a/pkg/ddl/ingest/engine.go b/pkg/ddl/ingest/engine.go index c1b0ec91fb9b5..e8bb564d483ee 100644 --- a/pkg/ddl/ingest/engine.go +++ b/pkg/ddl/ingest/engine.go @@ -55,6 +55,7 @@ type engineInfo struct { jobID int64 indexID int64 openedEngine *backend.OpenedEngine + closedEngine *backend.ClosedEngine uuid uuid.UUID cfg *backend.EngineConfig writerCount int @@ -83,6 +84,11 @@ func newEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.Engin // Flush imports all the key-values in engine to the storage. func (ei *engineInfo) Flush() error { + if ei.openedEngine == nil { + logutil.Logger(ei.ctx).Warn("engine is not open, skipping flush", + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return nil + } err := ei.openedEngine.Flush(ei.ctx) if err != nil { logutil.Logger(ei.ctx).Error(LitErrFlushEngineErr, zap.Error(err), @@ -120,44 +126,47 @@ func (ei *engineInfo) Clean() { // ImportAndClean imports the engine data to TiKV and cleans up the local intermediate files. func (ei *engineInfo) ImportAndClean() error { - // Close engine and finish local tasks of lightning. - logutil.Logger(ei.ctx).Info(LitInfoCloseEngine, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - indexEngine := ei.openedEngine - closeEngine, err1 := indexEngine.Close(ei.ctx) - if err1 != nil { - logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err1), - zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - return err1 - } - ei.openedEngine = nil - err := ei.closeWriters() - if err != nil { - logutil.Logger(ei.ctx).Error(LitErrCloseWriterErr, zap.Error(err), - zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - return err - } - - // Ingest data to TiKV. - logutil.Logger(ei.ctx).Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID), - zap.Int64("index ID", ei.indexID), - zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10))) - err = closeEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) - if err != nil { - logLevel := zap.ErrorLevel - if common.ErrFoundDuplicateKeys.Equal(err) { - logLevel = zap.WarnLevel + if ei.openedEngine != nil { + logutil.Logger(ei.ctx).Info(LitInfoCloseEngine, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + closeEngine, err1 := ei.openedEngine.Close(ei.ctx) + if err1 != nil { + logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err1), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err1 } - logutil.Logger(ei.ctx).Log(logLevel, LitErrIngestDataErr, zap.Error(err), - zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - return err + err := ei.closeWriters() + if err != nil { + logutil.Logger(ei.ctx).Error(LitErrCloseWriterErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err + } + ei.openedEngine = nil + ei.closedEngine = closeEngine } + if ei.closedEngine != nil { + // Ingest data to TiKV. + logutil.Logger(ei.ctx).Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID), + zap.Int64("index ID", ei.indexID), + zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10))) + err := ei.closedEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + if err != nil { + logLevel := zap.ErrorLevel + if common.ErrFoundDuplicateKeys.Equal(err) { + logLevel = zap.WarnLevel + } + logutil.Logger(ei.ctx).Log(logLevel, LitErrIngestDataErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err + } - // Clean up the engine local workspace. - err = closeEngine.Cleanup(ei.ctx) - if err != nil { - logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err), - zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) - return err + // Clean up the engine local workspace. + err = ei.closedEngine.Cleanup(ei.ctx) + if err != nil { + logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err), + zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID)) + return err + } + ei.closedEngine = nil } return nil } diff --git a/pkg/ddl/ingest/engine_mgr.go b/pkg/ddl/ingest/engine_mgr.go index aefabbcea7278..7932b05da365b 100644 --- a/pkg/ddl/ingest/engine_mgr.go +++ b/pkg/ddl/ingest/engine_mgr.go @@ -37,6 +37,13 @@ func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName st var info string en, exist := bc.Load(indexID) if !exist || en.openedEngine == nil { + if exist && en.closedEngine != nil { + // Import failed before, try to import again. + err := en.ImportAndClean() + if err != nil { + return nil, errors.Trace(err) + } + } engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize) ok := bc.MemRoot.CheckConsume(StructSizeEngineInfo + engineCacheSize) if !ok { diff --git a/tests/realtikvtest/addindextest4/ingest_test.go b/tests/realtikvtest/addindextest4/ingest_test.go index cd01025a33f58..8352ba38c2f84 100644 --- a/tests/realtikvtest/addindextest4/ingest_test.go +++ b/tests/realtikvtest/addindextest4/ingest_test.go @@ -499,3 +499,27 @@ func TestAddIndexPreCheckFailed(t *testing.T) { tk.MustGetErrMsg("alter table t add index idx(b);", "[ddl:8256]Check ingest environment failed: mock error") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed")) } + +func TestAddIndexImportFailed(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec(`set global tidb_enable_dist_task=off;`) + + tk.MustExec("create table t (a int, b int);") + for i := 0; i < 10; i++ { + insertSQL := fmt.Sprintf("insert into t values (%d, %d)", i, i) + tk.MustExec(insertSQL) + } + + err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockWritePeerErr", "1*return") + require.NoError(t, err) + tk.MustExec("alter table t add index idx(a);") + err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockWritePeerErr") + require.NoError(t, err) + tk.MustExec("admin check table t;") +}