Skip to content

Commit

Permalink
executor: LOAD DATA use lightning CSV parser (#40852)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 authored Feb 16, 2023
1 parent 0519e7e commit d161aa6
Show file tree
Hide file tree
Showing 13 changed files with 513 additions and 767 deletions.
3 changes: 2 additions & 1 deletion br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ type CSVConfig struct {
EscapedBy string `toml:"escaped-by" json:"escaped-by"`
// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
StartingBy string `toml:"-" json:"-"`
AllowEmptyLine bool `toml:"-" json:"-"`
// For non-empty Delimiter (for example quotes), null elements inside quotes are not considered as null except for
// `\N` (when escape-by is `\`). That is to say, `\N` is special for null because it always means null.
QuotedNullIsText bool
Expand Down
22 changes: 13 additions & 9 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ type CSVParser struct {
escFlavor escapeFlavor
// if set to true, csv parser will treat the first non-empty line as header line
shouldParseHeader bool
quotedNullIsText bool
// in LOAD DATA, empty line should be treated as a valid record
allowEmptyLine bool
quotedNullIsText bool
}

type field struct {
Expand Down Expand Up @@ -169,6 +171,7 @@ func NewCSVParser(
unquoteByteSet: makeByteSet(unquoteStopSet),
newLineByteSet: makeByteSet(newLineStopSet),
shouldParseHeader: shouldParseHeader,
allowEmptyLine: cfg.AllowEmptyLine,
quotedNullIsText: cfg.QuotedNullIsText,
}, nil
}
Expand Down Expand Up @@ -446,7 +449,6 @@ outside:
}
foundStartingByThisLine = true
content = content[idx+len(parser.startingBy):]
content = append(content, parser.newLine...)
parser.buf = append(content, parser.buf...)
parser.pos = oldPos + int64(idx+len(parser.startingBy))
}
Expand Down Expand Up @@ -497,13 +499,15 @@ outside:
foundStartingByThisLine = false
// new line = end of record (ignore empty lines)
prevToken = firstToken
if isEmptyLine {
continue
}
// skip lines only contain whitespaces
if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
parser.recordBuffer = parser.recordBuffer[:0]
continue
if !parser.allowEmptyLine {
if isEmptyLine {
continue
}
// skip lines only contain whitespaces
if err == nil && whitespaceLine && len(bytes.TrimSpace(parser.recordBuffer)) == 0 {
parser.recordBuffer = parser.recordBuffer[:0]
continue
}
}
parser.fieldIndexes = append(parser.fieldIndexes, len(parser.recordBuffer))
parser.fieldIsQuoted = append(parser.fieldIsQuoted, fieldIsQuoted)
Expand Down
141 changes: 38 additions & 103 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,6 @@ func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int
}
}

// runTestCasesCSVIgnoreNLines drives the CSV parser the same way LOAD DATA
// ... IGNORE N LINES does: it first skips ignoreNLines (line) terminators via
// ReadUntilTerminator, then asserts the rows that remain match tc.expected.
func runTestCasesCSVIgnoreNLines(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []testCase, ignoreNLines int) {
	for _, tc := range cases {
		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
		assert.NoError(t, err)
		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers, false, charsetConvertor)
		assert.NoError(t, err)

		// IGNORE N LINES will directly find (line) terminator without checking it's inside quotes
		for ; ignoreNLines > 0; ignoreNLines-- {
			_, _, err = parser.ReadUntilTerminator()
			if errors.Cause(err) == io.EOF {
				// Input exhausted while still skipping: nothing may be expected.
				assert.Len(t, tc.expected, 0, "input = %q", tc.input)
				return
			}
			assert.NoError(t, err)
		}

		for rowIdx, expectedRow := range tc.expected {
			comment := fmt.Sprintf("input = %q, row = %d", tc.input, rowIdx+1)
			rowErr := parser.ReadRow()
			assert.NoErrorf(t, rowErr, "input = %q, row = %d, error = %s", tc.input, rowIdx+1, errors.ErrorStack(rowErr))
			assert.Equal(t, int64(rowIdx)+1, parser.LastRow().RowID, comment)
			assert.Equal(t, expectedRow, parser.LastRow().Row, comment)
		}
		assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, "input = %q", tc.input)
	}
}

func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []string) {
for _, tc := range cases {
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
Expand Down Expand Up @@ -450,6 +421,21 @@ func TestMySQL(t *testing.T) {
assertPosEqual(t, parser, 26, 2)

require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

parser, err = mydump.NewCSVParser(
context.Background(), &cfg,
mydump.NewStringReader(`"\0\b\n\r\t\Z\\\ \c\'\""`),
int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.Nil(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum(string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})),
},
Length: 23,
}, parser.LastRow())
}

func TestCustomEscapeChar(t *testing.T) {
Expand Down Expand Up @@ -491,6 +477,29 @@ func TestCustomEscapeChar(t *testing.T) {
assertPosEqual(t, parser, 26, 2)

require.ErrorIs(t, errors.Cause(parser.ReadRow()), io.EOF)

cfg = config.CSVConfig{
Separator: ",",
Delimiter: `"`,
EscapedBy: ``,
NotNull: false,
Null: []string{`NULL`},
}

parser, err = mydump.NewCSVParser(
context.Background(), &cfg,
mydump.NewStringReader(`"{""itemRangeType"":0,""itemContainType"":0,""shopRangeType"":1,""shopJson"":""[{\""id\"":\""A1234\"",\""shopName\"":\""AAAAAA\""}]""}"`),
int64(config.ReadBlockSize), ioWorkers, false, nil)
require.NoError(t, err)

require.Nil(t, parser.ReadRow())
require.Equal(t, mydump.Row{
RowID: 1,
Row: []types.Datum{
types.NewStringDatum(`{"itemRangeType":0,"itemContainType":0,"shopRangeType":1,"shopJson":"[{\"id\":\"A1234\",\"shopName\":\"AAAAAA\"}]"}`),
},
Length: 115,
}, parser.LastRow())
}

func TestSyntaxErrorCSV(t *testing.T) {
Expand Down Expand Up @@ -1287,80 +1296,6 @@ yyy",5,xx"xxxx,8
require.ErrorContains(t, err, "starting-by cannot contain (line) terminator")
}

// TestCallerCanIgnoreNLines checks the caller-driven "IGNORE N LINES"
// protocol: before reading rows, the caller calls ReadUntilTerminator N times
// to skip leading lines (see runTestCasesCSVIgnoreNLines).
func TestCallerCanIgnoreNLines(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
// Skipping the first 2 lines leaves only the third record.
testCases := []testCase{
{
input: `1,1
2,2
3,3`,
expected: [][]types.Datum{
{types.NewStringDatum("3"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)

// Skipped lines are not parsed as CSV, so bad syntax inside them is ignored.
testCases = []testCase{
{
input: `"bad syntax"1
"b",2
"c",3`,
expected: [][]types.Datum{
{types.NewStringDatum("c"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
// Ignoring more lines than the input contains yields no rows at all.
testCases = []testCase{
{
input: `1,1
2,2
3,3`,
expected: [][]types.Datum{},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 100)

// test IGNORE N LINES will directly find (line) terminator without checking it's inside quotes

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
// The terminator inside the quoted field `"a\n"` still counts as a skipped
// line, so skipping 2 lines consumes the whole first record.
testCases = []testCase{
{
input: `"a
",1
"b
",2
"c",3`,
expected: [][]types.Datum{
{types.NewStringDatum("b\n"), types.NewStringDatum("2")},
{types.NewStringDatum("c"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
}

func TestCharsetConversion(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Expand Down
18 changes: 12 additions & 6 deletions br/pkg/lightning/mydump/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,19 @@ func MakePooledReader(reader ReadSeekCloser, ioWorkers *worker.Pool) PooledReade

// Read implements io.Reader
func (pr PooledReader) Read(p []byte) (n int, err error) {
	// The IO worker pool is optional; when it is nil, read without
	// throttling. Acquiring unconditionally would panic on a nil pool.
	if pr.ioWorkers != nil {
		w := pr.ioWorkers.Apply()
		defer pr.ioWorkers.Recycle(w)
	}
	return pr.reader.Read(p)
}

// Seek implements io.Seeker
func (pr PooledReader) Seek(offset int64, whence int) (int64, error) {
	// Acquire an IO worker only when a pool is configured; a nil pool
	// means unthrottled access.
	if pr.ioWorkers != nil {
		w := pr.ioWorkers.Apply()
		defer pr.ioWorkers.Recycle(w)
	}
	return pr.reader.Seek(offset, whence)
}

Expand All @@ -182,7 +186,9 @@ func (pr PooledReader) Close() error {

// ReadFull is same as `io.ReadFull(pr)` with less worker recycling
func (pr PooledReader) ReadFull(buf []byte) (n int, err error) {
	// Hold one worker for the entire multi-read operation (instead of one
	// per Read call); skip the pool entirely when it is nil.
	if pr.ioWorkers != nil {
		w := pr.ioWorkers.Apply()
		defer pr.ioWorkers.Recycle(w)
	}
	return io.ReadFull(pr.reader, buf)
}
3 changes: 3 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ go_library(
deps = [
"//bindinfo",
"//br/pkg/glue",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/storage",
"//br/pkg/task",
"//config",
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
loadDataInfo: loadDataInfo,
}
var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
loadDataExec.loadDataInfo.InitQueues()
loadDataExec.loadDataInfo.initQueues()
loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)

return loadDataExec
Expand Down
86 changes: 0 additions & 86 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
plannerutil "github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -89,64 +88,6 @@ func generateDatumSlice(vals ...int64) []types.Datum {
return datums
}

// TestGetFieldsFromLine exercises LoadDataInfo.getFieldsFromLine with fields
// enclosed by `"`, terminated by `,` and escaped by `\`, covering plain
// fields, embedded separators/quotes, escape sequences, and a mix of
// enclosed and bare fields.
func TestGetFieldsFromLine(t *testing.T) {
tests := []struct {
input string
expected []string
}{
{
`"1","a string","100.20"`,
[]string{"1", "a string", "100.20"},
},
{
`"2","a string containing a , comma","102.20"`,
[]string{"2", "a string containing a , comma", "102.20"},
},
{
`"3","a string containing a \" quote","102.20"`,
[]string{"3", "a string containing a \" quote", "102.20"},
},
{
`"4","a string containing a \", quote and comma","102.20"`,
[]string{"4", "a string containing a \", quote and comma", "102.20"},
},
// Test some escape char.
{
`"\0\b\n\r\t\Z\\\ \c\'\""`,
[]string{string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})},
},
// Test mixed.
{
`"123",456,"\t7890",abcd`,
[]string{"123", "456", "\t7890", "abcd"},
},
}

// Mirrors LOAD DATA ... FIELDS ENCLOSED BY '"' TERMINATED BY ',' ESCAPED BY '\'.
ldInfo := LoadDataInfo{
FieldsInfo: &ast.FieldsClause{
Enclosed: '"',
Terminated: ",",
Escaped: '\\',
},
}

for _, test := range tests {
got, err := ldInfo.getFieldsFromLine([]byte(test.input))
require.NoErrorf(t, err, "failed: %s", test.input)
assertEqualStrings(t, got, test.expected)
}

// A line with no enclosing quotes at all must also parse without error.
_, err := ldInfo.getFieldsFromLine([]byte(`1,a string,100.20`))
require.NoError(t, err)
}

// assertEqualStrings checks that the parsed fields in got match the expected
// raw string values, element by element.
func assertEqualStrings(t *testing.T, got []field, expect []string) {
	require.Equal(t, len(expect), len(got))
	for idx, f := range got {
		require.Equal(t, expect[idx], string(f.str))
	}
}

func TestSlowQueryRuntimeStats(t *testing.T) {
stats := &slowQueryRuntimeStats{
totalFileNum: 2,
Expand Down Expand Up @@ -270,33 +211,6 @@ func TestFilterTemporaryTableKeys(t *testing.T) {
require.Len(t, res, 1)
}

// TestLoadDataWithDifferentEscapeChar checks getFieldsFromLine when the
// escape character differs from the default backslash. escapeChar byte(0)
// corresponds to LOAD DATA ... ESCAPED BY '' (escaping disabled): the
// backslashes in the input survive verbatim while doubled enclosing quotes
// `""` still collapse to a single `"` in the expected output.
func TestLoadDataWithDifferentEscapeChar(t *testing.T) {
tests := []struct {
input string
escapeChar byte
expected []string
}{
{
`"{""itemRangeType"":0,""itemContainType"":0,""shopRangeType"":1,""shopJson"":""[{\""id\"":\""A1234\"",\""shopName\"":\""AAAAAA\""}]""}"`,
byte(0), // escaped by ''
[]string{`{"itemRangeType":0,"itemContainType":0,"shopRangeType":1,"shopJson":"[{\"id\":\"A1234\",\"shopName\":\"AAAAAA\"}]"}`},
},
}

for _, test := range tests {
// Build a fresh fields clause per case so each escape char is exercised.
ldInfo := LoadDataInfo{
FieldsInfo: &ast.FieldsClause{
Enclosed: '"',
Terminated: ",",
Escaped: test.escapeChar,
},
}
got, err := ldInfo.getFieldsFromLine([]byte(test.input))
require.NoErrorf(t, err, "failed: %s", test.input)
assertEqualStrings(t, got, test.expected)
}
}

func TestSortSpillDisk(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"))
defer func() {
Expand Down
Loading

0 comments on commit d161aa6

Please sign in to comment.