Skip to content

Commit

Permalink
txn:codec part for amend transaction with ddl (#19659)
Browse files Browse the repository at this point in the history
* move codec logic into tablecodec package

* tmp

Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
  • Loading branch information
cfzjywxk and ti-srebot authored Sep 1, 2020
1 parent 3db1233 commit d890a8d
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 155 deletions.
4 changes: 2 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
for i, key := range w.batchCheckKeys {
if val, found := batchVals[string(key)]; found {
if w.distinctCheckFlags[i] {
handle, err1 := tables.DecodeHandle(val)
handle, err1 := tablecodec.DecodeHandle(val)
if err1 != nil {
return errors.Trace(err1)
}
Expand All @@ -1003,7 +1003,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
// The keys in w.batchCheckKeys also maybe duplicate,
// so we need to backfill the not found key into `batchVals` map.
if w.distinctCheckFlags[i] {
batchVals[string(key)] = tables.EncodeHandle(idxRecords[i].handle)
batchVals[string(key)] = tablecodec.EncodeHandle(idxRecords[i].handle)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
for i, key := range e.batchKeys {
if val, found := values[string(key)]; found {
if distinctFlags[i] {
handle, err1 := tables.DecodeHandle(val)
handle, err1 := tablecodec.DecodeHandle(val)
if err1 != nil {
return err1
}
Expand Down
3 changes: 1 addition & 2 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -208,7 +207,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if len(handleVal) == 0 {
continue
}
handle, err1 := tables.DecodeHandle(handleVal)
handle, err1 := tablecodec.DecodeHandle(handleVal)
if err1 != nil {
return err1
}
Expand Down
4 changes: 2 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -841,7 +841,7 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
for i, col := range w.idxTblCols {
vals = append(vals, row.GetDatum(i, &col.FieldType))
}
vals = tables.TruncateIndexValuesIfNeeded(tblInfo, w.idxLookup.index, vals)
vals = tablecodec.TruncateIndexValuesIfNeeded(tblInfo, w.idxLookup.index, vals)
for i, val := range vals {
col := w.idxTblCols[i]
tp := &col.FieldType
Expand Down
5 changes: 2 additions & 3 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -134,7 +133,7 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
handle, err := tablecodec.DecodeHandle(val)
if err != nil {
return err
}
Expand Down Expand Up @@ -217,7 +216,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}
return err
}
handle, err := tables.DecodeHandle(val)
handle, err := tablecodec.DecodeHandle(val)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -196,7 +195,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
return e.lockKeyIfNeeded(ctx, e.idxKey)
}
e.handle, err = tables.DecodeHandle(e.handleVal)
e.handle, err = tablecodec.DecodeHandle(e.handleVal)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -537,7 +536,7 @@ func (s *testPointGetSuite) TestReturnValues(c *C) {
txnCtx := tk.Se.GetSessionVars().TxnCtx
val, ok := txnCtx.GetKeyInPessimisticLockCache(pk)
c.Assert(ok, IsTrue)
handle, err := tables.DecodeHandle(val)
handle, err := tablecodec.DecodeHandle(val)
c.Assert(err, IsNil)
rowKey := tablecodec.EncodeRowKeyWithHandle(tid, handle)
_, ok = txnCtx.GetKeyInPessimisticLockCache(rowKey)
Expand Down
3 changes: 1 addition & 2 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -155,7 +154,7 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r
return false, false, err
}

handle, err := tables.DecodeHandle(val)
handle, err := tablecodec.DecodeHandle(val)
if err != nil {
return false, true, err
}
Expand Down
150 changes: 11 additions & 139 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@
package tables

import (
"bytes"
"context"
"encoding/binary"
"io"
"unicode/utf8"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
Expand All @@ -32,26 +28,8 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/rowcodec"
)

// EncodeHandle encodes handle in data.
func EncodeHandle(h int64) []byte {
var data [8]byte
binary.BigEndian.PutUint64(data[:], uint64(h))
return data[:]
}

// DecodeHandle decodes handle in data.
func DecodeHandle(data []byte) (int64, error) {
dLen := len(data)
if dLen <= tablecodec.MaxOldEncodeValueLen {
return int64(binary.BigEndian.Uint64(data)), nil
}
return int64(binary.BigEndian.Uint64(data[dLen-int(data[0]):])), nil
}

// indexIter is for KV store index iterator.
type indexIter struct {
it kv.Iterator
Expand Down Expand Up @@ -86,7 +64,7 @@ func (c *indexIter) Next() (val []types.Datum, h int64, err error) {
val = vv[0 : len(vv)-1]
} else {
// If the index is unique and the value isn't nil, the handle is in value.
h, err = DecodeHandle(c.it.Value())
h, err = tablecodec.DecodeHandle(c.it.Value())
if err != nil {
return nil, 0, err
}
Expand All @@ -106,6 +84,7 @@ type index struct {
tblInfo *model.TableInfo
prefix kv.Key
containNonBinaryString bool
phyTblID int64
}

func (c *index) checkContainNonBinaryString() bool {
Expand All @@ -124,7 +103,8 @@ func NewIndex(physicalID int64, tblInfo *model.TableInfo, indexInfo *model.Index
idxInfo: indexInfo,
tblInfo: tblInfo,
// The prefix can't encode from tblInfo.ID, because table partition may change the id to partition id.
prefix: tablecodec.EncodeTableIndexPrefix(physicalID, indexInfo.ID),
prefix: tablecodec.EncodeTableIndexPrefix(physicalID, indexInfo.ID),
phyTblID: physicalID,
}
index.containNonBinaryString = index.checkContainNonBinaryString()
return index
Expand All @@ -135,77 +115,10 @@ func (c *index) Meta() *model.IndexInfo {
return c.idxInfo
}

func (c *index) getIndexKeyBuf(buf []byte, defaultCap int) []byte {
if buf != nil {
return buf[:0]
}

return make([]byte, 0, defaultCap)
}

// TruncateIndexValuesIfNeeded truncates the index values created using only the leading part of column values.
func TruncateIndexValuesIfNeeded(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, indexedValues []types.Datum) []types.Datum {
for i := 0; i < len(indexedValues); i++ {
v := &indexedValues[i]
if v.Kind() == types.KindString || v.Kind() == types.KindBytes {
ic := idxInfo.Columns[i]
colCharset := tblInfo.Columns[ic.Offset].Charset
colValue := v.GetBytes()
isUTF8Charset := colCharset == charset.CharsetUTF8 || colCharset == charset.CharsetUTF8MB4
origKind := v.Kind()
if isUTF8Charset {
if ic.Length != types.UnspecifiedLength && utf8.RuneCount(colValue) > ic.Length {
rs := bytes.Runes(colValue)
truncateStr := string(rs[:ic.Length])
// truncate value and limit its length
v.SetString(truncateStr, tblInfo.Columns[ic.Offset].Collate)
if origKind == types.KindBytes {
v.SetBytes(v.GetBytes())
}
}
} else if ic.Length != types.UnspecifiedLength && len(colValue) > ic.Length {
// truncate value and limit its length
v.SetBytes(colValue[:ic.Length])
if origKind == types.KindString {
v.SetString(v.GetString(), tblInfo.Columns[ic.Offset].Collate)
}
}
}
}

return indexedValues
}

// GenIndexKey generates storage key for index values. Returned distinct indicates whether the
// indexed values should be distinct in storage (i.e. whether handle is encoded in the key).
func (c *index) GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h int64, buf []byte) (key []byte, distinct bool, err error) {
if c.idxInfo.Unique {
// See https://dev.mysql.com/doc/refman/5.7/en/create-index.html
// A UNIQUE index creates a constraint such that all values in the index must be distinct.
// An error occurs if you try to add a new row with a key value that matches an existing row.
// For all engines, a UNIQUE index permits multiple NULL values for columns that can contain NULL.
distinct = true
for _, cv := range indexedValues {
if cv.IsNull() {
distinct = false
break
}
}
}

// For string columns, indexes can be created using only the leading part of column values,
// using col_name(length) syntax to specify an index prefix length.
indexedValues = TruncateIndexValuesIfNeeded(c.tblInfo, c.idxInfo, indexedValues)
key = c.getIndexKeyBuf(buf, len(c.prefix)+len(indexedValues)*9+9)
key = append(key, []byte(c.prefix)...)
key, err = codec.EncodeKey(sc, key, indexedValues...)
if !distinct && err == nil {
key, err = codec.EncodeKey(sc, key, types.NewDatum(h))
}
if err != nil {
return nil, false, err
}
return
return tablecodec.GenIndexKey(sc, c.tblInfo, c.idxInfo, c.phyTblID, indexedValues, h, buf)
}

// Create creates a new entry in the kvIndex data.
Expand Down Expand Up @@ -302,51 +215,10 @@ func (c *index) Create(sctx sessionctx.Context, rm kv.RetrieverMutator, indexedV
// save the key buffer to reuse.
writeBufs.IndexKeyBuf = key
var idxVal []byte
if collate.NewCollationEnabled() && c.containNonBinaryString {
colIds := make([]int64, len(c.idxInfo.Columns))
for i, col := range c.idxInfo.Columns {
colIds[i] = c.tblInfo.Columns[col.Offset].ID
}
rd := rowcodec.Encoder{Enable: true}
rowRestoredValue, err := rd.Encode(sctx.GetSessionVars().StmtCtx, colIds, indexedValues, nil)
if err != nil {
return 0, err
}
idxVal = make([]byte, 1+len(rowRestoredValue))
copy(idxVal[1:], rowRestoredValue)
tailLen := 0
if distinct {
// The len of the idxVal is always >= 10 since len (restoredValue) > 0.
tailLen += 8
idxVal = append(idxVal, EncodeHandle(h)...)
} else if len(idxVal) < 10 {
// Padding the len to 10
paddingLen := 10 - len(idxVal)
tailLen += paddingLen
idxVal = append(idxVal, bytes.Repeat([]byte{0x0}, paddingLen)...)
}
if opt.Untouched {
// If index is untouched and fetch here means the key is exists in TiKV, but not in txn mem-buffer,
// then should also write the untouched index key/value to mem-buffer to make sure the data
// is consistent with the index in txn mem-buffer.
tailLen += 1
idxVal = append(idxVal, kv.UnCommitIndexKVFlag)
}
idxVal[0] = byte(tailLen)
} else {
idxVal = make([]byte, 0)
if distinct {
idxVal = EncodeHandle(h)
}
if opt.Untouched {
// If index is untouched and fetch here means the key is exists in TiKV, but not in txn mem-buffer,
// then should also write the untouched index key/value to mem-buffer to make sure the data
// is consistent with the index in txn mem-buffer.
idxVal = append(idxVal, kv.UnCommitIndexKVFlag)
}
if len(idxVal) == 0 {
idxVal = []byte{'0'}
}
idxVal, err = tablecodec.GenIndexValue(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo,
c.containNonBinaryString, distinct, opt.Untouched, indexedValues, h)
if err != nil {
return 0, err
}

if !distinct || skipCheck || opt.Untouched {
Expand All @@ -373,7 +245,7 @@ func (c *index) Create(sctx sessionctx.Context, rm kv.RetrieverMutator, indexedV
return 0, err
}

handle, err := DecodeHandle(value)
handle, err := tablecodec.DecodeHandle(value)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -461,7 +333,7 @@ func (c *index) Exist(sc *stmtctx.StatementContext, rm kv.RetrieverMutator, inde

// For distinct index, the value of key is handle.
if distinct {
handle, err := DecodeHandle(value)
handle, err := tablecodec.DecodeHandle(value)
if err != nil {
return false, 0, err
}
Expand Down
Loading

0 comments on commit d890a8d

Please sign in to comment.