Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: mark the writes from delete-only and drop them on merge (#39796) #39799

Merged
merged 1 commit into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
job.Args = []interface{}{indexInfo.ID, false /*if exists*/, getPartitionIDs(tbl.Meta())}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", tblInfo.State)
}
Expand Down Expand Up @@ -960,6 +963,9 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
job.Args[0] = indexInfo.ID
} else {
// the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility,
Expand Down
5 changes: 3 additions & 2 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
unique := false
length := len(rawValue)
keyVer := rawValue[length-1]
if keyVer == tables.TempIndexKeyTypeMerge {
// The kv is written in the merging state. It has been written to the origin index, we can skip it.
if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
return true, nil
}
rawValue = rawValue[:length-1]
Expand Down
45 changes: 45 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,48 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) {
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "1 7", "2 7", "5 7", "8 8", "10 10"))
}

func TestAddIndexMergeIndexUpdateOnDeleteOnly(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec(`CREATE TABLE t (a DATE NULL DEFAULT '1619-01-18', b BOOL NULL DEFAULT '0') CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_bin';`)
tk.MustExec(`INSERT INTO t SET b = '1';`)

updateSQLs := []string{
"UPDATE t SET a = '9432-05-10', b = '0';",
"UPDATE t SET a = '9432-05-10', b = '1';",
}

// Force onCreateIndex use the txn-merge process.
ingest.LitInitialized = false
tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;")
tk.MustExec("set @@global.tidb_enable_mutation_checker = 1;")
tk.MustExec("set @@global.tidb_txn_assertion_level = 'STRICT';")

var checkErrs []error
originHook := dom.DDL().GetHook()
callback := &ddl.TestDDLCallback{
Do: dom,
}
onJobUpdatedBefore := func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly {
for _, sql := range updateSQLs {
_, err := tk2.Exec(sql)
if err != nil {
checkErrs = append(checkErrs, err)
}
}
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedBefore)
dom.DDL().SetHook(callback)
tk.MustExec("alter table t add index idx(b);")
dom.DDL().SetHook(originHook)
for _, err := range checkErrs {
require.NoError(t, err)
}
tk.MustExec("admin check table t;")
}
7 changes: 6 additions & 1 deletion table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
)
if !opt.FromBackFill {
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill {
if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete {
key, tempKey = tempKey, nil
keyIsTempIdxKey = true
}
Expand Down Expand Up @@ -347,6 +347,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
const (
// TempIndexKeyTypeNone means the key is not a temporary index key.
TempIndexKeyTypeNone byte = 0
// TempIndexKeyTypeDelete indicates this value is written in the delete-only stage.
TempIndexKeyTypeDelete byte = 'd'
// TempIndexKeyTypeBackfill indicates this value is written in the backfill stage.
TempIndexKeyTypeBackfill byte = 'b'
// TempIndexKeyTypeMerge indicates this value is written in the merge stage.
Expand All @@ -363,6 +365,9 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem
case model.BackfillStateRunning:
// Write to the temporary index.
tablecodec.IndexKey2TempIndexKey(indexInfo.ID, indexKey)
if indexInfo.State == model.StateDeleteOnly {
return nil, indexKey, TempIndexKeyTypeDelete
}
return nil, indexKey, TempIndexKeyTypeBackfill
case model.BackfillStateReadyToMerge, model.BackfillStateMerging:
// Double write
Expand Down