Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer/: Fix when pk is handle and value overflow int64 #573

Merged
merged 2 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion drainer/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
)

var sqlMode mysql.SQLMode
Expand Down Expand Up @@ -63,7 +65,15 @@ func insertRowToDatums(table *model.TableInfo, row []byte) (pk types.Datum, datu

for _, col := range table.Columns {
if IsPKHandleColumn(table, col) {
datums[col.ID] = pk
// If pk is handle, the datums TiDB write will always be Int64 type.
// https://github.com/pingcap/tidb/blob/cd10bca6660937beb5d6de11d49ec50e149fe083/table/tables/tables.go#L721
//
// create table pk(id BIGINT UNSIGNED);
// insert into pk(id) values(18446744073709551615)
//
// Will get -1 here, note: uint64(int64(-1)) = 18446744073709551615
// so we change it to uint64 if the column type is unsigned
datums[col.ID] = fixType(pk, col)
}
}

Expand Down Expand Up @@ -168,3 +178,14 @@ func newUpdateDecoder(table *model.TableInfo) updateDecoder {
func (ud updateDecoder) decode(b []byte, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) {
return DecodeOldAndNewRow(b, ud.colsTypes, loc)
}

func fixType(data types.Datum, col *model.ColumnInfo) types.Datum {
if mysql.HasUnsignedFlag(col.Flag) {
switch oldV := data.GetValue().(type) {
case int64:
log.Debug("convert int64 type to uint64", zap.Int64("value", oldV))
return types.NewDatum(uint64(oldV))
}
}
return data
}
88 changes: 88 additions & 0 deletions tests/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package dailytest
import (
"database/sql"
"fmt"
"math"
"math/rand"
"strings"
"time"
Expand Down Expand Up @@ -142,6 +143,8 @@ func (tr *testRunner) execSQLs(sqls []string) {
func RunCase(src *sql.DB, dst *sql.DB, schema string) {
tr := &testRunner{src: src, dst: dst, schema: schema}

runPKcases(tr)

tr.execSQLs(case1)
tr.execSQLs(case1Clean)

Expand Down Expand Up @@ -375,3 +378,88 @@ func updatePKUK(db *sql.DB, opNum int) error {
_, err = db.Exec("DROP TABLE pkuk")
return errors.Trace(err)
}

// create a table with one column id with different type
// test the case whether it is primary key too, this can
// also help test when the column is handle or not.
func runPKcases(tr *testRunner) {
cases := []struct {
Tp string
Value interface{}
Update interface{}
}{
{
Tp: "BIGINT UNSIGNED",
Value: uint64(math.MaxUint64),
Update: uint64(math.MaxUint64) - 1,
},
{
Tp: "BIGINT SIGNED",
Value: int64(math.MaxInt64),
Update: int64(math.MaxInt64) - 1,
},
{
Tp: "INT UNSIGNED",
Value: uint32(math.MaxUint32),
Update: uint32(math.MaxUint32) - 1,
},
{
Tp: "INT SIGNED",
Value: int32(math.MaxInt32),
Update: int32(math.MaxInt32) - 1,
},
{
Tp: "SMALLINT UNSIGNED",
Value: uint16(math.MaxUint16),
Update: uint16(math.MaxUint16) - 1,
},
{
Tp: "SMALLINT SIGNED",
Value: int16(math.MaxInt16),
Update: int16(math.MaxInt16) - 1,
},
{
Tp: "TINYINT UNSIGNED",
Value: uint8(math.MaxUint8),
Update: uint8(math.MaxUint8) - 1,
},
{
Tp: "TINYINT SIGNED",
Value: int8(math.MaxInt8),
Update: int8(math.MaxInt8) - 1,
},
}

for _, c := range cases {
for _, ispk := range []string{"", "PRIMARY KEY"} {

tr.run(func(src *sql.DB) {
sql := fmt.Sprintf("CREATE TABLE pk(id %s %s)", c.Tp, ispk)
mustExec(src, sql)

sql = "INSERT INTO pk(id) values( ? )"
mustExec(src, sql, c.Value)

if len(ispk) == 0 {
// insert a null value
mustExec(src, sql, nil)
}

sql = "UPDATE pk set id = ? where id = ?"
mustExec(src, sql, c.Update, c.Value)

sql = "DELETE from pk where id = ?"
mustExec(src, sql, c.Update)
})

tr.execSQLs([]string{"DROP TABLE pk"})
}
}
}

func mustExec(db *sql.DB, sql string, args ...interface{}) {
_, err := db.Exec(sql, args...)
if err != nil {
log.S().Fatalf("exec failed, sql: %s args: %v, err: %+v", sql, args, err)
}
}