From e1041014d35ce8bb861548c63e3ec13f415095ef Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 24 Dec 2020 15:21:57 +0800 Subject: [PATCH] fix issue related to '_tidb_rowid' and move column count to tidb encoder --- lightning/backend/tidb.go | 36 +++++++++++-- lightning/backend/tidb_test.go | 20 +++++-- lightning/restore/restore.go | 21 +++----- lightning/restore/restore_test.go | 17 ++++-- tests/tidb_rowid/run.sh | 88 ++++++++++++++++--------------- 5 files changed, 111 insertions(+), 71 deletions(-) diff --git a/lightning/backend/tidb.go b/lightning/backend/tidb.go index 32c2d9ac7..6c1e0793d 100644 --- a/lightning/backend/tidb.go +++ b/lightning/backend/tidb.go @@ -38,6 +38,14 @@ import ( "github.com/pingcap/tidb-lightning/lightning/verification" ) +var ( + extraHandleTableColumn = &table.Column{ + ColumnInfo: extraHandleColumnInfo, + GeneratedExpr: nil, + DefaultExpr: nil, + } +) + type tidbRow string type tidbRows []tidbRow @@ -51,10 +59,13 @@ func (row tidbRows) MarshalLogArray(encoder zapcore.ArrayEncoder) error { } type tidbEncoder struct { - mode mysql.SQLMode - tbl table.Table - se *session + mode mysql.SQLMode + tbl table.Table + se *session + // the index of table columns for each data field. 
+ // index == len(table.columns) means this field is `_tidb_rowid` columnIdx []int + columnCnt int } type tidbBackend struct { @@ -228,17 +239,34 @@ func (enc *tidbEncoder) appendSQL(sb *strings.Builder, datum *types.Datum, col * func (*tidbEncoder) Close() {} +func getColumnByIndex(cols []*table.Column, index int) *table.Column { + if index == len(cols) { + return extraHandleTableColumn + } else { + return cols[index] + } +} + func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, columnPermutation []int) (Row, error) { cols := enc.tbl.Cols() if len(enc.columnIdx) == 0 { + columnCount := 0 columnIdx := make([]int, len(columnPermutation)) for i, idx := range columnPermutation { if idx >= 0 { columnIdx[idx] = i + columnCount++ } } enc.columnIdx = columnIdx + enc.columnCnt = columnCount + } + + if len(row) != enc.columnCnt { + log.L().Error("column count mismatch", zap.Ints("column_permutation", columnPermutation), + zap.Array("data", rowArrayMarshaler(row))) + return nil, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row)) } var encoded strings.Builder @@ -248,7 +276,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co if i != 0 { encoded.WriteByte(',') } - if err := enc.appendSQL(&encoded, &field, cols[enc.columnIdx[i]]); err != nil { + if err := enc.appendSQL(&encoded, &field, getColumnByIndex(cols, enc.columnIdx[i])); err != nil { logger.Error("tidb encode failed", zap.Array("original", rowArrayMarshaler(row)), zap.Int("originalCol", i), diff --git a/lightning/backend/tidb_test.go b/lightning/backend/tidb_test.go index f02fda801..48ce6a51c 100644 --- a/lightning/backend/tidb_test.go +++ b/lightning/backend/tidb_test.go @@ -136,12 +136,21 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) { c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ types.NewIntDatum(1), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}) + }, 1, []int{0, -1, -1, -1, 
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1}) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) err = engine.WriteRows(ctx, []string{"a"}, dataRows) c.Assert(err, IsNil) + + // test encode rows with _tidb_rowid + encoder, err = ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{}) + c.Assert(err, IsNil) + row, err = encoder.Encode(logger, []types.Datum{ + types.NewIntDatum(1), + types.NewIntDatum(1), // _tidb_rowid field + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1}) + c.Assert(err, IsNil) } func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) { @@ -165,7 +174,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) { c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ types.NewIntDatum(1), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) @@ -192,14 +201,17 @@ func (s *mysqlSuite) TestStrictMode(c *C) { logger := log.L() _, err = encoder.Encode(logger, []types.Datum{ types.NewStringDatum("test"), - }, 1, []int{0, 1, -1}) + }, 1, []int{0, -1, -1}) c.Assert(err, IsNil) _, err = encoder.Encode(logger, []types.Datum{ types.NewStringDatum("\xff\xff\xff\xff"), - }, 1, []int{0, 1, -1}) + }, 1, []int{0, -1, -1}) c.Assert(err, ErrorMatches, `.*incorrect utf8 value .* for column s0`) + // open a new encoder because the column count changed. 
+ encoder, err = bk.NewEncoder(tbl, &kv.SessionOptions{SQLMode: mysql.ModeStrictAllTables}) + c.Assert(err, IsNil) _, err = encoder.Encode(logger, []types.Datum{ types.NewStringDatum(""), types.NewStringDatum("非 ASCII 字符串"), diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index a631f360c..46f85de3e 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -1675,7 +1675,12 @@ func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string { for _, idx := range colIndexes { // skip columns with index -1 if idx >= 0 { - names = append(names, tableInfo.Columns[idx].Name.O) + // original fields contain the _tidb_rowid field + if idx == len(tableInfo.Columns) { + names = append(names, model.ExtraHandleName.O) + } else { + names = append(names, tableInfo.Columns[idx].Name.O) + } } } return names @@ -1903,7 +1908,6 @@ func (cr *chunkRestore) encodeLoop( pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs initializedColumns, reachEOF := false, false - columnCnt := len(t.tableInfo.Core.Columns) for !reachEOF { if err = pauser.Wait(ctx); err != nil { return @@ -1932,13 +1936,6 @@ func (cr *chunkRestore) encodeLoop( } } initializedColumns = true - // check the private column '_tidb_row_id', - for _, c := range columnNames { - if strings.ToLower(c) == model.ExtraHandleName.L { - columnCnt++ - break - } - } } case io.EOF: reachEOF = true @@ -1950,12 +1947,6 @@ func (cr *chunkRestore) encodeLoop( readDur += time.Since(readDurStart) encodeDurStart := time.Now() lastRow := cr.parser.LastRow() - if columnCnt < len(lastRow.Row) { - logger.Error("row fields is more than table fields", zap.Int("tableFields", columnCnt), - zap.Int("rowFields", len(lastRow.Row)), zap.Int64("position", newOffset), zap.Array("row", lastRow)) - err = errors.Errorf("row field count %d is bigger than table fields count %d", len(lastRow.Row), columnCnt) - return - } // sql -> kv kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, 
lastRow.RowID, cr.chunk.ColumnPermutation) encodeDur += time.Since(encodeDurStart) diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go index f22e21c66..9865f5e7a 100644 --- a/lightning/restore/restore_test.go +++ b/lightning/restore/restore_test.go @@ -610,6 +610,11 @@ func (s *tableRestoreSuite) TestGetColumnsNames(c *C) { c.Assert(getColumnNames(s.tableInfo.Core, []int{0, 1, -1, -1}), DeepEquals, []string{"a", "b"}) c.Assert(getColumnNames(s.tableInfo.Core, []int{1, -1, 0, -1}), DeepEquals, []string{"c", "a"}) c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, -1, -1}), DeepEquals, []string{"b"}) + c.Assert(getColumnNames(s.tableInfo.Core, []int{1, 2, 3, 0}), DeepEquals, []string{"_tidb_rowid", "a", "b", "c"}) + c.Assert(getColumnNames(s.tableInfo.Core, []int{1, 0, 2, 3}), DeepEquals, []string{"b", "a", "c", "_tidb_rowid"}) + c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, 2, 1}), DeepEquals, []string{"b", "_tidb_rowid", "c"}) + c.Assert(getColumnNames(s.tableInfo.Core, []int{2, -1, 0, 1}), DeepEquals, []string{"c", "_tidb_rowid", "a"}) + c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 1, -1, 0}), DeepEquals, []string{"_tidb_rowid", "b"}) } func (s *tableRestoreSuite) TestInitializeColumns(c *C) { @@ -1031,14 +1036,16 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) { kvsCh := make(chan []deliveredKVs, 2) deliverCompleteCh := make(chan deliverResult) - kvEncoder, err := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{ - SQLMode: s.cfg.TiDB.SQLMode, - Timestamp: 1234567895, - }) + kvEncoder, err := kv.NewTiDBBackend(nil, config.ReplaceOnDup).NewEncoder( + s.tr.encTable, + &kv.SessionOptions{ + SQLMode: s.cfg.TiDB.SQLMode, + Timestamp: 1234567895, + }) c.Assert(err, IsNil) _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) - c.Assert(err, ErrorMatches, "row field count 4 is bigger than table fields count 3") + c.Assert(err, ErrorMatches, "in file 
db.table.2.sql:0 at offset 8: column count mismatch, expected 3, got 4") c.Assert(kvsCh, HasLen, 0) } diff --git a/tests/tidb_rowid/run.sh b/tests/tidb_rowid/run.sh index 9bc6180d2..d94388abd 100755 --- a/tests/tidb_rowid/run.sh +++ b/tests/tidb_rowid/run.sh @@ -17,50 +17,52 @@ set -eu -run_sql 'DROP DATABASE IF EXISTS rowid;' -run_lightning -echo 'Import finished' +for BACKEND in local importer tidb; do + run_sql 'DROP DATABASE IF EXISTS rowid;' + run_lightning -backend $BACKEND + echo 'Import finished' -run_sql 'SELECT count(*), max(id), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.`non_pk_auto_inc`' -check_contains 'count(*): 22' -check_contains 'max(id): 37' -check_contains 'min(_tidb_rowid): 1' -check_contains 'max(_tidb_rowid): 22' -run_sql 'INSERT INTO rowid.`non_pk_auto_inc` (`pk`) VALUES ("?")' -run_sql 'SELECT id > 37, _tidb_rowid > 22 FROM rowid.`non_pk_auto_inc` WHERE `pk` = "?"' -check_contains 'id > 37: 1' -check_contains '_tidb_rowid > 22: 1' + run_sql 'SELECT count(*), max(id), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.`non_pk_auto_inc`' + check_contains 'count(*): 22' + check_contains 'max(id): 37' + check_contains 'min(_tidb_rowid): 1' + check_contains 'max(_tidb_rowid): 22' + run_sql 'INSERT INTO rowid.`non_pk_auto_inc` (`pk`) VALUES ("?")' + run_sql 'SELECT id > 37, _tidb_rowid > 22 FROM rowid.`non_pk_auto_inc` WHERE `pk` = "?"' + check_contains 'id > 37: 1' + check_contains '_tidb_rowid > 22: 1' -for table_name in non_pk explicit_tidb_rowid; do - run_sql "SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.${table_name}" - check_contains 'count(*): 10' - check_contains 'min(_tidb_rowid): 1' - check_contains 'max(_tidb_rowid): 10' - run_sql "SELECT _tidb_rowid FROM rowid.${table_name} WHERE pk = 'five'" - check_contains '_tidb_rowid: 5' - run_sql "INSERT INTO rowid.${table_name} VALUES ('eleven')" - run_sql "SELECT count(*) FROM rowid.${table_name}" - check_contains 'count(*): 11' - run_sql "SELECT count(*) FROM 
rowid.${table_name} WHERE pk > '!'" - check_contains 'count(*): 11' - run_sql "SELECT _tidb_rowid > 10 FROM rowid.${table_name} WHERE pk = 'eleven'" - check_contains '_tidb_rowid > 10: 1' -done + for table_name in non_pk explicit_tidb_rowid; do + run_sql "SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.${table_name}" + check_contains 'count(*): 10' + check_contains 'min(_tidb_rowid): 1' + check_contains 'max(_tidb_rowid): 10' + run_sql "SELECT _tidb_rowid FROM rowid.${table_name} WHERE pk = 'five'" + check_contains '_tidb_rowid: 5' + run_sql "INSERT INTO rowid.${table_name} VALUES ('eleven')" + run_sql "SELECT count(*) FROM rowid.${table_name}" + check_contains 'count(*): 11' + run_sql "SELECT count(*) FROM rowid.${table_name} WHERE pk > '!'" + check_contains 'count(*): 11' + run_sql "SELECT _tidb_rowid > 10 FROM rowid.${table_name} WHERE pk = 'eleven'" + check_contains '_tidb_rowid > 10: 1' + done -run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase' -check_contains 'count(*): 1' -check_contains 'min(_tidb_rowid): 1' -check_contains 'max(_tidb_rowid): 1' -run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")' -run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"' -check_contains '_tidb_rowid > 70000: 1' + run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase' + check_contains 'count(*): 1' + check_contains 'min(_tidb_rowid): 1' + check_contains 'max(_tidb_rowid): 1' + run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")' + run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"' + check_contains '_tidb_rowid > 70000: 1' -run_sql 'SELECT count(*) FROM rowid.specific_auto_inc' -check_contains 'count(*): 5' -run_sql 'INSERT INTO rowid.specific_auto_inc (a) VALUES ("ffffff"), ("gggggg")' -run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "ffffff"' -check_contains '_tidb_rowid > 80000: 1' -check_contains 'b > 80000: 1' -run_sql 
'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "gggggg"' -check_contains '_tidb_rowid > 80000: 1' -check_contains 'b > 80000: 1' + run_sql 'SELECT count(*) FROM rowid.specific_auto_inc' + check_contains 'count(*): 5' + run_sql 'INSERT INTO rowid.specific_auto_inc (a) VALUES ("ffffff"), ("gggggg")' + run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "ffffff"' + check_contains '_tidb_rowid > 80000: 1' + check_contains 'b > 80000: 1' + run_sql 'SELECT _tidb_rowid > 80000, b > 80000 FROM rowid.specific_auto_inc WHERE a = "gggggg"' + check_contains '_tidb_rowid > 80000: 1' + check_contains 'b > 80000: 1' +done