Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer/: Fix when pk is handle and value overflow int64 #574

Merged
merged 1 commit into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion drainer/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,15 @@ func insertRowToDatums(table *model.TableInfo, row []byte) (pk types.Datum, datu

for _, col := range table.Columns {
if IsPKHandleColumn(table, col) {
datums[col.ID] = pk
// If pk is handle, the datums TiDB write will always be Int64 type.
// https://github.com/pingcap/tidb/blob/cd10bca6660937beb5d6de11d49ec50e149fe083/table/tables/tables.go#L721
//
// create table pk(id BIGINT UNSIGNED);
// insert into pk(id) values(18446744073709551615)
//
// Will get -1 here, note: uint64(int64(-1)) = 18446744073709551615
// so we change it to uint64 if the column type is unsigned
datums[col.ID] = fixType(pk, col)
}
}

Expand All @@ -128,3 +136,14 @@ func getDefaultOrZeroValue(col *model.ColumnInfo) types.Datum {

return table.GetZeroValue(col)
}

func fixType(data types.Datum, col *model.ColumnInfo) types.Datum {
if mysql.HasUnsignedFlag(col.Flag) {
switch oldV := data.GetValue().(type) {
case int64:
log.Debugf("convert int64 type to uint64, value: %d", oldV)
return types.NewDatum(uint64(oldV))
}
}
return data
}
144 changes: 110 additions & 34 deletions tests/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dailytest
import (
"database/sql"
"fmt"
"math"
"math/rand"
"strings"
"time"
Expand Down Expand Up @@ -106,46 +107,44 @@ var case3Clean = []string{`
drop table a`,
}

// RunCase run some simple test case
func RunCase(src *sql.DB, dst *sql.DB, schema string) {
RunTest(src, dst, schema, func(src *sql.DB) {
err := execSQLs(src, case1)
if err != nil {
log.Fatal(err)
}
})
type testRunner struct {
src *sql.DB
dst *sql.DB
schema string
}

// clean table
RunTest(src, dst, schema, func(src *sql.DB) {
err := execSQLs(src, case1Clean)
if err != nil {
log.Fatal(err)
}
})
func (tr *testRunner) run(test func(*sql.DB)) {
RunTest(tr.src, tr.dst, tr.schema, test)
}

// run case2
RunTest(src, dst, schema, func(src *sql.DB) {
err := execSQLs(src, case2)
func (tr *testRunner) execSQLs(sqls []string) {
RunTest(tr.src, tr.dst, tr.schema, func(src *sql.DB) {
err := execSQLs(tr.src, sqls)
if err != nil {
log.Fatal(err)
}
})
}

// clean table
RunTest(src, dst, schema, func(src *sql.DB) {
err := execSQLs(src, case2Clean)
if err != nil {
log.Fatal(err)
}
})
// RunCase run some simple test case
func RunCase(src *sql.DB, dst *sql.DB, schema string) {
tr := &testRunner{src: src, dst: dst, schema: schema}
runPKcases(tr)

tr.execSQLs(case1)
tr.execSQLs(case1Clean)

tr.execSQLs(case2)
tr.execSQLs(case2Clean)

// run case3
RunTest(src, dst, schema, func(src *sql.DB) {
tr.run(func(src *sql.DB) {
err := execSQLs(src, case3)
if err != nil && !strings.Contains(err.Error(), "Duplicate for key") {
log.Fatal(err)
}
})
tr.execSQLs(case3Clean)

// random op on have both pk and uk table
RunTest(src, dst, schema, func(src *sql.DB) {
Expand All @@ -159,14 +158,6 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) {
log.Info(" updatePKUK take: ", time.Since(start))
})

// clean table
RunTest(src, dst, schema, func(src *sql.DB) {
err := execSQLs(src, case3Clean)
if err != nil {
log.Fatal(err)
}
})

// swap unique index value
RunTest(src, dst, schema, func(src *sql.DB) {
_, err := src.Exec("create table uindex(id int primary key, a1 int unique)")
Expand Down Expand Up @@ -343,3 +334,88 @@ func updatePKUK(db *sql.DB, opNum int) error {

return nil
}

// create a table with one column id with different type
// test the case whether it is primary key too, this can
// also help test when the column is handle or not.
func runPKcases(tr *testRunner) {
cases := []struct {
Tp string
Value interface{}
Update interface{}
}{
{
Tp: "BIGINT UNSIGNED",
Value: uint64(math.MaxUint64),
Update: uint64(math.MaxUint64) - 1,
},
{
Tp: "BIGINT SIGNED",
Value: int64(math.MaxInt64),
Update: int64(math.MaxInt64) - 1,
},
{
Tp: "INT UNSIGNED",
Value: uint32(math.MaxUint32),
Update: uint32(math.MaxUint32) - 1,
},
{
Tp: "INT SIGNED",
Value: int32(math.MaxInt32),
Update: int32(math.MaxInt32) - 1,
},
{
Tp: "SMALLINT UNSIGNED",
Value: uint16(math.MaxUint16),
Update: uint16(math.MaxUint16) - 1,
},
{
Tp: "SMALLINT SIGNED",
Value: int16(math.MaxInt16),
Update: int16(math.MaxInt16) - 1,
},
{
Tp: "TINYINT UNSIGNED",
Value: uint8(math.MaxUint8),
Update: uint8(math.MaxUint8) - 1,
},
{
Tp: "TINYINT SIGNED",
Value: int8(math.MaxInt8),
Update: int8(math.MaxInt8) - 1,
},
}

for _, c := range cases {
for _, ispk := range []string{"", "PRIMARY KEY"} {

tr.run(func(src *sql.DB) {
sql := fmt.Sprintf("CREATE TABLE pk(id %s %s)", c.Tp, ispk)
mustExec(src, sql)

sql = "INSERT INTO pk(id) values( ? )"
mustExec(src, sql, c.Value)

if len(ispk) == 0 {
// insert a null value
mustExec(src, sql, nil)
}

sql = "UPDATE pk set id = ? where id = ?"
mustExec(src, sql, c.Update, c.Value)

sql = "DELETE from pk where id = ?"
mustExec(src, sql, c.Update)
})

tr.execSQLs([]string{"DROP TABLE pk"})
}
}
}

func mustExec(db *sql.DB, sql string, args ...interface{}) {
_, err := db.Exec(sql, args...)
if err != nil {
log.Fatalf("exec failed, sql: %s args: %v, err: %+v", sql, args, err)
}
}