From 6d95459631815a9724762fcf5c05460eb8266a36 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 20 Aug 2024 17:13:42 +0800 Subject: [PATCH] ddl: fix resuming to wrong checkpoint when failed during adding index (#55506) close pingcap/tidb#55488 --- pkg/ddl/backfilling.go | 18 ++++++---- pkg/ddl/backfilling_operators.go | 4 +-- pkg/ddl/ingest/checkpoint.go | 6 ++-- .../realtikvtest/addindextest3/ingest_test.go | 35 ++++++++++++++++--- 4 files changed, 48 insertions(+), 15 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 5c4c4ae06539f..49fa029d7e59a 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -452,6 +452,11 @@ func loadTableRanges( zap.Int64("physicalTableID", t.GetPhysicalID())) return []kv.KeyRange{{StartKey: startKey, EndKey: endKey}}, nil } + failpoint.Inject("setLimitForLoadTableRanges", func(val failpoint.Value) { + if v, ok := val.(int); ok { + limit = v + } + }) rc := s.GetRegionCache() maxSleep := 10000 // ms @@ -466,6 +471,12 @@ func loadTableRanges( if err != nil { return false, errors.Trace(err) } + var mockErr bool + failpoint.InjectCall("beforeLoadRangeFromPD", &mockErr) + if mockErr { + return false, kv.ErrTxnRetryable + } + ranges = make([]kv.KeyRange, 0, len(rs)) for _, r := range rs { ranges = append(ranges, kv.KeyRange{StartKey: r.StartKey(), EndKey: r.EndKey()}) @@ -636,12 +647,7 @@ func makeupDecodeColMap(dbName model.CIStr, t table.Table) (map[int64]decoder.Co return decodeColMap, nil } -var backfillTaskChanSize = 128 - -// SetBackfillTaskChanSizeForTest is only used for test. -func SetBackfillTaskChanSizeForTest(n int) { - backfillTaskChanSize = n -} +const backfillTaskChanSize = 128 func (dc *ddlCtx) runAddIndexInLocalIngestMode( ctx context.Context, diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index ca14b24807ed5..201c09702af80 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -346,7 +346,7 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.K if src.cpMgr == nil { return start, false } - cpKey := src.cpMgr.LastProcessedKey() + cpKey := src.cpMgr.NextKeyToProcess() if len(cpKey) == 0 { return start, false } @@ -364,7 +364,7 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.K if cpKey.Cmp(end) == 0 { return cpKey, true } - return cpKey.Next(), false + return cpKey, false } func (src *TableScanTaskSource) generateTasks() error { diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 2d1536905736c..0575dd27af966 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -156,9 +156,9 @@ func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool { return s.localDataIsValid && len(s.flushedKeyLowWatermark) > 0 && end.Cmp(s.flushedKeyLowWatermark) <= 0 } -// LastProcessedKey finds the last processed key in checkpoint. -// If there is no processed key, it returns nil. -func (s *CheckpointManager) LastProcessedKey() kv.Key { +// NextKeyToProcess finds the next unprocessed key in checkpoint. +// If there is no such key, it returns nil. +func (s *CheckpointManager) NextKeyToProcess() kv.Key { s.mu.Lock() defer s.mu.Unlock() diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index 0b7b6c805df5a..808b861cd8fb3 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -348,10 +348,10 @@ func TestAddIndexSplitTableRanges(t *testing.T) { } tk.MustQuery("split table t between (0) and (80000) regions 7;").Check(testkit.Rows("6 1")) - ddl.SetBackfillTaskChanSizeForTest(4) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(4)") tk.MustExec("alter table t add index idx(b);") tk.MustExec("admin check table t;") - ddl.SetBackfillTaskChanSizeForTest(7) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(7)") tk.MustExec("alter table t add index idx_2(b);") tk.MustExec("admin check table t;") @@ -361,10 +361,37 @@ func TestAddIndexSplitTableRanges(t *testing.T) { tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000)) } tk.MustQuery("split table t by (10000),(20000),(30000),(40000),(50000),(60000);").Check(testkit.Rows("6 1")) - ddl.SetBackfillTaskChanSizeForTest(4) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(4)") + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") +} + +func TestAddIndexLoadTableRangeError(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec(`set global tidb_enable_dist_task=off;`) // Use checkpoint manager. + + tk.MustExec("create table t (a int primary key, b int);") + for i := 0; i < 8; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000)) + } + tk.MustQuery("split table t by (10000),(20000),(30000),(40000),(50000),(60000);").Check(testkit.Rows("6 1")) + + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/setLimitForLoadTableRanges", "return(3)") + var batchCnt int + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeLoadRangeFromPD", func(mockErr *bool) { + batchCnt++ + if batchCnt == 2 { + *mockErr = true + } + }) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/ingest/forceSyncFlagForTest", "return") tk.MustExec("alter table t add unique index idx(b);") tk.MustExec("admin check table t;") - ddl.SetBackfillTaskChanSizeForTest(1024) } func TestAddIndexMockFlushError(t *testing.T) {