Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: implement NULL DEFINED BY for LOAD DATA #41541

Merged
merged 4 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,13 @@ func (parser *CSVParser) unescapeString(input field) (unescaped string, isNull b
if len(parser.escapedBy) > 0 {
unescaped = unescape(unescaped, "", parser.escFlavor, parser.escapedBy[0], parser.unescapeRegexp)
}
if len(parser.quote) == 0 || !parser.quotedNullIsText {
isNull = parser.escFlavor != escapeFlavorMySQLWithNull &&
!parser.cfg.NotNull &&
slices.Contains(parser.cfg.Null, unescaped)
} else if !input.quoted {
// quoted string can never be NULL except for \N, which must be escapeFlavorMySQLWithNull
if !(len(parser.quote) > 0 && parser.quotedNullIsText && input.quoted) {
// this branch represents "quote is not configured" or "quoted null is null" or "this field has no quote"
// we check null for them
isNull = !parser.cfg.NotNull &&
slices.Contains(parser.cfg.Null, unescaped)
// avoid \\N becomes NULL
if parser.escFlavor == escapeFlavorMySQLWithNull && unescaped == parser.escapedBy+`N` {
// avoid \\N
isNull = false
}
}
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
Table: tbl,
FieldsInfo: v.FieldsInfo,
LinesInfo: v.LinesInfo,
NullInfo: v.NullInfo,
IgnoreLines: v.IgnoreLines,
ColumnAssignments: v.ColumnAssignments,
ColumnsAndUserVars: v.ColumnsAndUserVars,
Expand Down
148 changes: 36 additions & 112 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package executor

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -42,7 +41,6 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -76,6 +74,10 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.loadDataInfo.Table.Meta().IsBaseTable() {
return errors.New("can only load data into base tables")
}
if e.loadDataInfo.NullInfo != nil && e.loadDataInfo.NullInfo.OptEnclosed &&
(e.loadDataInfo.FieldsInfo == nil || e.loadDataInfo.FieldsInfo.Enclosed == nil) {
return errors.New("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED")
}

switch e.FileLocRef {
case ast.FileLocServerOrRemote:
Expand Down Expand Up @@ -162,6 +164,7 @@ type LoadDataInfo struct {
Table table.Table
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
NullInfo *ast.NullDefinedBy
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum
Expand Down Expand Up @@ -516,101 +519,6 @@ func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.curBatchCnt = 0
}

// getValidData returns curData that starts from starting symbol.
// If the data doesn't have starting symbol, return curData[len(curData)-startingLen+1:] and false.
// getValidData returns curData trimmed so that it begins at the LINES
// STARTING BY symbol. If the symbol is not present it returns only the last
// len(starting)-1 bytes together with false: a partially-read starting symbol
// may straddle the chunk boundary, so that suffix must survive into the next
// read.
func (e *LoadDataInfo) getValidData(curData []byte) ([]byte, bool) {
	start := e.LinesInfo.Starting
	if idx := strings.Index(string(hack.String(curData)), start); idx != -1 {
		return curData[idx:], true
	}
	// Keep just enough trailing bytes that a split starting symbol can be
	// completed by the following chunk.
	keep := len(curData) - len(start) + 1
	return curData[keep:], false
}

// indexOfTerminator return index of terminator, if not, return -1.
// normally, the field terminator and line terminator is short, so we just use brute force algorithm.
// indexOfTerminator return index of terminator, if not, return -1.
// normally, the field terminator and line terminator is short, so we just use brute force algorithm.
func (e *LoadDataInfo) indexOfTerminator(bs []byte) int {
	fieldTerm := []byte(e.FieldsInfo.Terminated)
	fieldTermLen := len(fieldTerm)
	lineTerm := []byte(e.LinesInfo.Terminated)
	lineTermLen := len(lineTerm)
	// termType classifies what, if anything, a position in bs matches.
	type termType int
	const (
		notTerm termType = iota
		fieldTermType
		lineTermType
	)
	// cmpTerm reports whether bs (with restLen readable bytes) starts with a
	// field or line terminator; the zero value notTerm means neither matched.
	// likely, fieldTermLen should equal to lineTermLen, compare fieldTerm first can avoid useless lineTerm comparison.
	cmpTerm := func(restLen int, bs []byte) (typ termType) {
		if restLen >= fieldTermLen && bytes.Equal(bs[:fieldTermLen], fieldTerm) {
			typ = fieldTermType
			return
		}
		if restLen >= lineTermLen && bytes.Equal(bs[:lineTermLen], lineTerm) {
			typ = lineTermType
			return
		}
		return
	}
	if lineTermLen > fieldTermLen && bytes.HasPrefix(lineTerm, fieldTerm) {
		// unlikely, fieldTerm is prefix of lineTerm, we should compare lineTerm first.
		cmpTerm = func(restLen int, bs []byte) (typ termType) {
			if restLen >= lineTermLen && bytes.Equal(bs[:lineTermLen], lineTerm) {
				typ = lineTermType
				return
			}
			if restLen >= fieldTermLen && bytes.Equal(bs[:fieldTermLen], fieldTerm) {
				typ = fieldTermType
				return
			}
			return
		}
	}
	// atFieldStart: the next byte begins a new field, so an enclosing quote
	// character here opens a quoted field rather than closing one.
	// inQuoter: we are inside an enclosed (quoted) field, so terminators are
	// only recognized right after the closing quote.
	atFieldStart := true
	inQuoter := false
loop:
	for i := 0; i < len(bs); i++ {
		if atFieldStart && e.FieldsInfo.Enclosed != byte(0) && bs[i] == e.FieldsInfo.Enclosed {
			inQuoter = !inQuoter
			atFieldStart = false
			continue
		}
		restLen := len(bs) - i - 1
		if inQuoter && e.FieldsInfo.Enclosed != byte(0) && bs[i] == e.FieldsInfo.Enclosed {
			// look ahead to see if it is end of line or field.
			switch cmpTerm(restLen, bs[i+1:]) {
			case lineTermType:
				// Line terminator directly follows the closing quote; the
				// terminator starts just past the quote at i+1.
				return i + 1
			case fieldTermType:
				// Skip over the field terminator and start a fresh field.
				i += fieldTermLen
				inQuoter = false
				atFieldStart = true
				continue loop
			default:
			}
		}
		if !inQuoter {
			// look ahead to see if it is end of line or field.
			// restLen+1 because bs[i] itself is part of the candidate match.
			switch cmpTerm(restLen+1, bs[i:]) {
			case lineTermType:
				return i
			case fieldTermType:
				// -1 compensates for the loop's own i++.
				i += fieldTermLen - 1
				inQuoter = false
				atFieldStart = true
				continue loop
			default:
			}
		}
		// if it is escaped char, skip next char.
		if bs[i] == e.FieldsInfo.Escaped {
			i++
		}
		atFieldStart = false
	}
	return -1
}

// ReadRows reads rows from parser. When parser's reader meet EOF, it will return
// nil. For other errors it will return directly. When the rows batch is full it
// will also return nil.
Expand Down Expand Up @@ -763,27 +671,43 @@ func (e *LoadDataInfo) addRecordLD(ctx context.Context, row []types.Datum) error

// GenerateCSVConfig generates a CSV config for parser from LoadDataInfo.
func (e *LoadDataInfo) GenerateCSVConfig() *config.CSVConfig {
var nullDef []string
if e.FieldsInfo.Enclosed != 0 {
var (
nullDef []string
quotedNullIsText = true
)

if e.NullInfo != nil {
nullDef = append(nullDef, e.NullInfo.NullDef)
quotedNullIsText = !e.NullInfo.OptEnclosed
} else if e.FieldsInfo.Enclosed != nil {
nullDef = append(nullDef, "NULL")
}
if e.FieldsInfo.Escaped != 0 {
nullDef = append(nullDef, string([]byte{e.FieldsInfo.Escaped, 'N'}))
if e.FieldsInfo.Escaped != nil {
nullDef = append(nullDef, string([]byte{*e.FieldsInfo.Escaped, 'N'}))
}

enclosed := ""
if e.FieldsInfo.Enclosed != nil {
enclosed = string([]byte{*e.FieldsInfo.Enclosed})
}
escaped := ""
if e.FieldsInfo.Escaped != nil {
escaped = string([]byte{*e.FieldsInfo.Escaped})
}

return &config.CSVConfig{
Separator: e.FieldsInfo.Terminated,
// ignore optionally enclosed
Delimiter: string([]byte{e.FieldsInfo.Enclosed}),
Terminator: e.LinesInfo.Terminated,
NotNull: false,
Null: nullDef,
Header: false,
TrimLastSep: false,
EscapedBy: string([]byte{e.FieldsInfo.Escaped}),
StartingBy: e.LinesInfo.Starting,
AllowEmptyLine: true,
// TODO: set it through NULL DEFINED BY OPTIONALLY ENCLOSED
QuotedNullIsText: true,
Delimiter: enclosed,
Terminator: e.LinesInfo.Terminated,
NotNull: false,
Null: nullDef,
Header: false,
TrimLastSep: false,
EscapedBy: escaped,
StartingBy: e.LinesInfo.Starting,
AllowEmptyLine: true,
QuotedNullIsText: quotedNullIsText,
}
}

Expand Down
Loading