Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: check row value count to avoid unexpected encode result #528

Merged
merged 16 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions lightning/backend/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ import (
"github.com/pingcap/tidb-lightning/lightning/verification"
)

var (
	// extraHandleTableColumn wraps extraHandleColumnInfo in a *table.Column so
	// the pseudo-column `_tidb_rowid` can be treated like an ordinary table
	// column when encoding row values (see getColumnByIndex).
	extraHandleTableColumn = &table.Column{
		ColumnInfo:    extraHandleColumnInfo,
		GeneratedExpr: nil,
		DefaultExpr:   nil,
	}
)

// tidbRow is one encoded row; presumably the SQL-literal tuple text produced
// by the encoder — confirm against tidbEncoder.Encode.
type tidbRow string

// tidbRows is a batch of encoded rows delivered to the TiDB backend.
type tidbRows []tidbRow
Expand All @@ -51,10 +59,13 @@ func (row tidbRows) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
}

// tidbEncoder encodes data rows into SQL text for the TiDB backend.
// NOTE: the scraped diff showed both the pre- and post-change field lists;
// this is the merged (post-change) form with each field declared once.
type tidbEncoder struct {
	mode mysql.SQLMode
	tbl  table.Table
	se   *session
	// columnIdx is the index of the table column for each data field.
	// index == len(table.columns) means this field is `_tidb_rowid`.
	columnIdx []int
	// columnCnt is the expected number of fields per data row, derived from
	// the column permutation on the first Encode call.
	columnCnt int
}

type tidbBackend struct {
Expand Down Expand Up @@ -228,17 +239,37 @@ func (enc *tidbEncoder) appendSQL(sb *strings.Builder, datum *types.Datum, col *

func (*tidbEncoder) Close() {}

func getColumnByIndex(cols []*table.Column, index int) *table.Column {
if index == len(cols) {
return extraHandleTableColumn
} else {
return cols[index]
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
} else {
return cols[index]
}
return cols[index]

strange that it is not linted

}

func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, columnPermutation []int) (Row, error) {
cols := enc.tbl.Cols()

if len(enc.columnIdx) == 0 {
columnCount := 0
columnIdx := make([]int, len(columnPermutation))
for i, idx := range columnPermutation {
if idx >= 0 {
columnIdx[idx] = i
columnCount++
}
}
enc.columnIdx = columnIdx
enc.columnCnt = columnCount
}

// TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently.
// See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the
// column permutation with default, thus enc.columnCnt > len(row).
if len(row) > enc.columnCnt {
log.L().Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.L().Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),

zap.Array("data", rowArrayMarshaler(row)))
return nil, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

var encoded strings.Builder
Expand All @@ -248,7 +279,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
if i != 0 {
encoded.WriteByte(',')
}
if err := enc.appendSQL(&encoded, &field, cols[enc.columnIdx[i]]); err != nil {
if err := enc.appendSQL(&encoded, &field, getColumnByIndex(cols, enc.columnIdx[i])); err != nil {
logger.Error("tidb encode failed",
zap.Array("original", rowArrayMarshaler(row)),
zap.Int("originalCol", i),
Expand Down
16 changes: 14 additions & 2 deletions lightning/backend/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {

err = engine.WriteRows(ctx, []string{"a"}, dataRows)
c.Assert(err, IsNil)

// test encode rows with _tidb_rowid
encoder, err = ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
c.Assert(err, IsNil)
row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
types.NewIntDatum(1), // _tidb_rowid field
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1})
c.Assert(err, IsNil)
}

func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
Expand Down Expand Up @@ -192,14 +201,17 @@ func (s *mysqlSuite) TestStrictMode(c *C) {
logger := log.L()
_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum("test"),
}, 1, []int{0, 1, -1})
}, 1, []int{0, -1, -1})
c.Assert(err, IsNil)

_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum("\xff\xff\xff\xff"),
}, 1, []int{0, 1, -1})
}, 1, []int{0, -1, -1})
c.Assert(err, ErrorMatches, `.*incorrect utf8 value .* for column s0`)

// open a new encoder because the column count changed.
encoder, err = bk.NewEncoder(tbl, &kv.SessionOptions{SQLMode: mysql.ModeStrictAllTables})
c.Assert(err, IsNil)
_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum(""),
types.NewStringDatum("非 ASCII 字符串"),
Expand Down
9 changes: 9 additions & 0 deletions lightning/mydump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
Expand Down Expand Up @@ -95,6 +96,14 @@ type Row struct {
Row []types.Datum
}

// MarshalLogArray implements the zapcore.ArrayMarshaler interface, rendering
// each datum of the row as its string form so rows can be logged cheaply as a
// zap array.
func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
	for i := range row.Row {
		encoder.AppendString(row.Row[i].String())
	}
	return nil
}

type backslashEscapeFlavor uint8

const (
Expand Down
7 changes: 6 additions & 1 deletion lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1675,7 +1675,12 @@ func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string {
for _, idx := range colIndexes {
// skip columns with index -1
if idx >= 0 {
names = append(names, tableInfo.Columns[idx].Name.O)
// original fields contain the _tidb_rowid field
if idx == len(tableInfo.Columns) {
names = append(names, model.ExtraHandleName.O)
} else {
names = append(names, tableInfo.Columns[idx].Name.O)
}
}
}
return names
Expand Down
42 changes: 42 additions & 0 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,11 @@ func (s *tableRestoreSuite) TestGetColumnsNames(c *C) {
c.Assert(getColumnNames(s.tableInfo.Core, []int{0, 1, -1, -1}), DeepEquals, []string{"a", "b"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{1, -1, 0, -1}), DeepEquals, []string{"c", "a"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, -1, -1}), DeepEquals, []string{"b"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{1, 2, 3, 0}), DeepEquals, []string{"_tidb_rowid", "a", "b", "c"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{1, 0, 2, 3}), DeepEquals, []string{"b", "a", "c", "_tidb_rowid"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, 2, 1}), DeepEquals, []string{"b", "_tidb_rowid", "c"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{2, -1, 0, 1}), DeepEquals, []string{"c", "_tidb_rowid", "a"})
c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 1, -1, 0}), DeepEquals, []string{"_tidb_rowid", "b"})
}

func (s *tableRestoreSuite) TestInitializeColumns(c *C) {
Expand Down Expand Up @@ -1007,6 +1012,43 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) {
c.Assert(kvsCh, HasLen, 0)
}

// TestEncodeLoopColumnsMismatch verifies that encodeLoop reports a
// "column count mismatch" error when a data file carries more fields per row
// than the target table has columns, and that no KVs are delivered.
func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) {
	dir := c.MkDir()
	fileName := "db.table.000.csv"
	// Each CSV row has 4 fields; the expected error below says the table only
	// expects 3 columns.
	err := ioutil.WriteFile(filepath.Join(dir, fileName), []byte("1,2,3,4\r\n4,5,6,7\r\n"), 0644)
	c.Assert(err, IsNil)

	store, err := storage.NewLocalStorage(dir)
	c.Assert(err, IsNil)

	ctx := context.Background()
	cfg := config.NewConfig()
	rc := &RestoreController{pauser: DeliverPauser, cfg: cfg}

	reader, err := store.Open(ctx, fileName)
	c.Assert(err, IsNil)
	w := worker.NewPool(ctx, 5, "io")
	p := mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, 111, w, false)

	// Swap the suite's default parser for one reading the oversized CSV.
	err = s.cr.parser.Close()
	c.Assert(err, IsNil)
	s.cr.parser = p

	kvsCh := make(chan []deliveredKVs, 2)
	deliverCompleteCh := make(chan deliverResult)
	kvEncoder, err := kv.NewTiDBBackend(nil, config.ReplaceOnDup).NewEncoder(
		s.tr.encTable,
		&kv.SessionOptions{
			SQLMode:   s.cfg.TiDB.SQLMode,
			Timestamp: 1234567895,
		})
	c.Assert(err, IsNil)

	// The error message names the chunk's original file (db.table.2.sql from
	// the suite fixture), not the CSV written above — TODO confirm.
	_, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
	c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 8: column count mismatch, expected 3, got 4")
	c.Assert(kvsCh, HasLen, 0)
}

func (s *chunkRestoreSuite) TestRestore(c *C) {
ctx := context.Background()

Expand Down
2 changes: 2 additions & 0 deletions lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ var (
"default_week_format": "0",
"block_encryption_mode": "aes-128-ecb",
"group_concat_max_len": "1024",
// allow using _tidb_rowid in SQL statements
"tidb_opt_write_row_id": "1",
}
)

Expand Down
88 changes: 45 additions & 43 deletions tests/tidb_rowid/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,52 @@

set -eu

run_sql 'DROP DATABASE IF EXISTS rowid;'
run_lightning
echo 'Import finished'
for BACKEND in local importer tidb; do
kennytm marked this conversation as resolved.
Show resolved Hide resolved
run_sql 'DROP DATABASE IF EXISTS rowid;'
run_lightning -backend $BACKEND
echo 'Import finished'

run_sql 'SELECT count(*), max(id), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.`non_pk_auto_inc`'
check_contains 'count(*): 22'
check_contains 'max(id): 37'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 22'
run_sql 'INSERT INTO rowid.`non_pk_auto_inc` (`pk`) VALUES ("?")'
run_sql 'SELECT id > 37, _tidb_rowid > 22 FROM rowid.`non_pk_auto_inc` WHERE `pk` = "?"'
check_contains 'id > 37: 1'
check_contains '_tidb_rowid > 22: 1'
run_sql 'SELECT count(*), max(id), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.`non_pk_auto_inc`'
check_contains 'count(*): 22'
check_contains 'max(id): 37'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 22'
run_sql 'INSERT INTO rowid.`non_pk_auto_inc` (`pk`) VALUES ("?")'
run_sql 'SELECT id > 37, _tidb_rowid > 22 FROM rowid.`non_pk_auto_inc` WHERE `pk` = "?"'
check_contains 'id > 37: 1'
check_contains '_tidb_rowid > 22: 1'

for table_name in non_pk explicit_tidb_rowid; do
run_sql "SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.${table_name}"
check_contains 'count(*): 10'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 10'
run_sql "SELECT _tidb_rowid FROM rowid.${table_name} WHERE pk = 'five'"
check_contains '_tidb_rowid: 5'
run_sql "INSERT INTO rowid.${table_name} VALUES ('eleven')"
run_sql "SELECT count(*) FROM rowid.${table_name}"
check_contains 'count(*): 11'
run_sql "SELECT count(*) FROM rowid.${table_name} WHERE pk > '!'"
check_contains 'count(*): 11'
run_sql "SELECT _tidb_rowid > 10 FROM rowid.${table_name} WHERE pk = 'eleven'"
check_contains '_tidb_rowid > 10: 1'
done
for table_name in non_pk explicit_tidb_rowid; do
run_sql "SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.${table_name}"
check_contains 'count(*): 10'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 10'
run_sql "SELECT _tidb_rowid FROM rowid.${table_name} WHERE pk = 'five'"
check_contains '_tidb_rowid: 5'
run_sql "INSERT INTO rowid.${table_name} VALUES ('eleven')"
run_sql "SELECT count(*) FROM rowid.${table_name}"
check_contains 'count(*): 11'
run_sql "SELECT count(*) FROM rowid.${table_name} WHERE pk > '!'"
check_contains 'count(*): 11'
run_sql "SELECT _tidb_rowid > 10 FROM rowid.${table_name} WHERE pk = 'eleven'"
check_contains '_tidb_rowid > 10: 1'
done

run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase'
check_contains 'count(*): 1'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 1'
run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")'
run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"'
check_contains '_tidb_rowid > 70000: 1'
run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase'
check_contains 'count(*): 1'
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 1'
run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")'
run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"'
check_contains '_tidb_rowid > 70000: 1'

run_sql 'SELECT count(*) FROM rowid.specific_auto_inc'
check_contains 'count(*): 5'
run_sql 'INSERT INTO rowid.specific_auto_inc (a) VALUES ("ffffff"), ("gggggg")'
run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "ffffff"'
check_contains '_tidb_rowid > 80000: 1'
check_contains 'b > 80000: 1'
run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "gggggg"'
check_contains '_tidb_rowid > 80000: 1'
check_contains 'b > 80000: 1'
run_sql 'SELECT count(*) FROM rowid.specific_auto_inc'
check_contains 'count(*): 5'
run_sql 'INSERT INTO rowid.specific_auto_inc (a) VALUES ("ffffff"), ("gggggg")'
run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "ffffff"'
check_contains '_tidb_rowid > 80000: 1'
check_contains 'b > 80000: 1'
run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "gggggg"'
check_contains '_tidb_rowid > 80000: 1'
check_contains 'b > 80000: 1'
done