ddl: add import failed test for adding index (#48271) (#48280)
close #47967
ti-chi-bot authored Nov 9, 2023
1 parent 5051428 commit 065d495
Showing 5 changed files with 82 additions and 37 deletions.
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/region_job.go
@@ -307,6 +307,11 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
 		return annotateErr(err, peer, "when open write stream")
 	}
 
+	failpoint.Inject("mockWritePeerErr", func() {
+		err = errors.Errorf("mock write peer error")
+		failpoint.Return(annotateErr(err, peer, "when open write stream"))
+	})
+
 	// Bind uuid for this write request
 	if err = wstream.Send(req); err != nil {
 		return annotateErr(err, peer, "when send meta")
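
The injected failpoint is inert unless the build has failpoints enabled, and only fires when a test arms it by its registered path. As a rough sketch of how that looks (mirroring the test added at the bottom of this commit): the "1*return" term makes the failpoint trigger exactly once and then go inert, so only the first write attempt fails and the retry path is exercised.

	// Sketch only, not part of this commit; assumes a failpoint-enabled
	// build and the usual testify/require import.
	fp := "github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockWritePeerErr"
	require.NoError(t, failpoint.Enable(fp, "1*return"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()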
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_scheduler.go
@@ -257,13 +257,13 @@ func (b *txnBackfillScheduler) close(force bool) {
 	if b.closed {
 		return
 	}
+	b.closed = true
 	close(b.taskCh)
 	if force {
 		closeBackfillWorkers(b.workers)
 	}
 	b.wg.Wait()
 	close(b.resultCh)
-	b.closed = true
 }
 
 type ingestBackfillScheduler struct {
@@ -333,6 +333,7 @@ func (b *ingestBackfillScheduler) close(force bool) {
 	if b.closed {
 		return
 	}
+	b.closed = true
 	close(b.taskCh)
 	if b.copReqSenderPool != nil {
 		b.copReqSenderPool.close(force)
@@ -357,7 +358,6 @@ func (b *ingestBackfillScheduler) close(force bool) {
 		jobID := b.reorgInfo.ID
 		b.backendCtx.ResetWorkers(jobID)
 	}
-	b.closed = true
 }
 
 func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) {
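
Both close methods now set b.closed before any channel is closed rather than after. The guard at the top of close only helps if the flag is already set when a second call arrives, so marking the scheduler closed first turns a re-entrant close into a no-op instead of a "close of closed channel" panic. A minimal sketch of the pattern, with illustrative names rather than the real scheduler types:

	// Mark closed first so a re-entrant call returns early; closing
	// taskCh a second time would panic. This assumes close is called
	// from a single goroutine or under external synchronization, since
	// the flag itself is not atomic.
	type scheduler struct {
		closed bool
		taskCh chan struct{}
	}

	func (s *scheduler) close() {
		if s.closed {
			return
		}
		s.closed = true
		close(s.taskCh)
	}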
79 changes: 44 additions & 35 deletions pkg/ddl/ingest/engine.go
@@ -55,6 +55,7 @@ type engineInfo struct {
 	jobID        int64
 	indexID      int64
 	openedEngine *backend.OpenedEngine
+	closedEngine *backend.ClosedEngine
 	uuid         uuid.UUID
 	cfg          *backend.EngineConfig
 	writerCount  int
@@ -83,6 +84,11 @@ func newEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.Engin
 
 // Flush imports all the key-values in engine to the storage.
 func (ei *engineInfo) Flush() error {
+	if ei.openedEngine == nil {
+		logutil.Logger(ei.ctx).Warn("engine is not open, skipping flush",
+			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		return nil
+	}
 	err := ei.openedEngine.Flush(ei.ctx)
 	if err != nil {
 		logutil.Logger(ei.ctx).Error(LitErrFlushEngineErr, zap.Error(err),
@@ -120,44 +126,47 @@ func (ei *engineInfo) Clean() {
 
 // ImportAndClean imports the engine data to TiKV and cleans up the local intermediate files.
 func (ei *engineInfo) ImportAndClean() error {
-	// Close engine and finish local tasks of lightning.
-	logutil.Logger(ei.ctx).Info(LitInfoCloseEngine, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-	indexEngine := ei.openedEngine
-	closeEngine, err1 := indexEngine.Close(ei.ctx)
-	if err1 != nil {
-		logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err1),
-			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-		return err1
-	}
-	ei.openedEngine = nil
-	err := ei.closeWriters()
-	if err != nil {
-		logutil.Logger(ei.ctx).Error(LitErrCloseWriterErr, zap.Error(err),
-			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-		return err
-	}
-
-	// Ingest data to TiKV.
-	logutil.Logger(ei.ctx).Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID),
-		zap.Int64("index ID", ei.indexID),
-		zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10)))
-	err = closeEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
-	if err != nil {
-		logLevel := zap.ErrorLevel
-		if common.ErrFoundDuplicateKeys.Equal(err) {
-			logLevel = zap.WarnLevel
-		}
-		logutil.Logger(ei.ctx).Log(logLevel, LitErrIngestDataErr, zap.Error(err),
-			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-		return err
-	}
-
-	// Clean up the engine local workspace.
-	err = closeEngine.Cleanup(ei.ctx)
-	if err != nil {
-		logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err),
-			zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
-		return err
-	}
+	if ei.openedEngine != nil {
+		logutil.Logger(ei.ctx).Info(LitInfoCloseEngine, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+		closeEngine, err1 := ei.openedEngine.Close(ei.ctx)
+		if err1 != nil {
+			logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err1),
+				zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+			return err1
+		}
+		err := ei.closeWriters()
+		if err != nil {
+			logutil.Logger(ei.ctx).Error(LitErrCloseWriterErr, zap.Error(err),
+				zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+			return err
+		}
+		ei.openedEngine = nil
+		ei.closedEngine = closeEngine
+	}
+	if ei.closedEngine != nil {
+		// Ingest data to TiKV.
+		logutil.Logger(ei.ctx).Info(LitInfoStartImport, zap.Int64("job ID", ei.jobID),
+			zap.Int64("index ID", ei.indexID),
+			zap.String("split region size", strconv.FormatInt(int64(config.SplitRegionSize), 10)))
+		err := ei.closedEngine.Import(ei.ctx, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
+		if err != nil {
+			logLevel := zap.ErrorLevel
+			if common.ErrFoundDuplicateKeys.Equal(err) {
+				logLevel = zap.WarnLevel
+			}
+			logutil.Logger(ei.ctx).Log(logLevel, LitErrIngestDataErr, zap.Error(err),
+				zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+			return err
+		}
+
+		// Clean up the engine local workspace.
+		err = ei.closedEngine.Cleanup(ei.ctx)
+		if err != nil {
+			logutil.Logger(ei.ctx).Error(LitErrCloseEngineErr, zap.Error(err),
+				zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
+			return err
+		}
+		ei.closedEngine = nil
+	}
 	return nil
 }
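
ImportAndClean is now split into two independently guarded phases: if an engine is open, close it and stash the result in the new closedEngine field; if a closed engine is pending, import it into TiKV and clean up, clearing the field only on success. A failed Import therefore leaves closedEngine set (openedEngine is already nil), so a later call skips the close phase and resumes at the import, which is what enables the retry in engine_mgr.go below. An illustrative (not literal) sketch of the resulting behavior:

	// First call: the engine is closed, then Import fails with a
	// transient error (e.g. the injected mockWritePeerErr above);
	// closedEngine is retained.
	if err := ei.ImportAndClean(); err != nil {
		// Second call: the close phase is skipped and Import/Cleanup
		// run again against the retained closedEngine.
		err = ei.ImportAndClean()
	}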
7 changes: 7 additions & 0 deletions pkg/ddl/ingest/engine_mgr.go
@@ -37,6 +37,13 @@ func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName st
 	var info string
 	en, exist := bc.Load(indexID)
 	if !exist || en.openedEngine == nil {
+		if exist && en.closedEngine != nil {
+			// Import failed before, try to import again.
+			err := en.ImportAndClean()
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		}
 		engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize)
 		ok := bc.MemRoot.CheckConsume(StructSizeEngineInfo + engineCacheSize)
 		if !ok {
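
Register is where the retry actually happens: when a DDL job re-registers an index whose engine was closed but never successfully imported (for example, after the injected write-peer failure), it drains the pending closedEngine via ImportAndClean before opening a fresh engine. A compressed paraphrase of the branch added above, with the surrounding allocation details elided:

	// en, exist := bc.Load(indexID)
	// - not cached             -> open a new engine below
	// - cached and still open  -> reuse it (outer condition is false)
	// - cached but only closed -> a previous import failed; retry
	//   ImportAndClean, then fall through and open a new engine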
24 changes: 24 additions & 0 deletions tests/realtikvtest/addindextest4/ingest_test.go
@@ -499,3 +499,27 @@ func TestAddIndexPreCheckFailed(t *testing.T) {
 	tk.MustGetErrMsg("alter table t add index idx(b);", "[ddl:8256]Check ingest environment failed: mock error")
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed"))
 }
+
+func TestAddIndexImportFailed(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	tk.MustExec(`set global tidb_enable_dist_task=off;`)
+
+	tk.MustExec("create table t (a int, b int);")
+	for i := 0; i < 10; i++ {
+		insertSQL := fmt.Sprintf("insert into t values (%d, %d)", i, i)
+		tk.MustExec(insertSQL)
+	}
+
+	err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockWritePeerErr", "1*return")
+	require.NoError(t, err)
+	tk.MustExec("alter table t add index idx(a);")
+	err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockWritePeerErr")
+	require.NoError(t, err)
+	tk.MustExec("admin check table t;")
+}
