diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel
index adbb2f3cbd6aa..58e2a9e578855 100644
--- a/pkg/ddl/BUILD.bazel
+++ b/pkg/ddl/BUILD.bazel
@@ -117,6 +117,7 @@ go_library(
         "//pkg/statistics/handle",
         "//pkg/store/copr",
         "//pkg/store/driver/backoff",
+        "//pkg/store/driver/txn",
         "//pkg/store/helper",
         "//pkg/table",
         "//pkg/table/tables",
diff --git a/pkg/ddl/index_merge_tmp.go b/pkg/ddl/index_merge_tmp.go
index 7c0504e1ebf1e..4280c6506ce74 100644
--- a/pkg/ddl/index_merge_tmp.go
+++ b/pkg/ddl/index_merge_tmp.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/meta"
 	"github.com/pingcap/tidb/pkg/parser/model"
+	driver "github.com/pingcap/tidb/pkg/store/driver/txn"
 	"github.com/pingcap/tidb/pkg/table"
 	"github.com/pingcap/tidb/pkg/table/tables"
 	"github.com/pingcap/tidb/pkg/tablecodec"
@@ -34,10 +35,9 @@ import (
 
 func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
 	txn kv.Transaction,
-	idxInfo *model.IndexInfo,
 	idxRecords []*temporaryIndexRecord,
 ) error {
-	if !idxInfo.Unique {
+	if !w.currentIndex.Unique {
 		// non-unique key need no check, just overwrite it,
 		// because in most case, backfilling indices is not exists.
 		return nil
@@ -53,6 +53,9 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
 			// Found a value in the original index key.
 			err := checkTempIndexKey(txn, idxRecords[i], val, w.table)
 			if err != nil {
+				if kv.ErrKeyExists.Equal(err) {
+					return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), w.currentIndex.ID)
+				}
 				return errors.Trace(err)
 			}
 		} else if idxRecords[i].distinct {
@@ -125,6 +128,10 @@ type mergeIndexWorker struct {
 	tmpIdxRecords []*temporaryIndexRecord
 	originIdxKeys []kv.Key
 	tmpIdxKeys    []kv.Key
+
+	needValidateKey        bool
+	currentTempIndexPrefix []byte
+	currentIndex           *model.IndexInfo
 }
 
 func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
@@ -141,67 +148,98 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements
 	}
 }
 
+func (w *mergeIndexWorker) validateTaskRange(taskRange *reorgBackfillTask) (skip bool, err error) {
+	tmpID, err := tablecodec.DecodeIndexID(taskRange.startKey)
+	if err != nil {
+		return false, err
+	}
+	startIndexID := tmpID & tablecodec.IndexIDMask
+	tmpID, err = tablecodec.DecodeIndexID(taskRange.endKey)
+	if err != nil {
+		return false, err
+	}
+	endIndexID := tmpID & tablecodec.IndexIDMask
+
+	w.needValidateKey = startIndexID != endIndexID
+	containsTargetID := false
+	for _, idx := range w.indexes {
+		idxInfo := idx.Meta()
+		if idxInfo.ID == startIndexID {
+			containsTargetID = true
+			w.currentIndex = idxInfo
+			break
+		}
+		if idxInfo.ID == endIndexID {
+			containsTargetID = true
+		}
+	}
+	return !containsTargetID, nil
+}
+
 // BackfillData merge temp index data in txn.
 func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
+	skip, err := w.validateTaskRange(&taskRange)
+	if skip || err != nil {
+		return taskCtx, err
+	}
+
 	oprStartTime := time.Now()
 	ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
-	for _, idx := range w.indexes {
-		idx := idx // Make linter noloopclosure happy.
-		errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
-			taskCtx.addedCount = 0
-			taskCtx.scanCount = 0
-			txn.SetOption(kv.Priority, taskRange.priority)
-			if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
-				txn.SetOption(kv.ResourceGroupTagger, tagger)
-			}
-			txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
-			tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, idx.Meta(), taskRange)
-			if err != nil {
-				return errors.Trace(err)
+	errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
+		taskCtx.addedCount = 0
+		taskCtx.scanCount = 0
+		txn.SetOption(kv.Priority, taskRange.priority)
+		if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
+			txn.SetOption(kv.ResourceGroupTagger, tagger)
+		}
+		txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
+
+		tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		taskCtx.nextKey = nextKey
+		taskCtx.done = taskDone
+
+		err = w.batchCheckTemporaryUniqueKey(txn, tmpIdxRecords)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		for i, idxRecord := range tmpIdxRecords {
+			taskCtx.scanCount++
+			// The index KV already exists, so we skip it; there is no need to backfill it.
+			// TiDB can handle the following updates, deletes and inserts on these rows correctly.
+			// If the whole batch is skipped, update the first index key so that the txn commits and releases the lock.
+			if idxRecord.skip {
+				continue
 			}
-			taskCtx.nextKey = nextKey
-			taskCtx.done = taskDone
 
-			err = w.batchCheckTemporaryUniqueKey(txn, idx.Meta(), tmpIdxRecords)
+			// Lock the corresponding row keys so that this transaction does not modify the index KVs
+			// that are being changed by a pessimistic transaction.
+			rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
+			err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
 			if err != nil {
 				return errors.Trace(err)
 			}
 
-			for i, idxRecord := range tmpIdxRecords {
-				taskCtx.scanCount++
-				// The index is already exists, we skip it, no needs to backfill it.
-				// The following update, delete, insert on these rows, TiDB can handle it correctly.
-				// If all batch are skipped, update first index key to make txn commit to release lock.
-				if idxRecord.skip {
-					continue
-				}
-
-				// Lock the corresponding row keys so that it doesn't modify the index KVs
-				// that are changing by a pessimistic transaction.
-				rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
-				err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
-				if err != nil {
-					return errors.Trace(err)
-				}
-
-				if idxRecord.delete {
-					if idxRecord.unique {
-						err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
-					} else {
-						err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
-					}
+			if idxRecord.delete {
+				if idxRecord.unique {
+					err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
 				} else {
-					err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
-				}
-				if err != nil {
-					return err
+					err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
 				}
-				taskCtx.addedCount++
+			} else {
+				err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
 			}
-			return nil
-		})
-	}
+			if err != nil {
+				return err
+			}
+			taskCtx.addedCount++
+		}
+		return nil
+	})
 
 	failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
 		//nolint:forcetypeassert
@@ -224,9 +262,41 @@ func (w *mergeIndexWorker) GetCtx() *backfillCtx {
 	return w.backfillCtx
 }
 
+func (w *mergeIndexWorker) prefixIsChanged(newKey kv.Key) bool {
+	return len(w.currentTempIndexPrefix) == 0 || !bytes.HasPrefix(newKey, w.currentTempIndexPrefix)
+}
+
+func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) (skip bool, err error) {
+	tempIdxID, err := tablecodec.DecodeIndexID(newIndexKey)
+	if err != nil {
+		return false, err
+	}
+	idxID := tablecodec.IndexIDMask & tempIdxID
+	var curIdx *model.IndexInfo
+	for _, idx := range w.indexes {
+		if idx.Meta().ID == idxID {
+			curIdx = idx.Meta()
+		}
+	}
+	if curIdx == nil {
+		// Index IDs are always increasing, but not always continuous:
+		// if another DDL job creates an index between these indexes, it is possible that:
+		// multi-schema add index IDs = [1, 2, 4, 5]
+		// another index ID = [3]
+		// If that index gets rolled back, temp index 0xFFxxx03 may contain dirty records.
+		// We should skip these dirty records.
+		return true, nil
+	}
+	pfx := tablecodec.CutIndexPrefix(newIndexKey)
+
+	w.currentTempIndexPrefix = kv.Key(pfx).Clone()
+	w.currentIndex = curIdx
+
+	return false, nil
+}
+
 func (w *mergeIndexWorker) fetchTempIndexVals(
 	txn kv.Transaction,
-	indexInfo *model.IndexInfo,
 	taskRange reorgBackfillTask,
 ) ([]*temporaryIndexRecord, kv.Key, bool, error) {
 	startTime := time.Now()
@@ -250,11 +320,18 @@ func (w *mergeIndexWorker) fetchTempIndexVals(
 			return false, nil
 		}
 
+		if w.needValidateKey && w.prefixIsChanged(indexKey) {
+			skip, err := w.updateCurrentIndexInfo(indexKey)
+			if err != nil || skip {
+				return skip, err
+			}
+		}
+
 		tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue)
 		if err != nil {
 			return false, err
 		}
-		tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(indexInfo.Columns))
+		tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(w.currentIndex.Columns))
 		if err != nil {
 			return false, err
 		}
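For context: the ID arithmetic above (`tmpID & tablecodec.IndexIDMask`) is what lets the worker map a scanned temp index key back to the index it shadows. A minimal, self-contained sketch of that relationship follows; the constant values are assumptions restated locally to mirror tablecodec.TempIndexPrefix and tablecodec.IndexIDMask, so treat pkg/tablecodec as the source of truth.

package main

import "fmt"

const (
	tempIndexPrefix = int64(0x7fff) << 48 // assumed: high bits mark an index ID as "temporary"
	indexIDMask     = int64(1)<<48 - 1    // assumed: low 48 bits carry the real index ID
)

func main() {
	for _, indexID := range []int64{1, 2, 4, 5} {
		tempID := tempIndexPrefix | indexID // the ID used in temp index keys
		// Masking strips the temp marker, which is how validateTaskRange and
		// updateCurrentIndexInfo recover the user-visible index ID from a key.
		fmt.Printf("%#x -> %d\n", tempID, tempID&indexIDMask)
	}
}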
diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel
index b1423b74116ee..979b3050bcd93 100644
--- a/pkg/ddl/ingest/BUILD.bazel
+++ b/pkg/ddl/ingest/BUILD.bazel
@@ -66,7 +66,7 @@ go_test(
     embed = [":ingest"],
    flaky = True,
     race = "on",
-    shard_count = 15,
+    shard_count = 17,
     deps = [
         "//pkg/config",
         "//pkg/ddl",
diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go
index d7b91579befa4..5a7ba138c525d 100644
--- a/pkg/ddl/ingest/integration_test.go
+++ b/pkg/ddl/ingest/integration_test.go
@@ -309,3 +309,61 @@ func TestAddIndexIngestMultiSchemaChange(t *testing.T) {
 	tk.MustExec("alter table t add index idx_a(a), add index idx_ab(a, b), add index idx_d(d);")
 	tk.MustExec("admin check table t;")
 }
+
+func TestAddIndexDuplicateMessage(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	defer ingesttestutil.InjectMockBackendMgr(t, store)()
+
+	tk.MustExec("create table t(id int primary key, b int, k int);")
+	tk.MustExec("insert into t values (1, 1, 1);")
+
+	tk1 := testkit.NewTestKit(t, store)
+	tk1.MustExec("use test")
+
+	var runDML bool
+	var errDML error
+
+	ingest.MockExecAfterWriteRow = func() {
+		if runDML {
+			return
+		}
+		_, errDML = tk1.Exec("insert into t values (2, 1, 2);")
+		runDML = true
+	}
+
+	tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'")
+
+	require.NoError(t, errDML)
+	require.True(t, runDML)
+	tk.MustExec("admin check table t;")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1", "2 1 2"))
+}
+
+func TestMultiSchemaAddIndexMerge(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	defer ingesttestutil.InjectMockBackendMgr(t, store)()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk2 := testkit.NewTestKit(t, store)
+	tk2.MustExec("use test")
+
+	tk.MustExec("create table t (a int, b int);")
+	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
+
+	first := true
+	var tk2Err error
+	ingest.MockExecAfterWriteRow = func() {
+		if !first {
+			return
+		}
+		_, tk2Err = tk2.Exec("insert into t values (4, 4);")
+		first = false
+	}
+
+	tk.MustExec("alter table t add index idx1(a), add index idx2(b);")
+	require.False(t, first)
+	require.NoError(t, tk2Err)
+	tk.MustExec("admin check table t;")
+}
diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go
index 257175b6bbcb6..c6fb06e006ee5 100644
--- a/pkg/ddl/ingest/mock.go
+++ b/pkg/ddl/ingest/mock.go
@@ -229,7 +229,14 @@ func (m *MockWriter) WriteRow(_ context.Context, key, idxVal []byte, _ kv.Handle
 	if err != nil {
 		return err
 	}
-	return txn.Set(key, idxVal)
+	err = txn.Set(key, idxVal)
+	if err != nil {
+		return err
+	}
+	if MockExecAfterWriteRow != nil {
+		MockExecAfterWriteRow()
+	}
+	return nil
 }
 
 // LockForWrite implements Writer.LockForWrite interface.
@@ -241,3 +248,6 @@ func (*MockWriter) LockForWrite() func() {
 func (*MockWriter) Close(_ context.Context) error {
 	return nil
 }
+
+// MockExecAfterWriteRow is only used in tests.
+var MockExecAfterWriteRow func()
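The hook added to the mock writer follows a common Go testing pattern: a package-level function variable that is nil in normal runs and assigned by a test so that concurrent work fires at a deterministic point between two index writes. A rough, generic sketch of the pattern, with illustrative names rather than the real ingest API:

package main

import "fmt"

// execAfterWriteRow plays the role of ingest.MockExecAfterWriteRow:
// nil unless a test installs a callback.
var execAfterWriteRow func()

// writeRow simulates MockWriter.WriteRow: perform the write, then give the
// test a chance to interleave DML before the next write happens.
func writeRow(store map[string]string, k, v string) {
	store[k] = v
	if execAfterWriteRow != nil {
		execAfterWriteRow()
	}
}

func main() {
	store := map[string]string{}
	fired := false
	execAfterWriteRow = func() {
		if !fired { // run the "concurrent DML" exactly once, like the tests above
			store["row/4"] = "concurrent"
			fired = true
		}
	}
	writeRow(store, "idx/1", "a")
	writeRow(store, "idx/2", "b")
	fmt.Println(len(store), fired) // 3 true
}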
diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go
index 65089a81ddc1d..e6e100f8383ac 100644
--- a/pkg/ddl/reorg.go
+++ b/pkg/ddl/reorg.go
@@ -697,7 +697,10 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
 		tb = tbl.(table.PhysicalTable)
 	}
 	if mergingTmpIdx {
-		start, end = tablecodec.GetTableIndexKeyRange(pid, tablecodec.TempIndexPrefix|elements[0].ID)
+		firstElemTempID := tablecodec.TempIndexPrefix | elements[0].ID
+		lastElemTempID := tablecodec.TempIndexPrefix | elements[len(elements)-1].ID
+		start = tablecodec.EncodeIndexSeekKey(pid, firstElemTempID, nil)
+		end = tablecodec.EncodeIndexSeekKey(pid, lastElemTempID, []byte{255})
 	} else {
 		start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
 		if err != nil {
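The reorg.go change widens the merge scan from the first element's temp index to one range covering every element. Since temp index keys sort first by index ID, a single [first, last+0xFF) range visits all of them in one pass, which is also why the worker must skip IDs inside the range that belong to no element (e.g. a rolled-back index). A simplified sketch of that range check, with formatted strings standing in for real tablecodec.EncodeIndexSeekKey output and an assumed prefix constant:

package main

import "fmt"

// seekKey is an illustrative stand-in for tablecodec.EncodeIndexSeekKey:
// keys order first by index ID, then by the index-value suffix.
func seekKey(indexID int64, suffix string) string {
	return fmt.Sprintf("t_i_%016x_%s", indexID, suffix)
}

func main() {
	const tempIndexPrefix = int64(0x7fff) << 48 // assumed value, mirrors tablecodec
	elemIDs := []int64{1, 2, 4, 5}              // element index IDs of one multi-schema job

	start := seekKey(tempIndexPrefix|elemIDs[0], "")                // first temp index, empty suffix
	end := seekKey(tempIndexPrefix|elemIDs[len(elemIDs)-1], "\xff") // last temp index, 0xFF upper bound

	for _, id := range []int64{1, 3, 5, 9} {
		k := seekKey(tempIndexPrefix|id, "a")
		inRange := k >= start && k < end
		// IDs 1, 3 and 5 fall inside the scanned range (3 matches no element,
		// so updateCurrentIndexInfo must skip it); 9 is outside entirely.
		fmt.Println(id, inRange)
	}
}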