Skip to content

Commit

Permalink
*: fix wrong replace or insert-on-dup behavior on prefixed clustered …
Browse files Browse the repository at this point in the history
…index (#23091)
  • Loading branch information
lysu authored Mar 9, 2021
1 parent 83e70f7 commit b8f6ac0
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 71 deletions.
47 changes: 36 additions & 11 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/stringutil"
)

Expand Down Expand Up @@ -79,30 +81,37 @@ func getKeysNeedCheck(ctx context.Context, sctx sessionctx.Context, t table.Tabl
}
toBeCheckRows := make([]toBeCheckedRow, 0, len(rows))

var handleCols []*table.Column
var (
tblHandleCols []*table.Column
pkIdxInfo *model.IndexInfo
)
// Get handle column if PK is handle.
if t.Meta().PKIsHandle {
for _, col := range t.Cols() {
if col.IsPKHandleColumn(t.Meta()) {
handleCols = append(handleCols, col)
tblHandleCols = append(tblHandleCols, col)
break
}
}
} else {
handleCols = tables.TryGetCommonPkColumns(t)
} else if t.Meta().IsCommonHandle {
pkIdxInfo = tables.FindPrimaryIndex(t.Meta())
for _, idxCol := range pkIdxInfo.Columns {
tblHandleCols = append(tblHandleCols, t.Cols()[idxCol.Offset])
}
}

var err error
for _, row := range rows {
toBeCheckRows, err = getKeysNeedCheckOneRow(sctx, t, row, nUnique, handleCols, toBeCheckRows)
toBeCheckRows, err = getKeysNeedCheckOneRow(sctx, t, row, nUnique, tblHandleCols, pkIdxInfo, toBeCheckRows)
if err != nil {
return nil, err
}
}
return toBeCheckRows, nil
}

func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.Datum, nUnique int, handleCols []*table.Column, result []toBeCheckedRow) ([]toBeCheckedRow, error) {
func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.Datum, nUnique int, handleCols []*table.Column,
pkIdxInfo *model.IndexInfo, result []toBeCheckedRow) ([]toBeCheckedRow, error) {
var err error
if p, ok := t.(table.PartitionedTable); ok {
t, err = p.GetPartitionByRow(ctx, row)
Expand All @@ -121,11 +130,7 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
var handle kv.Handle
if t.Meta().IsCommonHandle {
var err error
handleOrdinals := make([]int, 0, len(handleCols))
for _, col := range handleCols {
handleOrdinals = append(handleOrdinals, col.Offset)
}
handle, err = kv.BuildHandleFromDatumRow(ctx.GetSessionVars().StmtCtx, row, handleOrdinals)
handle, err = buildHandleFromDatumRow(ctx.GetSessionVars().StmtCtx, row, handleCols, pkIdxInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -213,6 +218,26 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
return result, nil
}

// buildHandleFromDatumRow builds a common handle from the handle columns of row.
//
// For every column in tblHandleCols the datum at that column's offset in row is
// copied. When pkIdxInfo is non-nil and has columns, the copy is passed through
// tablecodec.TruncateIndexValue together with the primary index column at the
// same position. The resulting datums are key-encoded in order and wrapped in a
// common handle. Any encoding or handle construction error is returned as is.
func buildHandleFromDatumRow(sctx *stmtctx.StatementContext, row []types.Datum, tblHandleCols []*table.Column, pkIdxInfo *model.IndexInfo) (kv.Handle, error) {
	// The primary index info does not change between iterations, so decide
	// once whether the handle datums need to go through truncation.
	needTruncate := pkIdxInfo != nil && len(pkIdxInfo.Columns) > 0

	handleDatums := make([]types.Datum, len(tblHandleCols))
	for pos, handleCol := range tblHandleCols {
		// Assigning copies the Datum struct, so truncation below replaces the
		// value held in handleDatums and not the entry stored in row.
		handleDatums[pos] = row[handleCol.Offset]
		if needTruncate {
			tablecodec.TruncateIndexValue(&handleDatums[pos], pkIdxInfo.Columns[pos], handleCol.ColumnInfo)
		}
	}

	encoded, err := codec.EncodeKey(sctx, nil, handleDatums...)
	if err != nil {
		return nil, err
	}
	commonHandle, err := kv.NewCommonHandle(encoded)
	if err != nil {
		// Return a literal nil so the interface result is really nil.
		return nil, err
	}
	return commonHandle, nil
}

func formatDataForDupError(data []types.Datum) (string, error) {
strs := make([]string, 0, len(data))
for _, datum := range data {
Expand Down
18 changes: 0 additions & 18 deletions kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"strconv"
"strings"

"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)
Expand Down Expand Up @@ -413,23 +412,6 @@ func (m *HandleMap) Range(fn func(h Handle, val interface{}) bool) {
}
}

// BuildHandleFromDatumRow builds kv.Handle from cols in row.
// handleOrdinals lists, in handle order, the positions in row of the datums
// that are key-encoded into the handle. Any encoding or handle construction
// error is returned unchanged.
func BuildHandleFromDatumRow(sctx *stmtctx.StatementContext, row []types.Datum, handleOrdinals []int) (Handle, error) {
	handleDatums := make([]types.Datum, len(handleOrdinals))
	for pos, ordinal := range handleOrdinals {
		handleDatums[pos] = row[ordinal]
	}

	encoded, err := codec.EncodeKey(sctx, nil, handleDatums...)
	if err != nil {
		return nil, err
	}
	commonHandle, err := NewCommonHandle(encoded)
	if err != nil {
		// Return a literal nil so the interface result is really nil.
		return nil, err
	}
	return commonHandle, nil
}

// PartitionHandle combines a handle and a PartitionID, used to locate a row in a partitioned table.
// Now only used in global index.
// TODO: support PartitionHandle in HandleMap.
Expand Down
37 changes: 37 additions & 0 deletions session/clustered_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,43 @@ func (s *testClusteredSuite) TestClusteredPrefixColumn(c *C) {
tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery(tt).Sort().Check(testkit.Rows(output[i].Res...))
}

tk.MustExec("drop table if exists test1")
tk.MustExec("create table test1(c1 varchar(100) not null default 'xyza', c2 int, primary key(c1(3)) clustered)")
tk.MustExec("replace into test1(c2) values(1)")
tk.MustExec("replace into test1(c2) values(2)")
tk.MustQuery("select * from test1").Check(testkit.Rows("xyza 2"))

tk.MustExec("drop table if exists test3")
tk.MustExec("create table test3(c1 varchar(100), c2 int, primary key(c1(1)) clustered)")
tk.MustExec("insert into test3 values('ab', 1) on duplicate key update c2 = 100")
tk.MustExec("insert into test3 values('ab', 1) on duplicate key update c2 = 100")
tk.MustQuery("select * from test3").Check(testkit.Rows("ab 100"))
tk.MustExec("insert into test3 values('ab', 1) on duplicate key update c1 = 'cc', c2 = '200'")
tk.MustQuery("select * from test3").Check(testkit.Rows("cc 200"))

tk.MustExec("drop table if exists tbl_3")
tk.MustExec(`create table tbl_3 ( col_15 text(138) , col_16 varchar(37) default 'yGdboyZqIGDQhwRRc' not null , col_17 text(39) not null , col_18 char(58) default 'vBahOai' , col_19 varchar(470) , primary key idx_12 ( col_16(3),col_17(6),col_15(4)) clustered, key idx_13 ( col_19(2) ) , key idx_14 ( col_18(3),col_15(2) ) , unique key idx_15 ( col_16(4),col_18(6) ) , unique key idx_16 ( col_17(1) ) )`)
tk.MustExec("insert into tbl_3 values ( 'XJUDeSZplXx','TfZhIWnJPygn','HlZjQffSh','VDsepqNPkx','xqtMHHOqnLvcxDpL')")
tk.MustExec("insert into tbl_3 (col_15,col_17,col_19) values ( 'aeMrIjbfCxErg','HTZmtykzIkFMF','' ) on duplicate key update col_18 = values( col_18 )")
tk.MustQuery("select col_17 from tbl_3").Check(testkit.Rows("HlZjQffSh"))

tk.MustExec("drop table if exists tbl_1")
tk.MustExec("CREATE TABLE `tbl_1`(`col_5` char(84) NOT NULL DEFAULT 'BnHWZQY', `col_6` char(138) DEFAULT NULL, `col_7` tinytext NOT NULL, `col_8` char(231) DEFAULT NULL, `col_9` varchar(393) NOT NULL DEFAULT 'lizgVQd', PRIMARY KEY (`col_5`(4),`col_7`(3)) clustered , KEY `idx_2` (`col_5`(6),`col_8`(5)), UNIQUE KEY `idx_3` (`col_7`(2)), UNIQUE KEY `idx_4` (`col_9`(6),`col_7`(4),`col_6`(3)), UNIQUE KEY `idx_5` (`col_9`(3)) );")
tk.MustExec("insert into tbl_1 values('BsXhVuVvPRcSOlkzuM','QXIEA','IHeTDzJJyfOhIOY','ddxnmRcIjVfosRVC','lizgVQd')")
tk.MustExec("replace into tbl_1 (col_6,col_7,col_8) values ( 'WzdD','S','UrQhNEUZy' )")
tk.MustExec("admin check table tbl_1")

tk.MustExec("drop table if exists tbl_3")
tk.MustExec("create table tbl_3 ( col_15 char(167) not null , col_16 varchar(56) not null , col_17 text(25) not null , col_18 char , col_19 char(12) not null , primary key idx_21 ( col_16(5) ) clustered, key idx_22 ( col_19(2),col_16(4) ) , unique key idx_23 ( col_19(6),col_16(4) ) , unique key idx_24 ( col_19(1),col_18(1) ) , key idx_25 ( col_17(3),col_16(2),col_19(4) ) , key idx_26 ( col_18(1),col_17(3) ) , key idx_27 ( col_18(1) ) , unique key idx_28 ( col_16(4),col_15(3) ) , unique key idx_29 ( col_16(2) ) , key idx_30 ( col_18(1),col_16(2),col_19(4),col_17(6) ) , key idx_31 ( col_19(2) ) , key idx_32 ( col_16(6) ) , unique key idx_33 ( col_18(1) ) , unique key idx_34 ( col_15(4) ) , key idx_35 ( col_19(6) ) , key idx_36 ( col_19(4),col_17(4),col_18(1) ) )")
tk.MustExec("insert into tbl_3 values('auZELjkOUG','yhFUdsZphsWDFG','mNbCXHOWlIMQvXhY',' ','NpQwmX');")
tk.MustExec("insert into tbl_3 (col_15,col_16,col_17,col_18,col_19) values ( 'PboEJsnVPBknRhpEC','PwqzUThyDHhxhXAdJ','szolY','','pzZfZeOa' ) on duplicate key update col_16 = values( col_16 ) , col_19 = 'zgLlCUA'")
tk.MustExec("admin check table tbl_3")

tk.MustExec("create table t (c_int int, c_str varchar(40), primary key(c_str(8)) clustered, unique key(c_int), key(c_str))")
tk.MustExec("insert into t values (1, 'determined varahamihira')")
tk.MustExec("insert into t values (1, 'pensive mendeleev') on duplicate key update c_int=values(c_int), c_str=values(c_str)")
tk.MustExec("admin check table t")
}

func (s *testClusteredSuite) TestClusteredUnionScanIndexLookup(c *C) {
Expand Down
46 changes: 28 additions & 18 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,11 +878,24 @@ func RowWithCols(t table.Table, ctx sessionctx.Context, h kv.Handle, cols []*tab
return v, nil
}

// containFullColInHandle looks col up, by column ID, among the columns of the
// primary index returned by FindPrimaryIndex(meta).
//
// idxInHandle is the position of the matching primary index column, and
// containFullCol reports whether that index column has no prefix length
// (types.UnspecifiedLength). When col is not part of the primary index the
// result is (false, 0).
func containFullColInHandle(meta *model.TableInfo, col *table.Column) (containFullCol bool, idxInHandle int) {
	primaryIdx := FindPrimaryIndex(meta)
	for pos, pkCol := range primaryIdx.Columns {
		if meta.Columns[pkCol.Offset].ID != col.ID {
			continue
		}
		return pkCol.Length == types.UnspecifiedLength, pos
	}
	return false, 0
}

// DecodeRawRowData decodes raw row data into a datum slice and a (columnID:columnValue) map.
func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h kv.Handle, cols []*table.Column,
value []byte) ([]types.Datum, map[int64]types.Datum, error) {
v := make([]types.Datum, len(cols))
colTps := make(map[int64]*types.FieldType, len(cols))
prefixCols := make(map[int64]struct{})
for i, col := range cols {
if col == nil {
continue
Expand All @@ -896,25 +909,20 @@ func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h kv.Handle
continue
}
if col.IsCommonHandleColumn(meta) && !types.CommonHandleNeedRestoredData(&col.FieldType) {
pkIdx := FindPrimaryIndex(meta)
var idxOfIdx int
for i, idxCol := range pkIdx.Columns {
if meta.Columns[idxCol.Offset].ID == col.ID {
idxOfIdx = i
break
if containFullCol, idxInHandle := containFullColInHandle(meta, col); containFullCol {
dtBytes := h.EncodedCol(idxInHandle)
_, dt, err := codec.DecodeOne(dtBytes)
if err != nil {
return nil, nil, err
}
dt, err = tablecodec.Unflatten(dt, &col.FieldType, ctx.GetSessionVars().Location())
if err != nil {
return nil, nil, err
}
v[i] = dt
continue
}
dtBytes := h.EncodedCol(idxOfIdx)
_, dt, err := codec.DecodeOne(dtBytes)
if err != nil {
return nil, nil, err
}
dt, err = tablecodec.Unflatten(dt, &col.FieldType, ctx.GetSessionVars().Location())
if err != nil {
return nil, nil, err
}
v[i] = dt
continue
prefixCols[col.ID] = struct{}{}
}
colTps[col.ID] = &col.FieldType
}
Expand All @@ -928,7 +936,9 @@ func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h kv.Handle
continue
}
if col.IsPKHandleColumn(meta) || (col.IsCommonHandleColumn(meta) && !types.CommonHandleNeedRestoredData(&col.FieldType)) {
continue
if _, isPrefix := prefixCols[col.ID]; !isPrefix {
continue
}
}
ri, ok := rowMap[col.ID]
if ok {
Expand Down
51 changes: 27 additions & 24 deletions tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,32 +1274,35 @@ func genIndexValueVersion0(sc *stmtctx.StatementContext, tblInfo *model.TableInf
// TruncateIndexValues truncates the index values created using only the leading part of column values.
// indexedValues[i] is paired with idxInfo.Columns[i], and the datums are modified in place.
func TruncateIndexValues(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, indexedValues []types.Datum) {
for i := 0; i < len(indexedValues); i++ {
v := &indexedValues[i]
idxCol := idxInfo.Columns[i]
// An index column without a prefix length is left as is.
noPrefixIndex := idxCol.Length == types.UnspecifiedLength
if noPrefixIndex {
continue
}
// Only string and bytes datums are candidates for truncation.
notStringType := v.Kind() != types.KindString && v.Kind() != types.KindBytes
if notStringType {
continue
}
// idxCol.Offset selects the table column that this index column refers to.
tblCol := tblInfo.Columns[idxCol.Offset]
TruncateIndexValue(&indexedValues[i], idxCol, tblCol)
}
}

colInfo := tblInfo.Columns[idxCol.Offset]
isUTF8Charset := colInfo.Charset == charset.CharsetUTF8 || colInfo.Charset == charset.CharsetUTF8MB4
if isUTF8Charset && utf8.RuneCount(v.GetBytes()) > idxCol.Length {
rs := bytes.Runes(v.GetBytes())
truncateStr := string(rs[:idxCol.Length])
// truncate value and limit its length
v.SetString(truncateStr, colInfo.Collate)
if v.Kind() == types.KindBytes {
v.SetBytes(v.GetBytes())
}
} else if !isUTF8Charset && len(v.GetBytes()) > idxCol.Length {
v.SetBytes(v.GetBytes()[:idxCol.Length])
if v.Kind() == types.KindString {
v.SetString(v.GetString(), colInfo.Collate)
}
// TruncateIndexValue truncates one index value in place to the prefix length
// declared on its index column.
//
// v is left untouched when idxCol has no prefix length
// (types.UnspecifiedLength), when v is neither a string nor a bytes datum, or
// when the value already fits. For utf8 and utf8mb4 columns idxCol.Length is
// counted in runes; for every other charset it is counted in bytes. The datum
// kind v had on entry (string or bytes) is preserved, and a string datum is
// rewritten with tblCol.Collate.
func TruncateIndexValue(v *types.Datum, idxCol *model.IndexColumn, tblCol *model.ColumnInfo) {
	if idxCol.Length == types.UnspecifiedLength {
		return
	}
	// Capture the kind before touching the value: SetString and SetBytes
	// replace it, so checking v.Kind() after the write would always see the
	// kind that was just set rather than the caller's.
	origKind := v.Kind()
	if origKind != types.KindString && origKind != types.KindBytes {
		return
	}

	raw := v.GetBytes()
	isUTF8Charset := tblCol.Charset == charset.CharsetUTF8 || tblCol.Charset == charset.CharsetUTF8MB4
	switch {
	case isUTF8Charset && utf8.RuneCount(raw) > idxCol.Length:
		// Truncate on rune boundaries so a multi-byte character is never split.
		truncated := string(bytes.Runes(raw)[:idxCol.Length])
		if origKind == types.KindBytes {
			v.SetBytes([]byte(truncated))
		} else {
			v.SetString(truncated, tblCol.Collate)
		}
	case !isUTF8Charset && len(raw) > idxCol.Length:
		// Non-utf8 charsets measure the prefix in bytes.
		if origKind == types.KindString {
			v.SetString(string(raw[:idxCol.Length]), tblCol.Collate)
		} else {
			v.SetBytes(raw[:idxCol.Length])
		}
	}
}
Expand Down

0 comments on commit b8f6ac0

Please sign in to comment.