Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix add index's merge with multi-schema optimization #51747

Merged
merged 7 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 126 additions & 53 deletions pkg/ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ import (

func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
txn kv.Transaction,
idxInfo *model.IndexInfo,
idxRecords []*temporaryIndexRecord,
) error {
if !idxInfo.Unique {
if !w.currentIndex.Unique {
// Non-unique keys need no uniqueness check; just overwrite them,
// because in most cases the backfilled index record does not exist yet.
return nil
Expand All @@ -55,7 +54,7 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
err := checkTempIndexKey(txn, idxRecords[i], val, w.table)
if err != nil {
if kv.ErrKeyExists.Equal(err) {
return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), idxInfo.ID)
return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), w.currentIndex.ID)
}
return errors.Trace(err)
}
Expand Down Expand Up @@ -128,6 +127,10 @@ type mergeIndexWorker struct {
tmpIdxRecords []*temporaryIndexRecord
originIdxKeys []kv.Key
tmpIdxKeys []kv.Key

needValidateKey bool
currentTempIndexPrefix []byte
currentIndex *model.IndexInfo
}

func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
Expand All @@ -144,68 +147,99 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements
}
}

// validateTaskRange checks whether the task range touches any index handled
// by this worker. It returns skip=true when neither the start nor the end of
// the range belongs to one of the worker's indexes, so the task can be
// ignored. It also records whether the range spans more than one index ID;
// if so, every fetched key must later be validated against the current
// temp-index prefix.
func (w *mergeIndexWorker) validateTaskRange(taskRange *reorgBackfillTask) (skip bool, err error) {
	startTempID, err := tablecodec.DecodeIndexID(taskRange.startKey)
	if err != nil {
		return false, err
	}
	endTempID, err := tablecodec.DecodeIndexID(taskRange.endKey)
	if err != nil {
		return false, err
	}
	startIndexID := startTempID & tablecodec.IndexIDMask
	endIndexID := endTempID & tablecodec.IndexIDMask

	// A range crossing an index boundary requires per-key validation later.
	w.needValidateKey = startIndexID != endIndexID
	found := false
	for _, idx := range w.indexes {
		meta := idx.Meta()
		if meta.ID == startIndexID {
			// The range starts inside one of our indexes: make it current
			// and stop searching.
			found = true
			w.currentIndex = meta
			break
		}
		if meta.ID == endIndexID {
			// Only the end boundary is ours; the current index is resolved
			// lazily once the first key of that index is fetched.
			found = true
		}
	}
	return !found, nil
}

// BackfillData merge temp index data in txn.
func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
skip, err := w.validateTaskRange(&taskRange)
if skip || err != nil {
return taskCtx, err
}

oprStartTime := time.Now()
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
for _, idx := range w.indexes {
idx := idx // Make linter noloopclosure happy.
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
updateTxnEntrySizeLimitIfNeeded(txn)
txn.SetOption(kv.Priority, taskRange.priority)
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)

tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, idx.Meta(), taskRange)
if err != nil {
return errors.Trace(err)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
updateTxnEntrySizeLimitIfNeeded(txn)
txn.SetOption(kv.Priority, taskRange.priority)
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)

tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the skip tempIndex or switch this logic
if len(w.currentTempIndexPrefix) == 0 || !bytes.HasPrefix(indexKey, w.currentTempIndexPrefix) { skip, err := w.updateCurrentIndexInfo(indexKey) if err != nil || skip { return skip, err } }
out of fetchTempIndexVals func and do it in taskRange level ?
This could reduce compare and switch prefix logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
return errors.Trace(err)
}
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

err = w.batchCheckTemporaryUniqueKey(txn, tmpIdxRecords)
if err != nil {
return errors.Trace(err)
}

for i, idxRecord := range tmpIdxRecords {
taskCtx.scanCount++
// The index record already exists, so we skip it; there is no need to backfill it.
// The following updates, deletes, and inserts on these rows are handled correctly by TiDB.
// If the whole batch is skipped, update the first index key so the txn commits and releases its lock.
if idxRecord.skip {
continue
}
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

err = w.batchCheckTemporaryUniqueKey(txn, idx.Meta(), tmpIdxRecords)
// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
if err != nil {
return errors.Trace(err)
}

for i, idxRecord := range tmpIdxRecords {
taskCtx.scanCount++
// The index record already exists, so we skip it; there is no need to backfill it.
// The following updates, deletes, and inserts on these rows are handled correctly by TiDB.
// If the whole batch is skipped, update the first index key so the txn commits and releases its lock.
if idxRecord.skip {
continue
}

// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
if err != nil {
return errors.Trace(err)
}

if idxRecord.delete {
if idxRecord.unique {
err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
} else {
err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
}
if idxRecord.delete {
if idxRecord.unique {
err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
} else {
err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
}
if err != nil {
return err
err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
}
taskCtx.addedCount++
} else {
err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
}
return nil
})
}
if err != nil {
return err
}
taskCtx.addedCount++
}
return nil
})

failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
Expand All @@ -228,9 +262,41 @@ func (w *mergeIndexWorker) GetCtx() *backfillCtx {
return w.backfillCtx
}

// prefixIsChanged reports whether newKey falls outside the cached temp-index
// prefix. An empty cached prefix (nothing resolved yet) also counts as changed.
func (w *mergeIndexWorker) prefixIsChanged(newKey kv.Key) bool {
	if len(w.currentTempIndexPrefix) == 0 {
		return true
	}
	return !bytes.HasPrefix(newKey, w.currentTempIndexPrefix)
}

// updateCurrentIndexInfo resolves which of the worker's indexes the given
// temp-index key belongs to, then caches both the index meta and the key's
// temp-index prefix. It returns skip=true when the key does not belong to any
// index handled by this worker, so the caller should ignore the record.
func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) (skip bool, err error) {
	tempIdxID, err := tablecodec.DecodeIndexID(newIndexKey)
	if err != nil {
		return false, err
	}
	idxID := tempIdxID & tablecodec.IndexIDMask
	var matched *model.IndexInfo
	for _, idx := range w.indexes {
		if meta := idx.Meta(); meta.ID == idxID {
			matched = meta
		}
	}
	if matched == nil {
		// Index IDs are always increasing, but not always continuous:
		// if DDL adds another index between these indexes, it is possible that:
		// multi-schema add index IDs = [1, 2, 4, 5]
		// another index ID = [3]
		// If the new index gets rolled back, temp index 0xFFxxx03 may contain
		// dirty records. We should skip these dirty records.
		return true, nil
	}
	prefix := tablecodec.CutIndexPrefix(newIndexKey)

	// Clone the prefix: the original slice may alias the iterator's buffer.
	w.currentTempIndexPrefix = kv.Key(prefix).Clone()
	w.currentIndex = matched

	return false, nil
}

func (w *mergeIndexWorker) fetchTempIndexVals(
txn kv.Transaction,
indexInfo *model.IndexInfo,
taskRange reorgBackfillTask,
) ([]*temporaryIndexRecord, kv.Key, bool, error) {
startTime := time.Now()
Expand All @@ -254,11 +320,18 @@ func (w *mergeIndexWorker) fetchTempIndexVals(
return false, nil
}

if w.needValidateKey && w.prefixIsChanged(indexKey) {
skip, err := w.updateCurrentIndexInfo(indexKey)
if err != nil || skip {
return skip, err
}
}

tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue)
if err != nil {
return false, err
}
tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(indexInfo.Columns))
tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(w.currentIndex.Columns))
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 16,
shard_count = 17,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
27 changes: 27 additions & 0 deletions pkg/ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,30 @@ func TestAddIndexDuplicateMessage(t *testing.T) {
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1", "2 1 2"))
}

// TestMultiSchemaAddIndexMerge checks that a concurrent insert executed while
// a multi-schema-change ADD INDEX is merging temp-index data is merged into
// every new index (verified by admin check).
func TestMultiSchemaAddIndexMerge(t *testing.T) {
	store := testkit.CreateMockStore(t)
	defer ingesttestutil.InjectMockBackendMgr(t, store)()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk2 := testkit.NewTestKit(t, store)
	tk2.MustExec("use test")

	tk.MustExec("create table t (a int, b int);")
	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")

	fired := false
	var insertErr error
	// Fire exactly one concurrent insert the first time a row is written
	// during the backfill, so the merge phase has extra data to pick up.
	ingest.MockExecAfterWriteRow = func() {
		if fired {
			return
		}
		_, insertErr = tk2.Exec("insert into t values (4, 4);")
		fired = true
	}

	tk.MustExec("alter table t add index idx1(a), add index idx2(b);")
	require.True(t, fired)
	require.NoError(t, insertErr)
	tk.MustExec("admin check table t;")
}
5 changes: 4 additions & 1 deletion pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,10 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
tb = tbl.(table.PhysicalTable)
}
if mergingTmpIdx {
start, end = tablecodec.GetTableIndexKeyRange(pid, tablecodec.TempIndexPrefix|elements[0].ID)
firstElemTempID := tablecodec.TempIndexPrefix | elements[0].ID
tangenta marked this conversation as resolved.
Show resolved Hide resolved
lastElemTempID := tablecodec.TempIndexPrefix | elements[len(elements)-1].ID
start = tablecodec.EncodeIndexSeekKey(pid, firstElemTempID, nil)
end = tablecodec.EncodeIndexSeekKey(pid, lastElemTempID, []byte{255})
} else {
start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
if err != nil {
Expand Down