Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
check row value count to avoid unexpected encode result
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Dec 18, 2020
1 parent 56bc32d commit 9b63146
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 0 deletions.
9 changes: 9 additions & 0 deletions lightning/mydump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
Expand Down Expand Up @@ -95,6 +96,14 @@ type Row struct {
Row []types.Datum
}

// MarshalLogArray implements the zapcore.ArrayMarshaler interface
func (row Row) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
for _, r := range row.Row {
encoder.AppendString(r.String())
}
return nil
}

type backslashEscapeFlavor uint8

const (
Expand Down
7 changes: 7 additions & 0 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,7 @@ func (cr *chunkRestore) encodeLoop(

pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs
initializedColumns, reachEOF := false, false
columnCnt := len(t.tableInfo.Core.Columns)
for !reachEOF {
if err = pauser.Wait(ctx); err != nil {
return
Expand Down Expand Up @@ -1898,6 +1899,12 @@ func (cr *chunkRestore) encodeLoop(
readDur += time.Since(readDurStart)
encodeDurStart := time.Now()
lastRow := cr.parser.LastRow()
if columnCnt < len(lastRow.Row) {
log.L().Error("row fields is more than table fields", zap.Int("tableFields", columnCnt),
zap.Int("rowFields", len(lastRow.Row)), zap.Array("row", lastRow))
err = errors.Errorf("row field count %d is bigger than table fields count %d", len(lastRow.Row), columnCnt)
return
}
// sql -> kv
kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation)
encodeDur += time.Since(encodeDurStart)
Expand Down
35 changes: 35 additions & 0 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,41 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) {
c.Assert(kvsCh, HasLen, 0)
}

func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) {
dir := c.MkDir()
fileName := "db.table.000.csv"
err := ioutil.WriteFile(filepath.Join(dir, fileName), []byte("1,2,3,4\r\n4,5,6,7\r\n"), 0644)
c.Assert(err, IsNil)

store, err := storage.NewLocalStorage(dir)
c.Assert(err, IsNil)

ctx := context.Background()
cfg := config.NewConfig()
rc := &RestoreController{pauser: DeliverPauser, cfg: cfg}

reader, err := store.Open(ctx, fileName)
c.Assert(err, IsNil)
w := worker.NewPool(ctx, 5, "io")
p := mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, 111, w, false)

err = s.cr.parser.Close()
c.Assert(err, IsNil)
s.cr.parser = p

kvsCh := make(chan []deliveredKVs, 2)
deliverCompleteCh := make(chan deliverResult)
kvEncoder, err := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567895,
})
c.Assert(err, IsNil)

_, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
c.Assert(err, ErrorMatches, "row field count 4 is bigger than table fields count 3")
c.Assert(kvsCh, HasLen, 0)
}

func (s *chunkRestoreSuite) TestRestore(c *C) {
ctx := context.Background()

Expand Down

0 comments on commit 9b63146

Please sign in to comment.