ddl: fix add index's merge with multi-schema optimization #51747

Merged: 7 commits, Mar 14, 2024
Changes from 2 commits
143 changes: 90 additions & 53 deletions pkg/ddl/index_merge_tmp.go
@@ -35,10 +35,9 @@ import (

func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
txn kv.Transaction,
- idxInfo *model.IndexInfo,
idxRecords []*temporaryIndexRecord,
) error {
- if !idxInfo.Unique {
+ if !w.currentIndexIsUnique {
// Non-unique keys need no uniqueness check; just overwrite them,
// because in most cases the backfilled index entry does not exist yet.
return nil
@@ -55,7 +54,7 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
err := checkTempIndexKey(txn, idxRecords[i], val, w.table)
if err != nil {
if kv.ErrKeyExists.Equal(err) {
- return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), idxInfo.ID)
+ return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), w.currentIndexID)
}
return errors.Trace(err)
}
@@ -128,6 +127,11 @@ type mergeIndexWorker struct {
tmpIdxRecords []*temporaryIndexRecord
originIdxKeys []kv.Key
tmpIdxKeys []kv.Key

currentTempIndexPrefix []byte
currentIndexID int64
currentIndexColLen int
currentIndexIsUnique bool
}

func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
@@ -148,64 +152,62 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
oprStartTime := time.Now()
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
- for _, idx := range w.indexes {
-     idx := idx // Make linter noloopclosure happy.
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
    taskCtx.addedCount = 0
    taskCtx.scanCount = 0
    updateTxnEntrySizeLimitIfNeeded(txn)
    txn.SetOption(kv.Priority, taskRange.priority)
    if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
        txn.SetOption(kv.ResourceGroupTagger, tagger)
    }
    txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)

-     tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, idx.Meta(), taskRange)
+     tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange)
Collaborator:

Can we move this skip-temp-index / switch-index logic

    if len(w.currentTempIndexPrefix) == 0 || !bytes.HasPrefix(indexKey, w.currentTempIndexPrefix) {
        skip, err := w.updateCurrentIndexInfo(indexKey)
        if err != nil || skip {
            return skip, err
        }
    }

out of the fetchTempIndexVals func and do it at the taskRange level? This could reduce the compare-and-switch-prefix logic.

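As a rough sketch of that suggestion (not part of this PR): the switch could run once per task range instead of once per scanned key. This assumes the reorg task exposes its start key as taskRange.startKey and that a single task range never spans more than one temporary index prefix; the helper name switchIndexForTaskRange is hypothetical.

    // Hypothetical sketch of the reviewer's suggestion, not part of this change:
    // switch the cached index metadata once at the start of a task range instead
    // of comparing the prefix for every temporary index record.
    func (w *mergeIndexWorker) switchIndexForTaskRange(taskRange reorgBackfillTask) error {
        // Assumes taskRange.startKey carries the temp index prefix of exactly one index.
        if len(w.currentTempIndexPrefix) == 0 || !bytes.HasPrefix(taskRange.startKey, w.currentTempIndexPrefix) {
            return w.updateCurrentIndexInfo(taskRange.startKey)
        }
        return nil
    }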
    if err != nil {
        return errors.Trace(err)
    }
    taskCtx.nextKey = nextKey
    taskCtx.done = taskDone

-     err = w.batchCheckTemporaryUniqueKey(txn, idx.Meta(), tmpIdxRecords)
+     err = w.batchCheckTemporaryUniqueKey(txn, tmpIdxRecords)
    if err != nil {
        return errors.Trace(err)
    }

    for i, idxRecord := range tmpIdxRecords {
        taskCtx.scanCount++
        // The index entry already exists, so skip it; there is no need to backfill it.
        // TiDB handles subsequent updates, deletes, and inserts on these rows correctly.
        // If the whole batch is skipped, update the first index key so the txn commits and releases its lock.
        if idxRecord.skip {
            continue
        }

        // Lock the corresponding row keys so that a pessimistic transaction
        // cannot modify the index KVs that are being changed here.
        rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
        err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
        if err != nil {
            return errors.Trace(err)
        }

        if idxRecord.delete {
            if idxRecord.unique {
                err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
            } else {
                err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
            }
        } else {
            err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
        }
        if err != nil {
            return err
        }
        taskCtx.addedCount++
    }
    return nil
})
- }

failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
@@ -228,9 +230,37 @@ func (w *mergeIndexWorker) GetCtx() *backfillCtx {
return w.backfillCtx
}

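// updateCurrentIndexInfo switches the worker's cached per-index state to the
// index that owns newIndexKey: the temporary index key prefix, the index ID,
// the number of index columns, and whether the index is unique.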
func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) error {
tempIdxID, err := tablecodec.DecodeIndexID(newIndexKey)
if err != nil {
return err
}
idxID := tablecodec.IndexIDMask & tempIdxID
pfx := tablecodec.CutIndexKeyPrefix(newIndexKey)
var curIdx *model.IndexInfo
for _, idx := range w.indexes {
if idx.Meta().ID == idxID {
curIdx = idx.Meta()
}
}
if curIdx == nil {
allIdxIDs := make([]int64, 0, len(w.indexes))
for _, idx := range w.indexes {
allIdxIDs = append(allIdxIDs, idx.Meta().ID)
}
return errors.Errorf("index %d not found in %v", idxID, allIdxIDs)
}

w.currentTempIndexPrefix = kv.Key(pfx).Clone()
w.currentIndexID = idxID
w.currentIndexColLen = len(curIdx.Columns)
w.currentIndexIsUnique = curIdx.Unique

return nil
}

func (w *mergeIndexWorker) fetchTempIndexVals(
txn kv.Transaction,
- indexInfo *model.IndexInfo,
taskRange reorgBackfillTask,
) ([]*temporaryIndexRecord, kv.Key, bool, error) {
startTime := time.Now()
@@ -254,11 +284,18 @@ func (w *mergeIndexWorker) fetchTempIndexVals(
return false, nil
}

if !bytes.HasPrefix(indexKey, w.currentTempIndexPrefix) {
err := w.updateCurrentIndexInfo(indexKey)
if err != nil {
return false, err
}
}

tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue)
if err != nil {
return false, err
}
- tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(indexInfo.Columns))
+ tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, w.currentIndexColLen)
if err != nil {
return false, err
}
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
@@ -68,7 +68,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
- shard_count = 16,
+ shard_count = 17,
deps = [
"//pkg/config",
"//pkg/ddl",
27 changes: 27 additions & 0 deletions pkg/ddl/ingest/integration_test.go
@@ -340,3 +340,30 @@ func TestAddIndexDuplicateMessage(t *testing.T) {
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1", "2 1 2"))
}

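// TestMultiSchemaAddIndexMerge checks that a row inserted by concurrent DML
// while the first index is still being written is merged into every index
// created by the same multi-schema-change ALTER TABLE statement.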
func TestMultiSchemaAddIndexMerge(t *testing.T) {
store := testkit.CreateMockStore(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")

first := true
var tk2Err error
ingest.MockExecAfterWriteRow = func() {
if !first {
return
}
_, tk2Err = tk2.Exec("insert into t values (4, 4);")
first = false
}

tk.MustExec("alter table t add index idx1(a), add index idx2(b);")
require.False(t, first)
require.NoError(t, tk2Err)
tk.MustExec("admin check table t;")
}
5 changes: 4 additions & 1 deletion pkg/ddl/reorg.go
@@ -706,7 +706,10 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
tb = tbl.(table.PhysicalTable)
}
if mergingTmpIdx {
- start, end = tablecodec.GetTableIndexKeyRange(pid, tablecodec.TempIndexPrefix|elements[0].ID)
+ firstElemTempID := tablecodec.TempIndexPrefix | elements[0].ID
+ lastElemTempID := tablecodec.TempIndexPrefix | elements[len(elements)-1].ID
+ start = tablecodec.EncodeIndexSeekKey(pid, firstElemTempID, nil)
+ end = tablecodec.EncodeIndexSeekKey(pid, lastElemTempID, []byte{255})
} else {
start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
if err != nil {
5 changes: 5 additions & 0 deletions pkg/tablecodec/tablecodec.go
@@ -106,6 +106,11 @@ func CutRowKeyPrefix(key kv.Key) []byte {
return key[prefixLen:]
}

// CutIndexKeyPrefix cuts the index key prefix.
func CutIndexKeyPrefix(key kv.Key) []byte {
return key[len(tablePrefix)+8+len(indexPrefixSep):]
}
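
As a reading aid for the temporary-index arithmetic used in getReorgInfo and updateCurrentIndexInfo above: a temporary index shares the key space of its target index, and the high bits of the index ID mark it as temporary. Below is a minimal, self-contained sketch of that masking; the constant values are assumed to mirror tablecodec.TempIndexPrefix and tablecodec.IndexIDMask and are illustrative, not quoted from the source.

    package main

    import "fmt"

    // Assumed to mirror tablecodec.TempIndexPrefix and tablecodec.IndexIDMask;
    // treat the exact values as illustrative.
    const (
        tempIndexPrefix = 0x7fff000000000000 // high bits that mark a temporary index ID
        indexIDMask     = 0xffffffffffff     // low 48 bits that hold the real index ID
    )

    func main() {
        indexID := int64(3)

        // getReorgInfo: derive the temporary index ID that bounds the merge key range.
        tempID := tempIndexPrefix | indexID

        // updateCurrentIndexInfo: recover the real index ID from a scanned temp index key.
        recovered := indexIDMask & tempID

        fmt.Printf("tempID=%#x, recovered index ID=%d\n", tempID, recovered)
    }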

// EncodeRecordKey encodes the recordPrefix, row handle into a kv.Key.
func EncodeRecordKey(recordPrefix kv.Key, h kv.Handle) kv.Key {
buf := make([]byte, 0, len(recordPrefix)+h.Len())