Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: lock row keys during merging back from temp index #39936

Merged
merged 15 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type temporaryIndexRecord struct {
delete bool
unique bool
distinct bool
rowKey kv.Key
}

type mergeIndexWorker struct {
Expand Down Expand Up @@ -133,6 +134,14 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
if idxRecord.skip {
continue
}

// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.rowKey)
if err != nil {
return errors.Trace(err)
}

if idxRecord.delete {
if idxRecord.unique {
err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
Expand All @@ -149,6 +158,7 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC
}
return nil
})

logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000)
return
}
Expand All @@ -166,6 +176,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
oprStartTime := startTime
idxPrefix := w.table.IndexPrefix()
var lastKey kv.Key
isCommonHandle := w.table.Meta().IsCommonHandle
err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, idxPrefix, txn.StartTS(),
taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) {
oprEndTime := time.Now()
Expand All @@ -182,35 +193,37 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
return false, nil
}

isDelete := false
unique := false
length := len(rawValue)
keyVer := rawValue[length-1]
originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
return true, nil
}
rawValue = rawValue[:length-1]
if bytes.Equal(rawValue, tables.DeleteMarker) {
isDelete = true
} else if bytes.Equal(rawValue, tables.DeleteMarkerUnique) {
isDelete = true
unique = true

if handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns))
if err != nil {
return false, err
}
}
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), handle)

originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)

idxRecord := &temporaryIndexRecord{
rowKey: rowKey,
delete: isDelete,
unique: unique,
skip: false,
}
if !isDelete {
idxRecord.vals = rawValue
idxRecord.distinct = tablecodec.IndexKVIsUnique(rawValue)
idxRecord.vals = originVal
idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal)
}
w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
Expand Down
66 changes: 66 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl_test

import (
"testing"
"time"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/ingest"
Expand Down Expand Up @@ -376,3 +377,68 @@ func TestAddIndexMergeIndexUpdateOnDeleteOnly(t *testing.T) {
}
tk.MustExec("admin check table t;")
}

func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec(`CREATE TABLE t (id int primary key, a int);`)
tk.MustExec(`INSERT INTO t VALUES (1, 1);`)

// Force onCreateIndex use the txn-merge process.
ingest.LitInitialized = false
tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;")
tk.MustExec("set @@global.tidb_enable_mutation_checker = 1;")
tk.MustExec("set @@global.tidb_txn_assertion_level = 'STRICT';")
tk.MustExec("set @@global.tidb_enable_metadata_lock = 0;")
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved

originHook := dom.DDL().GetHook()
callback := &ddl.TestDDLCallback{Do: dom}

runPessimisticTxn := false
callback.OnJobRunBeforeExported = func(job *model.Job) {
if t.Failed() {
return
}
if job.SchemaState == model.StateWriteOnly {
// Write a record to the temp index.
_, err := tk2.Exec("update t set a = 2 where id = 1;")
assert.NoError(t, err)
}
if !runPessimisticTxn && job.SchemaState == model.StateWriteReorganization {
idx := findIdxInfo(dom, "test", "t", "idx")
if idx == nil {
return
}
if idx.BackfillState != model.BackfillStateReadyToMerge {
return
}
runPessimisticTxn = true
_, err := tk2.Exec("begin pessimistic;")
assert.NoError(t, err)
_, err = tk2.Exec("update t set a = 3 where id = 1;")
assert.NoError(t, err)
}
}
dom.DDL().SetHook(callback)
afterCommit := make(chan struct{}, 1)
go func() {
tk.MustExec("alter table t add index idx(a);")
afterCommit <- struct{}{}
}()
timer := time.NewTimer(300 * time.Millisecond)
select {
case <-timer.C:
break
case <-afterCommit:
require.Fail(t, "should be blocked by the pessimistic txn")
}
tk2.MustExec("rollback;")
<-afterCommit
dom.DDL().SetHook(originHook)
tk.MustExec("admin check table t;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2"))

}
5 changes: 1 addition & 4 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package executor

import (
"bytes"
"context"
"encoding/hex"
"fmt"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -267,8 +265,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// Since the temp index stores deleted key with marked 'deleteu' for unique key at the end
// of value, So if return a key we check and skip deleted key.
if tablecodec.IsTempIndexKey(uk.newKey) {
rowVal := val[:len(val)-1]
if bytes.Equal(rowVal, tables.DeleteMarkerUnique) {
if tablecodec.CheckTempIndexValueIsDelete(val) {
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
cloud.google.com/go/storage v1.21.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.0
github.com/BurntSushi/toml v1.2.1
Expand Down Expand Up @@ -129,7 +130,6 @@ require (
cloud.google.com/go v0.100.2 // indirect
cloud.google.com/go/compute v1.5.0 // indirect
cloud.google.com/go/iam v0.1.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
Expand Down
54 changes: 17 additions & 37 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package tables

import (
"bytes"
"context"
"errors"
"sync"
Expand Down Expand Up @@ -179,15 +178,15 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue

if !distinct || skipCheck || opt.Untouched {
if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage.
idxVal = append(idxVal, keyVer)
idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer)
}
err = txn.GetMemBuffer().Set(key, idxVal)
if err != nil {
return nil, err
}
if len(tempKey) > 0 {
if !opt.Untouched { // Untouched key-values never occur in the storage.
idxVal = append(idxVal, keyVer)
idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer)
}
err = txn.GetMemBuffer().Set(tempKey, idxVal)
if err != nil {
Expand Down Expand Up @@ -228,9 +227,9 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
}
if err != nil || len(value) == 0 {
lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil
var needPresumeKey tempIndexKeyState
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
var needPresumeKey TempIndexKeyState
if keyIsTempIdxKey {
idxVal = append(idxVal, keyVer)
idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer)
needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle)
if err != nil {
return nil, err
Expand Down Expand Up @@ -260,7 +259,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
return nil, err
}
if len(tempKey) > 0 {
idxVal = append(idxVal, keyVer)
idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer)
if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted {
err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists)
} else {
Expand Down Expand Up @@ -288,13 +287,6 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
return handle, kv.ErrKeyExists
}

var (
// DeleteMarker is a marker that the key is deleted.
DeleteMarker = []byte("delete")
// DeleteMarkerUnique is a marker that the unique index key is deleted.
DeleteMarkerUnique = []byte("deleteu")
)

// Delete removes the entry for handle h and indexedValues from KV index.
func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error {
key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil)
Expand All @@ -312,10 +304,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
}
}
if len(tempKey) > 0 {
val := make([]byte, 0, len(DeleteMarkerUnique)+1)
val = append(val, DeleteMarkerUnique...)
val = append(val, tempKeyVer)
err = txn.GetMemBuffer().Set(tempKey, val)
tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer)
err = txn.GetMemBuffer().Set(tempKey, tempVal)
if err != nil {
return err
}
Expand All @@ -328,10 +318,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
}
}
if len(tempKey) > 0 {
val := make([]byte, 0, len(DeleteMarker)+1)
val = append(val, DeleteMarker...)
val = append(val, tempKeyVer)
err = txn.GetMemBuffer().Set(tempKey, val)
tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer)
err = txn.GetMemBuffer().Set(tempKey, tempVal)
if err != nil {
return err
}
Expand Down Expand Up @@ -508,11 +496,11 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo *
return colInfo
}

type tempIndexKeyState byte
type TempIndexKeyState byte

const (
// KeyInTempIndexUnknown whether the key exists or not in temp index is unknown.
KeyInTempIndexUnknown tempIndexKeyState = iota
KeyInTempIndexUnknown TempIndexKeyState = iota
// KeyInTempIndexNotExist the key is not exist in temp index.
KeyInTempIndexNotExist
// KeyInTempIndexIsDeleted the key is marked deleted in temp index.
Expand All @@ -524,7 +512,7 @@ const (
)

// KeyExistInTempIndex is used to check the unique key exist status in temp index.
func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (tempIndexKeyState, kv.Handle, error) {
func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (TempIndexKeyState, kv.Handle, error) {
// Only check temp index key.
if !tablecodec.IsTempIndexKey(key) {
return KeyInTempIndexUnknown, nil, nil
Expand All @@ -541,24 +529,16 @@ func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, di
if len(value) < 1 {
return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1")
}
length := len(value)
// Firstly, we will remove the last byte of key version.
// It should be TempIndexKeyTypeBackfill or TempIndexKeyTypeMerge.
value = value[:length-1]
if distinct {
if bytes.Equal(value, DeleteMarkerUnique) {
return KeyInTempIndexIsDeleted, nil, nil
}
} else {
if bytes.Equal(value, DeleteMarker) {
return KeyInTempIndexIsDeleted, nil, nil
}

if tablecodec.CheckTempIndexValueIsDelete(value) {
return KeyInTempIndexIsDeleted, nil, nil
}

// Check if handle equal.
var handle kv.Handle
if distinct {
handle, err = tablecodec.DecodeHandleInUniqueIndexValue(value, IsCommonHandle)
originVal := tablecodec.DecodeTempIndexOriginValue(value)
handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle)
if err != nil {
return KeyInTempIndexUnknown, nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions table/tables/mutation_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package tables

import (
"bytes"
"fmt"
"strings"

Expand Down Expand Up @@ -153,11 +152,11 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in
value []byte
orgKey []byte
indexHandle kv.Handle
err error
)
if idxID != m.indexID {
value = append(value, m.value[:len(m.value)-1]...)
if len(value) == 0 || (bytes.Equal(value, []byte("delete")) || bytes.Equal(value, []byte("deleteu"))) {
value = tablecodec.DecodeTempIndexOriginValue(m.value)
if len(value) == 0 {
// Skip the deleted operation values.
continue
}
orgKey = append(orgKey, m.key...)
Expand Down Expand Up @@ -246,7 +245,7 @@ func checkIndexKeys(
}

// When it is in add index new backfill state.
if len(value) == 0 || (idxID != m.indexID && (bytes.Equal(value, []byte("deleteu")) || bytes.Equal(value, []byte("delete")))) {
if len(value) == 0 || (idxID != m.indexID && (tablecodec.CheckTempIndexValueIsDelete(value))) {
err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta())
} else {
err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta())
Expand Down
Loading