Skip to content

Commit

Permalink
ddl: fix add index's merge with multi-schema optimization (#51747) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Mar 25, 2024
1 parent f9fc40c commit 6feb8cf
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 54 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ go_library(
"//pkg/statistics/handle",
"//pkg/store/copr",
"//pkg/store/driver/backoff",
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/table",
"//pkg/table/tables",
Expand Down
179 changes: 128 additions & 51 deletions pkg/ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
driver "github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand All @@ -34,10 +35,9 @@ import (

func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
txn kv.Transaction,
idxInfo *model.IndexInfo,
idxRecords []*temporaryIndexRecord,
) error {
if !idxInfo.Unique {
if !w.currentIndex.Unique {
// Non-unique keys need no check; just overwrite them,
// because in most cases the backfilled index entry does not exist yet.
return nil
Expand All @@ -53,6 +53,9 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
// Found a value in the original index key.
err := checkTempIndexKey(txn, idxRecords[i], val, w.table)
if err != nil {
if kv.ErrKeyExists.Equal(err) {
return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), w.currentIndex.ID)
}
return errors.Trace(err)
}
} else if idxRecords[i].distinct {
Expand Down Expand Up @@ -125,6 +128,10 @@ type mergeIndexWorker struct {
tmpIdxRecords []*temporaryIndexRecord
originIdxKeys []kv.Key
tmpIdxKeys []kv.Key

needValidateKey bool
currentTempIndexPrefix []byte
currentIndex *model.IndexInfo
}

func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
Expand All @@ -141,67 +148,98 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements
}
}

// validateTaskRange decodes the index IDs at both ends of the task's key
// range and locates the index the range starts in. It reports skip=true
// when neither boundary index ID belongs to this worker's target indexes.
// needValidateKey is armed when the range spans more than one temp index,
// so that fetchTempIndexVals re-resolves the index on a prefix change.
func (w *mergeIndexWorker) validateTaskRange(taskRange *reorgBackfillTask) (skip bool, err error) {
	startRaw, err := tablecodec.DecodeIndexID(taskRange.startKey)
	if err != nil {
		return false, err
	}
	endRaw, err := tablecodec.DecodeIndexID(taskRange.endKey)
	if err != nil {
		return false, err
	}
	startIndexID := startRaw & tablecodec.IndexIDMask
	endIndexID := endRaw & tablecodec.IndexIDMask
	w.needValidateKey = startIndexID != endIndexID

	endMatched := false
	for _, idx := range w.indexes {
		meta := idx.Meta()
		switch meta.ID {
		case startIndexID:
			// The range starts inside one of our indexes: record it and stop.
			w.currentIndex = meta
			return false, nil
		case endIndexID:
			endMatched = true
		}
	}
	return !endMatched, nil
}

// BackfillData merge temp index data in txn.
func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
skip, err := w.validateTaskRange(&taskRange)
if skip || err != nil {
return taskCtx, err
}

oprStartTime := time.Now()
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
for _, idx := range w.indexes {
idx := idx // Make linter noloopclosure happy.
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(kv.Priority, taskRange.priority)
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)

tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, idx.Meta(), taskRange)
if err != nil {
return errors.Trace(err)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(kv.Priority, taskRange.priority)
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)

tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange)
if err != nil {
return errors.Trace(err)
}
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

err = w.batchCheckTemporaryUniqueKey(txn, tmpIdxRecords)
if err != nil {
return errors.Trace(err)
}

for i, idxRecord := range tmpIdxRecords {
taskCtx.scanCount++
// The index entry already exists, so we skip it; there is no need to backfill it.
// TiDB handles the following updates, deletes, and inserts on these rows correctly.
// If the whole batch is skipped, update the first index key so the txn commits and releases its lock.
if idxRecord.skip {
continue
}
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

err = w.batchCheckTemporaryUniqueKey(txn, idx.Meta(), tmpIdxRecords)
// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
if err != nil {
return errors.Trace(err)
}

for i, idxRecord := range tmpIdxRecords {
taskCtx.scanCount++
// The index entry already exists, so we skip it; there is no need to backfill it.
// TiDB handles the following updates, deletes, and inserts on these rows correctly.
// If the whole batch is skipped, update the first index key so the txn commits and releases its lock.
if idxRecord.skip {
continue
}

// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
if err != nil {
return errors.Trace(err)
}

if idxRecord.delete {
if idxRecord.unique {
err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
} else {
err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
}
if idxRecord.delete {
if idxRecord.unique {
err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
} else {
err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
}
if err != nil {
return err
err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
}
taskCtx.addedCount++
} else {
err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
}
return nil
})
}
if err != nil {
return err
}
taskCtx.addedCount++
}
return nil
})

failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
Expand All @@ -224,9 +262,41 @@ func (w *mergeIndexWorker) GetCtx() *backfillCtx {
return w.backfillCtx
}

// prefixIsChanged reports whether newKey falls outside the temp-index
// prefix currently cached on the worker. An empty cached prefix (nothing
// resolved yet) counts as changed.
func (w *mergeIndexWorker) prefixIsChanged(newKey kv.Key) bool {
	if len(w.currentTempIndexPrefix) == 0 {
		return true
	}
	return !bytes.HasPrefix(newKey, w.currentTempIndexPrefix)
}

// updateCurrentIndexInfo resolves which target index the key belongs to and
// caches both the index info and its temp-index key prefix on the worker.
// It reports skip=true when the key's index ID matches none of the worker's
// indexes, which means the key is a leftover from another DDL job.
func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) (skip bool, err error) {
	tempIdxID, err := tablecodec.DecodeIndexID(newIndexKey)
	if err != nil {
		return false, err
	}
	targetID := tempIdxID & tablecodec.IndexIDMask

	var matched *model.IndexInfo
	for _, idx := range w.indexes {
		if meta := idx.Meta(); meta.ID == targetID {
			matched = meta
		}
	}
	if matched == nil {
		// Index IDs are always increasing, but not always continuous:
		// if DDL adds another index between these indexes, it is possible that:
		//   multi-schema add index IDs = [1, 2, 4, 5]
		//   another index ID = [3]
		// If the other index gets rolled back, temp index 0xFFxxx03 may hold
		// dirty records. Skip those dirty records.
		return true, nil
	}

	w.currentTempIndexPrefix = kv.Key(tablecodec.CutIndexPrefix(newIndexKey)).Clone()
	w.currentIndex = matched
	return false, nil
}

func (w *mergeIndexWorker) fetchTempIndexVals(
txn kv.Transaction,
indexInfo *model.IndexInfo,
taskRange reorgBackfillTask,
) ([]*temporaryIndexRecord, kv.Key, bool, error) {
startTime := time.Now()
Expand All @@ -250,11 +320,18 @@ func (w *mergeIndexWorker) fetchTempIndexVals(
return false, nil
}

if w.needValidateKey && w.prefixIsChanged(indexKey) {
skip, err := w.updateCurrentIndexInfo(indexKey)
if err != nil || skip {
return skip, err
}
}

tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue)
if err != nil {
return false, err
}
tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(indexInfo.Columns))
tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(w.currentIndex.Columns))
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 15,
shard_count = 17,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
58 changes: 58 additions & 0 deletions pkg/ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,61 @@ func TestAddIndexIngestMultiSchemaChange(t *testing.T) {
tk.MustExec("alter table t add index idx_a(a), add index idx_ab(a, b), add index idx_d(d);")
tk.MustExec("admin check table t;")
}

// TestAddIndexDuplicateMessage checks that a unique-index backfill racing
// with a concurrent insert reports the duplicate-entry error using the
// table- and index-qualified key name ('t.idx').
func TestAddIndexDuplicateMessage(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	defer ingesttestutil.InjectMockBackendMgr(t, store)()

	tk.MustExec("create table t(id int primary key, b int, k int);")
	tk.MustExec("insert into t values (1, 1, 1);")

	tk1 := testkit.NewTestKit(t, store)
	tk1.MustExec("use test")

	var (
		dmlFired bool
		dmlErr   error
	)
	// Fire one conflicting insert from a second session right after the
	// first ingest write, so the merge step sees a duplicate on column b.
	ingest.MockExecAfterWriteRow = func() {
		if dmlFired {
			return
		}
		_, dmlErr = tk1.Exec("insert into t values (2, 1, 2);")
		dmlFired = true
	}

	tk.MustGetErrMsg("alter table t add unique index idx(b);", "[kv:1062]Duplicate entry '1' for key 't.idx'")

	require.True(t, dmlFired)
	require.NoError(t, dmlErr)
	tk.MustExec("admin check table t;")
	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1", "2 1 2"))
}

// TestMultiSchemaAddIndexMerge verifies that the temp-index merge step of a
// multi-schema-change ADD INDEX stays consistent when a concurrent insert
// lands while the indexes are being backfilled.
func TestMultiSchemaAddIndexMerge(t *testing.T) {
	store := testkit.CreateMockStore(t)
	defer ingesttestutil.InjectMockBackendMgr(t, store)()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk2 := testkit.NewTestKit(t, store)
	tk2.MustExec("use test")

	tk.MustExec("create table t (a int, b int);")
	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")

	var (
		fired  bool
		tk2Err error
	)
	// Inject one concurrent insert from a second session after the first
	// ingest write; the temp-index merge must pick it up for both indexes.
	ingest.MockExecAfterWriteRow = func() {
		if fired {
			return
		}
		_, tk2Err = tk2.Exec("insert into t values (4, 4);")
		fired = true
	}

	tk.MustExec("alter table t add index idx1(a), add index idx2(b);")
	require.True(t, fired)
	require.NoError(t, tk2Err)
	tk.MustExec("admin check table t;")
}
12 changes: 11 additions & 1 deletion pkg/ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,14 @@ func (m *MockWriter) WriteRow(_ context.Context, key, idxVal []byte, _ kv.Handle
if err != nil {
return err
}
return txn.Set(key, idxVal)
err = txn.Set(key, idxVal)
if err != nil {
return err
}
if MockExecAfterWriteRow != nil {
MockExecAfterWriteRow()
}
return nil
}

// LockForWrite implements Writer.LockForWrite interface.
Expand All @@ -241,3 +248,6 @@ func (*MockWriter) LockForWrite() func() {
// Close implements the Writer.Close interface. The mock writer holds no
// resources, so this is a no-op that always succeeds.
func (*MockWriter) Close(_ context.Context) error {
return nil
}

// MockExecAfterWriteRow is a test-only hook; when non-nil it is invoked by
// MockWriter.WriteRow after a key-value pair is written, letting tests run
// concurrent statements at that exact point.
var MockExecAfterWriteRow func()
5 changes: 4 additions & 1 deletion pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,10 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
tb = tbl.(table.PhysicalTable)
}
if mergingTmpIdx {
start, end = tablecodec.GetTableIndexKeyRange(pid, tablecodec.TempIndexPrefix|elements[0].ID)
firstElemTempID := tablecodec.TempIndexPrefix | elements[0].ID
lastElemTempID := tablecodec.TempIndexPrefix | elements[len(elements)-1].ID
start = tablecodec.EncodeIndexSeekKey(pid, firstElemTempID, nil)
end = tablecodec.EncodeIndexSeekKey(pid, lastElemTempID, []byte{255})
} else {
start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
if err != nil {
Expand Down

0 comments on commit 6feb8cf

Please sign in to comment.