Commit

ddl: fix add index's merge with multi-schema optimization (#51747)
close #51746
tangenta authored Mar 14, 2024
1 parent 6a76187 commit c1e3dae
Showing 4 changed files with 158 additions and 55 deletions.
179 changes: 126 additions & 53 deletions pkg/ddl/index_merge_tmp.go
@@ -35,10 +35,9 @@ import (

 func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
 	txn kv.Transaction,
-	idxInfo *model.IndexInfo,
 	idxRecords []*temporaryIndexRecord,
 ) error {
-	if !idxInfo.Unique {
+	if !w.currentIndex.Unique {
 		// non-unique key need no check, just overwrite it,
 		// because in most case, backfilling indices is not exists.
 		return nil
@@ -55,7 +54,7 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
 		err := checkTempIndexKey(txn, idxRecords[i], val, w.table)
 		if err != nil {
 			if kv.ErrKeyExists.Equal(err) {
-				return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), idxInfo.ID)
+				return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), w.currentIndex.ID)
 			}
 			return errors.Trace(err)
 		}
@@ -128,6 +127,10 @@ type mergeIndexWorker struct {
 	tmpIdxRecords []*temporaryIndexRecord
 	originIdxKeys []kv.Key
 	tmpIdxKeys    []kv.Key
+
+	needValidateKey        bool
+	currentTempIndexPrefix []byte
+	currentIndex           *model.IndexInfo
 }

 func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
@@ -144,68 +147,99 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
 	}
 }

+func (w *mergeIndexWorker) validateTaskRange(taskRange *reorgBackfillTask) (skip bool, err error) {
+	tmpID, err := tablecodec.DecodeIndexID(taskRange.startKey)
+	if err != nil {
+		return false, err
+	}
+	startIndexID := tmpID & tablecodec.IndexIDMask
+	tmpID, err = tablecodec.DecodeIndexID(taskRange.endKey)
+	if err != nil {
+		return false, err
+	}
+	endIndexID := tmpID & tablecodec.IndexIDMask
+
+	w.needValidateKey = startIndexID != endIndexID
+	containsTargetID := false
+	for _, idx := range w.indexes {
+		idxInfo := idx.Meta()
+		if idxInfo.ID == startIndexID {
+			containsTargetID = true
+			w.currentIndex = idxInfo
+			break
+		}
+		if idxInfo.ID == endIndexID {
+			containsTargetID = true
+		}
+	}
+	return !containsTargetID, nil
+}
+
 // BackfillData merge temp index data in txn.
 func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
+	skip, err := w.validateTaskRange(&taskRange)
+	if skip || err != nil {
+		return taskCtx, err
+	}
+
 	oprStartTime := time.Now()
 	ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
-	for _, idx := range w.indexes {
-		idx := idx // Make linter noloopclosure happy.
-		errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
-			taskCtx.addedCount = 0
-			taskCtx.scanCount = 0
-			updateTxnEntrySizeLimitIfNeeded(txn)
-			txn.SetOption(kv.Priority, taskRange.priority)
-			if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
-				txn.SetOption(kv.ResourceGroupTagger, tagger)
-			}
-			txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
-
-			tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, idx.Meta(), taskRange)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			taskCtx.nextKey = nextKey
-			taskCtx.done = taskDone
-
-			err = w.batchCheckTemporaryUniqueKey(txn, idx.Meta(), tmpIdxRecords)
-			if err != nil {
-				return errors.Trace(err)
-			}
-
-			for i, idxRecord := range tmpIdxRecords {
-				taskCtx.scanCount++
-				// The index is already exists, we skip it, no needs to backfill it.
-				// The following update, delete, insert on these rows, TiDB can handle it correctly.
-				// If all batch are skipped, update first index key to make txn commit to release lock.
-				if idxRecord.skip {
-					continue
-				}
-
-				// Lock the corresponding row keys so that it doesn't modify the index KVs
-				// that are changing by a pessimistic transaction.
-				rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
-				err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
-				if err != nil {
-					return errors.Trace(err)
-				}
-
-				if idxRecord.delete {
-					if idxRecord.unique {
-						err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
-					} else {
-						err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
-					}
-				} else {
-					err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
-				}
-				if err != nil {
-					return err
-				}
-				taskCtx.addedCount++
-			}
-			return nil
-		})
-	}
+	errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
+		taskCtx.addedCount = 0
+		taskCtx.scanCount = 0
+		updateTxnEntrySizeLimitIfNeeded(txn)
+		txn.SetOption(kv.Priority, taskRange.priority)
+		if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
+			txn.SetOption(kv.ResourceGroupTagger, tagger)
+		}
+		txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
+
+		tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		taskCtx.nextKey = nextKey
+		taskCtx.done = taskDone
+
+		err = w.batchCheckTemporaryUniqueKey(txn, tmpIdxRecords)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		for i, idxRecord := range tmpIdxRecords {
+			taskCtx.scanCount++
+			// The index is already exists, we skip it, no needs to backfill it.
+			// The following update, delete, insert on these rows, TiDB can handle it correctly.
+			// If all batch are skipped, update first index key to make txn commit to release lock.
+			if idxRecord.skip {
+				continue
+			}
+
+			// Lock the corresponding row keys so that it doesn't modify the index KVs
+			// that are changing by a pessimistic transaction.
+			rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
+			err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			if idxRecord.delete {
+				if idxRecord.unique {
+					err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
+				} else {
+					err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
+				}
+			} else {
+				err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
+			}
+			if err != nil {
+				return err
+			}
+			taskCtx.addedCount++
+		}
+		return nil
+	})

 	failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
 		//nolint:forcetypeassert
@@ -228,9 +262,41 @@ func (w *mergeIndexWorker) GetCtx() *backfillCtx {
 	return w.backfillCtx
 }

+func (w *mergeIndexWorker) prefixIsChanged(newKey kv.Key) bool {
+	return len(w.currentTempIndexPrefix) == 0 || !bytes.HasPrefix(newKey, w.currentTempIndexPrefix)
+}
+
+func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) (skip bool, err error) {
+	tempIdxID, err := tablecodec.DecodeIndexID(newIndexKey)
+	if err != nil {
+		return false, err
+	}
+	idxID := tablecodec.IndexIDMask & tempIdxID
+	var curIdx *model.IndexInfo
+	for _, idx := range w.indexes {
+		if idx.Meta().ID == idxID {
+			curIdx = idx.Meta()
+		}
+	}
+	if curIdx == nil {
+		// Index IDs are always increasing, but not always continuous:
+		// if DDL adds another index between these indexes, it is possible that:
+		// multi-schema add index IDs = [1, 2, 4, 5]
+		// another index ID = [3]
+		// If the new index get rollback, temp index 0xFFxxx03 may have dirty records.
+		// We should skip these dirty records.
+		return true, nil
+	}
+	pfx := tablecodec.CutIndexPrefix(newIndexKey)
+
+	w.currentTempIndexPrefix = kv.Key(pfx).Clone()
+	w.currentIndex = curIdx
+
+	return false, nil
+}
+
 func (w *mergeIndexWorker) fetchTempIndexVals(
 	txn kv.Transaction,
-	indexInfo *model.IndexInfo,
 	taskRange reorgBackfillTask,
 ) ([]*temporaryIndexRecord, kv.Key, bool, error) {
 	startTime := time.Now()
@@ -254,11 +320,18 @@ func (w *mergeIndexWorker) fetchTempIndexVals(
 			return false, nil
 		}

+		if w.needValidateKey && w.prefixIsChanged(indexKey) {
+			skip, err := w.updateCurrentIndexInfo(indexKey)
+			if err != nil || skip {
+				return skip, err
+			}
+		}
+
 		tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue)
 		if err != nil {
 			return false, err
 		}
-		tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(indexInfo.Columns))
+		tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(w.currentIndex.Columns))
 		if err != nil {
 			return false, err
 		}
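
The new validateTaskRange and updateCurrentIndexInfo helpers rely on the temporary index key embedding the target index ID behind a temp-index marker, so the worker can recover the real index ID by masking. Below is a minimal, self-contained sketch of that arithmetic; the constant values are assumptions that mirror tablecodec.TempIndexPrefix and tablecodec.IndexIDMask rather than the authoritative definitions in pkg/tablecodec.

package main

import "fmt"

// Assumed values mirroring tablecodec.TempIndexPrefix and tablecodec.IndexIDMask;
// the authoritative definitions live in pkg/tablecodec and may differ.
const (
	tempIndexPrefix = int64(0x7fff) << 48 // marks an index ID as belonging to a temporary index
	indexIDMask     = int64(1)<<48 - 1    // low 48 bits hold the real index ID
)

func main() {
	// IDs of the indexes added by one multi-schema-change job.
	indexIDs := []int64{1, 2, 4, 5}

	for _, id := range indexIDs {
		tempID := tempIndexPrefix | id    // ID embedded in the temporary index keys
		recovered := tempID & indexIDMask // what validateTaskRange/updateCurrentIndexInfo recover
		fmt.Printf("index %d -> temp index ID %#x -> recovered %d\n", id, tempID, recovered)
	}
}

This is also why updateCurrentIndexInfo can simply skip a temp index ID that matches none of the job's indexes: as the code comment above notes, such keys may be dirty records left by a rolled-back index from another DDL.
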
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
@@ -68,7 +68,7 @@ go_test(
     embed = [":ingest"],
     flaky = True,
     race = "on",
-    shard_count = 16,
+    shard_count = 17,
     deps = [
         "//pkg/config",
         "//pkg/ddl",
27 changes: 27 additions & 0 deletions pkg/ddl/ingest/integration_test.go
@@ -340,3 +340,30 @@ func TestAddIndexDuplicateMessage(t *testing.T) {
 	tk.MustExec("admin check table t;")
 	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1", "2 1 2"))
 }
+
+func TestMultiSchemaAddIndexMerge(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	defer ingesttestutil.InjectMockBackendMgr(t, store)()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk2 := testkit.NewTestKit(t, store)
+	tk2.MustExec("use test")
+
+	tk.MustExec("create table t (a int, b int);")
+	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
+
+	first := true
+	var tk2Err error
+	ingest.MockExecAfterWriteRow = func() {
+		if !first {
+			return
+		}
+		_, tk2Err = tk2.Exec("insert into t values (4, 4);")
+		first = false
+	}
+
+	tk.MustExec("alter table t add index idx1(a), add index idx2(b);")
+	require.False(t, first)
+	require.NoError(t, tk2Err)
+	tk.MustExec("admin check table t;")
+}
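
The regression test injects a concurrent insert through ingest.MockExecAfterWriteRow, a package-level hook the ingest write path invokes so tests can run DML at a precise point during backfill. A rough sketch of that hook pattern follows; everything except the exported hook name is illustrative rather than the actual ingest internals.

package ingest

// Package-level test hook: the write path calls it (when non-nil) right after
// writing a row, letting a test inject concurrent DML at an exact moment.
var MockExecAfterWriteRow func()

// writeRow is illustrative; the real write path lives elsewhere in pkg/ddl/ingest.
func writeRow() error {
	// ... encode and write the index KV to the backend ...
	if MockExecAfterWriteRow != nil {
		MockExecAfterWriteRow()
	}
	return nil
}
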
5 changes: 4 additions & 1 deletion pkg/ddl/reorg.go
@@ -706,7 +706,10 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
 		tb = tbl.(table.PhysicalTable)
 	}
 	if mergingTmpIdx {
-		start, end = tablecodec.GetTableIndexKeyRange(pid, tablecodec.TempIndexPrefix|elements[0].ID)
+		firstElemTempID := tablecodec.TempIndexPrefix | elements[0].ID
+		lastElemTempID := tablecodec.TempIndexPrefix | elements[len(elements)-1].ID
+		start = tablecodec.EncodeIndexSeekKey(pid, firstElemTempID, nil)
+		end = tablecodec.EncodeIndexSeekKey(pid, lastElemTempID, []byte{255})
 	} else {
 		start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
 		if err != nil {
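
Previously both ends of the merge scan range were derived from elements[0], so when a multi-schema job recorded several elements (one per added index) only the first temp index was ever merged. A small sketch, under the same assumed temp-index-prefix constant as the earlier example, showing that the fixed range spans every element's temp index:

package main

import "fmt"

// Assumed constant mirroring tablecodec.TempIndexPrefix (see the earlier sketch).
const tempIndexPrefix = int64(0x7fff) << 48

func main() {
	// Index IDs recorded as reorg elements for "add index idx1(a), add index idx2(b)".
	elementIDs := []int64{101, 102}

	firstElemTempID := tempIndexPrefix | elementIDs[0]
	lastElemTempID := tempIndexPrefix | elementIDs[len(elementIDs)-1]

	// The old code built both start and end from elementIDs[0]; the new range
	// spans firstElemTempID..lastElemTempID, so every temp index is scanned.
	for _, id := range elementIDs {
		tempID := tempIndexPrefix | id
		covered := firstElemTempID <= tempID && tempID <= lastElemTempID
		fmt.Printf("temp index %#x merged: %v\n", tempID, covered)
	}
}
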
