Skip to content

Commit

Permalink
executor: implement part of restrictive LOAD DATA (#41793)
Browse files Browse the repository at this point in the history
close #41784
  • Loading branch information
lance6716 authored Mar 3, 2023
1 parent 04fefc9 commit 1d293d8
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 83 deletions.
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
if err != nil {
return value, err
}
if err := col.CheckNotNull(&value); err == nil {
if err := col.CheckNotNull(&value, 0); err == nil {
return value, nil // the most normal case
}
isBadNullValue = true
Expand All @@ -505,7 +505,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
case isBadNullValue:
err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx)
err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx, 0)
default:
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
}
Expand Down
15 changes: 15 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,21 @@ error = '''
Subquery returns more than 1 row
'''

["executor:1261"]
error = '''
Row %d doesn't contain data for all columns
'''

["executor:1262"]
error = '''
Row %d was truncated; it contained more data than there were input columns
'''

["executor:1263"]
error = '''
Column set to default value; NULL supplied to NOT NULL column '%s' at row %d
'''

["executor:1295"]
error = '''
This command is not supported in the prepared statement protocol yet
Expand Down
2 changes: 2 additions & 0 deletions executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var (
errTruncateWrongInsertValue = dbterror.ClassTable.NewStdErr(mysql.ErrTruncatedWrongValue, parser_mysql.Message("Incorrect %-.32s value: '%-.128s' for column '%.192s' at row %d", nil))
ErrExistsInHistoryPassword = dbterror.ClassExecutor.NewStd(mysql.ErrExistsInHistoryPassword)

ErrWarnTooFewRecords = dbterror.ClassExecutor.NewStd(mysql.ErrWarnTooFewRecords)
ErrWarnTooManyRecords = dbterror.ClassExecutor.NewStd(mysql.ErrWarnTooManyRecords)
ErrLoadDataFromServerDisk = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataFromServerDisk)
ErrLoadParquetFromLocal = dbterror.ClassExecutor.NewStd(mysql.ErrLoadParquetFromLocal)
ErrLoadDataEmptyPath = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataEmptyPath)
Expand Down
5 changes: 0 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2152,11 +2152,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.TruncateAsWarning = !vars.StrictSQLMode
case *ast.LoadDataStmt:
sc.DupKeyAsWarning = true
sc.BadNullAsWarning = true
// With IGNORE or LOCAL, data-interpretation errors become warnings and the load operation continues,
// even if the SQL mode is restrictive. For details: https://dev.mysql.com/doc/refman/8.0/en/load-data.html
// TODO: since TiDB only support the LOCAL by now, so the TruncateAsWarning are always true here.
sc.TruncateAsWarning = true
sc.InLoadDataStmt = true
// return warning instead of error when load data meet no partition for value
sc.IgnoreNoPartition = true
Expand Down
9 changes: 7 additions & 2 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,11 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
col.ColumnInfo.Offset = len(tCols)
tCols = append(tCols, col)
}
rowCntInLoadData := uint64(0)
if e.ctx.GetSessionVars().StmtCtx.InLoadDataStmt {
rowCntInLoadData = e.rowCount
}

for i, c := range tCols {
var err error
// Evaluate the generated columns later after real columns set
Expand All @@ -668,7 +673,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
return nil, err
}
if !e.lazyFillAutoID || (e.lazyFillAutoID && !mysql.HasAutoIncrementFlag(c.GetFlag())) {
if err = c.HandleBadNull(&row[i], e.ctx.GetSessionVars().StmtCtx); err != nil {
if err = c.HandleBadNull(&row[i], e.ctx.GetSessionVars().StmtCtx, rowCntInLoadData); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -705,7 +710,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
return nil, err
}
// Handle the bad null error.
if err = gCol.HandleBadNull(&row[colIdx], e.ctx.GetSessionVars().StmtCtx); err != nil {
if err = gCol.HandleBadNull(&row[colIdx], e.ctx.GetSessionVars().StmtCtx, rowCntInLoadData); err != nil {
return nil, err
}
}
Expand Down
58 changes: 44 additions & 14 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,11 @@ func (e *LoadDataWorker) ReadRows(ctx context.Context, parser mydump.Parser) err
// rowCount will be used in fillRow(), last insert ID will be assigned according to the rowCount = 1.
// So should add first here.
e.rowCount++
e.rows = append(e.rows, e.colsToRow(ctx, parser.LastRow().Row))
r, err := e.parserData2TableData(ctx, parser.LastRow().Row)
if err != nil {
return err
}
e.rows = append(e.rows, r)
e.curBatchCnt++
if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
Expand All @@ -856,8 +860,33 @@ func (e *LoadDataWorker) ReadRows(ctx context.Context, parser mydump.Parser) err
}
}

// colsToRow encodes the data of parser output.
func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []types.Datum {
// parserData2TableData encodes the data of parser output.
func (e *LoadDataWorker) parserData2TableData(
ctx context.Context,
parserData []types.Datum,
) ([]types.Datum, error) {
// Data interpretation is restrictive if the SQL mode is restrictive and neither
// the IGNORE nor the LOCAL modifier is specified. Errors terminate the load
// operation.
// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
restrictive := e.Ctx.GetSessionVars().SQLMode.HasStrictMode() &&
e.onDuplicate != ast.OnDuplicateKeyHandlingIgnore

var errColNumMismatch error
switch {
case len(parserData) < len(e.fieldMappings):
errColNumMismatch = ErrWarnTooFewRecords.GenWithStackByArgs(e.rowCount)
case len(parserData) > len(e.fieldMappings):
errColNumMismatch = ErrWarnTooManyRecords.GenWithStackByArgs(e.rowCount)
}

if errColNumMismatch != nil {
if restrictive {
return nil, errColNumMismatch
}
e.handleWarning(errColNumMismatch)
}

row := make([]types.Datum, 0, len(e.insertColumns))
sessionVars := e.Ctx.GetSessionVars()
setVar := func(name string, col *types.Datum) {
Expand All @@ -872,7 +901,7 @@ func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []ty
}

for i := 0; i < len(e.fieldMappings); i++ {
if i >= len(cols) {
if i >= len(parserData) {
if e.fieldMappings[i].Column == nil {
setVar(e.fieldMappings[i].UserVar.Name, nil)
continue
Expand All @@ -889,35 +918,36 @@ func (e *LoadDataWorker) colsToRow(ctx context.Context, cols []types.Datum) []ty
}

if e.fieldMappings[i].Column == nil {
setVar(e.fieldMappings[i].UserVar.Name, &cols[i])
setVar(e.fieldMappings[i].UserVar.Name, &parserData[i])
continue
}

if cols[i].IsNull() {
row = append(row, types.NewDatum(nil))
continue
}

row = append(row, cols[i])
row = append(row, parserData[i])
}
for i := 0; i < len(e.columnAssignments); i++ {
// eval expression of `SET` clause
d, err := expression.EvalAstExpr(e.Ctx, e.columnAssignments[i].Expr)
if err != nil {
if restrictive {
return nil, err
}
e.handleWarning(err)
return nil
return nil, nil
}
row = append(row, d)
}

// a new row buffer will be allocated in getRow
newRow, err := e.getRow(ctx, row)
if err != nil {
if restrictive {
return nil, err
}
e.handleWarning(err)
return nil
return nil, nil
}

return newRow
return newRow, nil
}

// SetMessage sets info message(ERR_LOAD_INFO) generated by LOAD statement, it is public because of the special way that
Expand Down
150 changes: 138 additions & 12 deletions executor/loadremotetest/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -21,6 +21,7 @@ import (
"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -84,18 +85,143 @@ func (s *mockGCSSuite) TestErrorMessage() {
`ERROR 8162 (HY000): STARTING BY '
' cannot contain LINES TERMINATED BY '
'`)
}

// TODO: fix these tests
//s.tk.MustExec("CREATE TABLE t2 (c1 INT, c2 INT, c3 INT);")
//err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s'
// INTO TABLE t2;`, gcsEndpoint))
//checkClientErrorMessage(s.T(), err,
// "ERROR 1261 (01000): Row 1 doesn't contain data for all columns")
//s.tk.MustExec("CREATE TABLE t3 (c1 INT);")
//err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t.tsv?endpoint=%s'
// INTO TABLE t3;`, gcsEndpoint))
//checkClientErrorMessage(s.T(), err,
// "ERROR 1262 (01000): Row 1 was truncated; it contained more data than there were input columns")
func (s *mockGCSSuite) TestColumnNumMismatch() {
s.tk.MustExec("DROP DATABASE IF EXISTS load_csv;")

s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-tsv",
Name: "t2.tsv",
},
Content: []byte("1\t2\n" +
"1\t4\n"),
})

s.tk.MustExec("CREATE DATABASE load_csv;")
s.tk.MustExec("USE load_csv;")

// table has fewer columns than data

s.tk.MustExec("CREATE TABLE t (c INT);")
s.tk.MustExec("SET SESSION sql_mode = ''")
err := s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t2.tsv?endpoint=%s'
INTO TABLE t;`, gcsEndpoint))
require.NoError(s.T(), err)
require.Equal(s.T(), "Records: 2 Deleted: 0 Skipped: 0 Warnings: 2", s.tk.Session().LastMessage())
s.tk.MustQuery("SHOW WARNINGS;").Check(testkit.Rows(
"Warning 1262 Row 1 was truncated; it contained more data than there were input columns",
"Warning 1262 Row 2 was truncated; it contained more data than there were input columns"))

s.tk.MustExec("SET SESSION sql_mode = 'ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'")
err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t2.tsv?endpoint=%s'
INTO TABLE t;`, gcsEndpoint))
checkClientErrorMessage(s.T(), err,
"ERROR 1262 (01000): Row 1 was truncated; it contained more data than there were input columns")

err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t2.tsv?endpoint=%s'
REPLACE INTO TABLE t;`, gcsEndpoint))
checkClientErrorMessage(s.T(), err,
"ERROR 1262 (01000): Row 1 was truncated; it contained more data than there were input columns")

err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t2.tsv?endpoint=%s'
IGNORE INTO TABLE t;`, gcsEndpoint))
require.NoError(s.T(), err)
require.Equal(s.T(), "Records: 2 Deleted: 0 Skipped: 0 Warnings: 2", s.tk.Session().LastMessage())
s.tk.MustQuery("SHOW WARNINGS;").Check(testkit.Rows(
"Warning 1262 Row 1 was truncated; it contained more data than there were input columns",
"Warning 1262 Row 2 was truncated; it contained more data than there were input columns"))

// table has more columns than data

s.tk.MustExec("CREATE TABLE t2 (c1 INT, c2 INT, c3 INT);")
err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t2.tsv?endpoint=%s'
INTO TABLE t2;`, gcsEndpoint))
checkClientErrorMessage(s.T(), err,
"ERROR 1261 (01000): Row 1 doesn't contain data for all columns")

// fill default value for missing columns

s.tk.MustExec(`CREATE TABLE t3 (
c1 INT NOT NULL,
c2 INT NOT NULL,
c3 INT NOT NULL DEFAULT 1);`)
s.tk.MustExec(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t2.tsv?endpoint=%s'
INTO TABLE t3 (c1, c2);`, gcsEndpoint))
s.tk.MustQuery("SELECT * FROM t3;").Check(testkit.Rows(
"1 2 1",
"1 4 1"))
}

func (s *mockGCSSuite) TestEvalError() {
s.tk.MustExec("DROP DATABASE IF EXISTS load_csv;")

s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-tsv",
Name: "t3.tsv",
},
Content: []byte("1\t2\n" +
"1\t4\n"),
})

s.tk.MustExec("CREATE DATABASE load_csv;")
s.tk.MustExec("USE load_csv;")

s.tk.MustExec("CREATE TABLE t (c INT);")
s.tk.MustExec("SET SESSION sql_mode = 'ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'")
err := s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t3.tsv?endpoint=%s'
INTO TABLE t (@v1, @) SET c=@v1+'asd';`, gcsEndpoint))
checkClientErrorMessage(s.T(), err,
"ERROR 1292 (22007): Truncated incorrect DOUBLE value: 'asd'")

// REPLACE does not help

err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t3.tsv?endpoint=%s'
REPLACE INTO TABLE t (@v1, @) SET c=@v1+'asd';`, gcsEndpoint))
checkClientErrorMessage(s.T(), err,
"ERROR 1292 (22007): Truncated incorrect DOUBLE value: 'asd'")

// IGNORE helps

err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/t3.tsv?endpoint=%s'
IGNORE INTO TABLE t (@v1, @) SET c=@v1+'asd';`, gcsEndpoint))
require.NoError(s.T(), err)
require.Equal(s.T(), "Records: 2 Deleted: 0 Skipped: 0 Warnings: 2", s.tk.Session().LastMessage())
s.tk.MustQuery("SHOW WARNINGS;").Check(testkit.Rows(
"Warning 1292 Truncated incorrect DOUBLE value: 'asd'",
"Warning 1292 Truncated incorrect DOUBLE value: 'asd'"))
}

func (s *mockGCSSuite) TestDataError() {
s.tk.MustExec("DROP DATABASE IF EXISTS load_csv;")

s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-tsv",
Name: "null.tsv",
},
Content: []byte("1\t\\N\n" +
"1\t4\n"),
})

s.tk.MustExec("CREATE DATABASE load_csv;")
s.tk.MustExec("USE load_csv;")

s.tk.MustExec("CREATE TABLE t (c INT NOT NULL, c2 INT NOT NULL);")
s.tk.MustExec("SET SESSION sql_mode = 'ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'")
err := s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/null.tsv?endpoint=%s'
INTO TABLE t;`, gcsEndpoint))
checkClientErrorMessage(s.T(), err,
"ERROR 1263 (22004): Column set to default value; NULL supplied to NOT NULL column 'c2' at row 1")

err = s.tk.ExecToErr(fmt.Sprintf(`LOAD DATA INFILE 'gs://test-tsv/null.tsv?endpoint=%s'
IGNORE INTO TABLE t;`, gcsEndpoint))
require.NoError(s.T(), err)
require.Equal(s.T(), "Records: 2 Deleted: 0 Skipped: 0 Warnings: 1", s.tk.Session().LastMessage())
s.tk.MustQuery("SHOW WARNINGS;").Check(testkit.Rows(
"Warning 1263 Column set to default value; NULL supplied to NOT NULL column 'c2' at row 1"))

// TODO: don't use batchCheckAndInsert, mimic (*InsertExec).exec()

Expand Down
2 changes: 1 addition & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
// Handle the bad null error.
for i, col := range t.Cols() {
var err error
if err = col.HandleBadNull(&newData[i], sc); err != nil {
if err = col.HandleBadNull(&newData[i], sc, 0); err != nil {
return false, err
}
}
Expand Down
Loading

0 comments on commit 1d293d8

Please sign in to comment.