Skip to content

Commit

Permalink
table: fix a bug in TryGetHandleRestoredDataWrapper which may cause p…
Browse files Browse the repository at this point in the history
…anic (#23806) (#23817)
  • Loading branch information
ti-srebot authored Apr 2, 2021
1 parent 26c33d9 commit e2740f5
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 41 deletions.
20 changes: 20 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8978,3 +8978,23 @@ func (s *testIntegrationSerialSuite) TestCollationPrefixClusteredIndex(c *C) {
tk.MustQuery("select * from t use index(idx);").Check(testkit.Rows("01233 1"))
tk.MustExec("admin check table t;")
}

func (s *testIntegrationSerialSuite) TestIssue23805(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
collate.SetNewCollationEnabledForTest(true)
defer collate.SetNewCollationEnabledForTest(false)

tk.MustExec("CREATE TABLE `tbl_5` (" +
" `col_25` time NOT NULL DEFAULT '05:35:58'," +
" `col_26` blob NOT NULL," +
" `col_27` double NOT NULL," +
" `col_28` char(83) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL," +
" `col_29` timestamp NOT NULL," +
" `col_30` varchar(36) COLLATE utf8mb4_general_ci NOT NULL DEFAULT 'ywzIn'," +
" `col_31` binary(85) DEFAULT 'OIstcXsGmAyc'," +
" `col_32` datetime NOT NULL DEFAULT '2024-08-02 00:00:00'," +
" PRIMARY KEY (`col_26`(3),`col_27`) /*T![clustered_index] CLUSTERED */," +
" UNIQUE KEY `idx_10` (`col_26`(5)));")
tk.MustExec("insert ignore into tbl_5 set col_28 = 'ZmZIdSnq' , col_25 = '18:50:52.00' on duplicate key update col_26 = 'y';\n")
}
77 changes: 36 additions & 41 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1699,59 +1699,54 @@ func TryGetHandleRestoredDataWrapper(t table.Table, row []types.Datum, rowMap ma
if !collate.NewCollationEnabled() || !t.Meta().IsCommonHandle || t.Meta().CommonHandleVersion == 0 {
return nil
}

useIDMap := false
if len(rowMap) > 0 {
useIDMap = true
}

var datum types.Datum
rsData := make([]types.Datum, 0, 4)
pkCols := TryGetCommonPkColumns(t)
for _, col := range pkCols {
if !types.NeedRestoredData(&col.FieldType) {
pkIdx := FindPrimaryIndex(t.Meta())
for _, pkIdxCol := range pkIdx.Columns {
pkCol := t.Meta().Columns[pkIdxCol.Offset]
if !types.NeedRestoredData(&pkCol.FieldType) {
continue
}
if collate.IsBinCollation(col.Collate) {
if useIDMap {
datum = rowMap[col.ID]
} else {
datum = row[col.Offset]
}
rsData = append(rsData, types.NewIntDatum(stringutil.GetTailSpaceCount(datum.GetString())))
var datum types.Datum
if len(rowMap) > 0 {
datum = rowMap[pkCol.ID]
} else {
if useIDMap {
rsData = append(rsData, rowMap[col.ID])
} else {
rsData = append(rsData, row[col.Offset])
}
}
}

// Try to truncate index values.
// Says that primary key(a (8)),
// For index t(a), don't truncate the value.
// For index t(a(9)), truncate to a(9).
// For index t(a(7)), truncate to a(8).
pkIdx := FindPrimaryIndex(t.Meta())
for i, pkCol := range pkIdx.Columns {
datum = row[pkCol.Offset]
}
// Try to truncate index values.
// Says that primary key(a (8)),
// For index t(a), don't truncate the value.
// For index t(a(9)), truncate to a(9).
// For index t(a(7)), truncate to a(8).
truncateTargetCol := pkIdxCol
for _, idxCol := range idx.Columns {
if idxCol.Offset == pkCol.Offset {
if idxCol.Length == types.UnspecifiedLength || pkCol.Length == types.UnspecifiedLength {
break
}
useIdx := idxCol
if pkCol.Length > idxCol.Length {
useIdx = pkCol
}
tablecodec.TruncateIndexValue(&rsData[i], useIdx, t.Meta().Columns[idxCol.Offset])
truncateTargetCol = maxIndexLen(pkIdxCol, idxCol)
break
}
}
tablecodec.TruncateIndexValue(&datum, truncateTargetCol, pkCol)
if collate.IsBinCollation(pkCol.Collate) {
rsData = append(rsData, types.NewIntDatum(stringutil.GetTailSpaceCount(datum.GetString())))
} else {
rsData = append(rsData, datum)
}
}

return rsData
}

func maxIndexLen(idxA, idxB *model.IndexColumn) *model.IndexColumn {
if idxA.Length == types.UnspecifiedLength {
return idxA
}
if idxB.Length == types.UnspecifiedLength {
return idxB
}
if idxA.Length > idxB.Length {
return idxA
}
return idxB
}

func getSequenceAllocator(allocs autoid.Allocators) (autoid.Allocator, error) {
for _, alloc := range allocs {
if alloc.GetType() == autoid.SequenceType {
Expand Down

0 comments on commit e2740f5

Please sign in to comment.