Skip to content

Commit

Permalink
cherry pick #21895 to release-4.0 (#22481)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jan 26, 2021
1 parent 6a65ccd commit 1878b9d
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
72 changes: 67 additions & 5 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,59 @@ func (e *LoadDataInfo) getValidData(prevData, curData []byte) ([]byte, []byte) {
return nil, curData
}

// indexOfTerminator return index of terminator, if not, return -1.
// normally, the field terminator and line terminator is short, so we just use brute force algorithm.
func (e *LoadDataInfo) indexOfTerminator(bs []byte) int {
fieldTerm := []byte(e.FieldsInfo.Terminated)
fieldTermLen := len(fieldTerm)
lineTerm := []byte(e.LinesInfo.Terminated)
lineTermLen := len(lineTerm)
length := len(bs)
atFieldStart := true
inQuoter := false
for i := 0; i < length; i++ {
if atFieldStart && bs[i] == e.FieldsInfo.Enclosed {
inQuoter = true
atFieldStart = false
continue
}
restLen := length - i - 1
if inQuoter && bs[i] == e.FieldsInfo.Enclosed {
// look ahead to see if it is end of field. if the next is field terminator, then it is.
if restLen >= fieldTermLen && bytes.Equal(bs[i+1:i+fieldTermLen+1], fieldTerm) {
i += fieldTermLen
inQuoter = false
atFieldStart = true
continue
}
// look ahead to see if it is end of line. if the next is line terminator, then return.
if restLen >= lineTermLen && bytes.Equal(bs[i+1:i+lineTermLen+1], lineTerm) {
return i + 1
}
}
// look ahead to see if it is end of field. if the next is field terminator, then it is.
if !inQuoter && restLen >= fieldTermLen-1 && bytes.Equal(bs[i:i+fieldTermLen], fieldTerm) {
i += fieldTermLen - 1
inQuoter = false
atFieldStart = true
continue
}
// look ahead to see if it is end of line. if the next is line terminator, then return.
if !inQuoter && restLen >= lineTermLen-1 && bytes.Equal(bs[i:i+lineTermLen], lineTerm) {
return i
}
// if it is escaped char, skip next char.
if bs[i] == e.FieldsInfo.Escaped {
i++
}
atFieldStart = false
}
return -1
}

// getLine returns a line, curData, the next data start index and a bool value.
// If it has starting symbol the bool is true, otherwise is false.
func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool) {
func (e *LoadDataInfo) getLine(prevData, curData []byte, ignore bool) ([]byte, []byte, bool) {
startingLen := len(e.LinesInfo.Starting)
prevData, curData = e.getValidData(prevData, curData)
if prevData == nil && len(curData) < startingLen {
Expand All @@ -328,7 +378,11 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool)
}
endIdx := -1
if len(curData) >= curStartIdx {
endIdx = strings.Index(string(hack.String(curData[curStartIdx:])), e.LinesInfo.Terminated)
if ignore {
endIdx = strings.Index(string(hack.String(curData[curStartIdx:])), e.LinesInfo.Terminated)
} else {
endIdx = e.indexOfTerminator(curData[curStartIdx:])
}
}
if endIdx == -1 {
// no terminated symbol
Expand All @@ -338,7 +392,11 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool)

// terminated symbol in the middle of prevData and curData
curData = append(prevData, curData...)
endIdx = strings.Index(string(hack.String(curData[startingLen:])), e.LinesInfo.Terminated)
if ignore {
endIdx = strings.Index(string(hack.String(curData[startingLen:])), e.LinesInfo.Terminated)
} else {
endIdx = e.indexOfTerminator(curData[startingLen:])
}
if endIdx != -1 {
nextDataIdx := startingLen + endIdx + terminatedLen
return curData[startingLen : startingLen+endIdx], curData[nextDataIdx:], true
Expand All @@ -355,7 +413,11 @@ func (e *LoadDataInfo) getLine(prevData, curData []byte) ([]byte, []byte, bool)

// terminated symbol in the curData
prevData = append(prevData, curData[:nextDataIdx]...)
endIdx = strings.Index(string(hack.String(prevData[startingLen:])), e.LinesInfo.Terminated)
if ignore {
endIdx = strings.Index(string(hack.String(prevData[startingLen:])), e.LinesInfo.Terminated)
} else {
endIdx = e.indexOfTerminator(prevData[startingLen:])
}
if endIdx >= prevLen {
return prevData[startingLen : startingLen+endIdx], curData[nextDataIdx:], true
}
Expand All @@ -380,7 +442,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
prevData, curData = curData, prevData
}
for len(curData) > 0 {
line, curData, hasStarting = e.getLine(prevData, curData)
line, curData, hasStarting = e.getLine(prevData, curData, e.IgnoreLines > 0)
prevData = nil
// If it doesn't find the terminated symbol and this data isn't the last data,
// the data can't be inserted.
Expand Down
8 changes: 8 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,14 @@ func (s *testSuite4) TestLoadData(c *C) {
[]string{"10|2|3|4", "40|<nil>|<nil>|<nil>"}, []byte("xxx"), "Records: 2 Deleted: 0 Skipped: 0 Warnings: 1"},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)

// test line terminator in field quoter
ld.LinesInfo.Terminated = "\n"
ld.FieldsInfo.Enclosed = '"'
tests = []testCase{
{[]byte("xxx1\\1\\\"2\n\"\\3\nxxx4\\4\\\"5\n5\"\\6"), nil, []string{"1|1|2\n|3", "4|4|5\n5|6"}, nil, "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"},
}
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}

func (s *testSuite4) TestLoadDataEscape(c *C) {
Expand Down

0 comments on commit 1878b9d

Please sign in to comment.