Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support recover multi-valued index #41181

Merged
merged 15 commits into from
Feb 9, 2023
42 changes: 29 additions & 13 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
}
e.idxValsBufs[result.scanRowCount] = idxVals
rsData := tables.TryGetHandleRestoredDataWrapper(e.table.Meta(), plannercore.GetCommonHandleDatum(e.handleCols, row), nil, e.index.Meta())
e.recoverRows = append(e.recoverRows, recoverRows{handle: handle, idxVals: idxVals, rsData: rsData, skip: false})
e.recoverRows = append(e.recoverRows, recoverRows{handle: handle, idxVals: idxVals, rsData: rsData, skip: true})
result.scanRowCount++
result.currentHandle = handle
}
Expand Down Expand Up @@ -438,16 +438,31 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
}
e.batchKeys = e.batchKeys[:0]
sc := e.ctx.GetSessionVars().StmtCtx
distinctFlags := make([]bool, len(rows))
distinctFlags := make([]bool, 0, len(rows))
rowIdx := make([]int, 0, len(rows))
cnt := 0
for i, row := range rows {
idxKey, distinct, err := e.index.GenIndexKey(sc, row.idxVals, row.handle, e.idxKeyBufs[i])
if err != nil {
return err
}
e.idxKeyBufs[i] = idxKey
iter := e.index.GenIndexKVIter(sc, row.idxVals, row.handle, nil)
for iter.Valid() {
var buf []byte
if cnt < len(e.idxKeyBufs) {
buf = e.idxKeyBufs[cnt]
}
key, _, distinct, err := iter.Next(buf)
if err != nil {
return err
}
if cnt < len(e.idxKeyBufs) {
e.idxKeyBufs[cnt] = key
} else {
e.idxKeyBufs = append(e.idxKeyBufs, key)
}

e.batchKeys = append(e.batchKeys, idxKey)
distinctFlags[i] = distinct
cnt++
e.batchKeys = append(e.batchKeys, key)
distinctFlags = append(distinctFlags, distinct)
rowIdx = append(rowIdx, i)
}
}

values, err := txn.BatchGet(context.Background(), e.batchKeys)
Expand All @@ -460,21 +475,22 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
// 3. non-unique-key is duplicate, skip it.
isCommonHandle := e.table.Meta().IsCommonHandle
for i, key := range e.batchKeys {
if val, found := values[string(key)]; found {
val, found := values[string(key)]
if found {
if distinctFlags[i] {
handle, err1 := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommonHandle)
if err1 != nil {
return err1
}

if handle.Compare(rows[i].handle) != 0 {
if handle.Compare(rows[rowIdx[i]].handle) != 0 {
logutil.BgLogger().Warn("recover index: the constraint of unique index is broken, handle in index is not equal to handle in table",
zap.String("index", e.index.Meta().Name.O), zap.ByteString("indexKey", key),
zap.Stringer("handleInTable", rows[i].handle), zap.Stringer("handleInIndex", handle))
zap.Stringer("handleInTable", rows[rowIdx[i]].handle), zap.Stringer("handleInIndex", handle))
}
}
rows[i].skip = true
}
rows[rowIdx[i]].skip = found && rows[rowIdx[i]].skip
}
return nil
}
Expand Down
43 changes: 43 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,49 @@ func TestAdminRecoverIndex(t *testing.T) {
tk.MustExec("admin recover index admin_test i1;")
}

func TestAdminRecoverMVIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(pk int primary key, a json, index idx((cast(a as signed array))))")
tk.MustExec("insert into t values (0, '[0,1,2]')")
tk.MustExec("insert into t values (1, '[1,2,3]')")
tk.MustExec("insert into t values (2, '[2,3,4]')")
tk.MustExec("insert into t values (3, '[3,4,5]')")
tk.MustExec("insert into t values (4, '[4,5,6]')")
tk.MustExec("admin check table t")

ctx := mock.NewContext()
ctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("t")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
tk.Session().GetSessionVars().IndexLookupSize = 3
tk.Session().GetSessionVars().MaxChunkSize = 3

cpIdx := idxInfo.Clone()
cpIdx.MVIndex = false
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, cpIdx)

txn, err := store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(ctx.GetSessionVars().StmtCtx, txn, types.MakeDatums(2), kv.IntHandle(1))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
r := tk.MustQuery("admin recover index t idx")
r.Check(testkit.Rows("1 5"))
tk.MustExec("admin check table t")
}

func TestAdminCleanupMVIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

Expand Down