Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parser,executor: make load data support user_var in column list and set clause #7707

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions ast/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,13 +661,14 @@ func (n *Assignment) Accept(v Visitor) (Node, bool) {
type LoadDataStmt struct {
dmlNode

IsLocal bool
Path string
Table *TableName
Columns []*ColumnName
FieldsInfo *FieldsClause
LinesInfo *LinesClause
IgnoreLines uint64
IsLocal bool
Path string
Table *TableName
ColumnOrVars []ExprNode
FieldsInfo *FieldsClause
LinesInfo *LinesClause
IgnoreLines uint64
SetList []*Assignment
}

// Accept implements Node Accept interface.
Expand All @@ -684,12 +685,22 @@ func (n *LoadDataStmt) Accept(v Visitor) (Node, bool) {
}
n.Table = node.(*TableName)
}
for i, val := range n.Columns {
node, ok := val.Accept(v)
if !ok {
return n, false
for i, val := range n.ColumnOrVars {

switch exp := val.(type) {
case *ColumnNameExpr:
name, ok := exp.Name.Accept(v)
if !ok {
return n, false
}
n.ColumnOrVars[i] = &ColumnNameExpr{Name: name.(*ColumnName)}
case *VariableExpr:
node, ok := exp.Accept(v)
if !ok {
return n, false
}
n.ColumnOrVars[i] = node.(*VariableExpr)
}
n.Columns[i] = node.(*ColumnName)
}
return v.Leave(n)
}
Expand Down
2 changes: 1 addition & 1 deletion ast/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (ts *testDMLSuite) TestDMLVisitorCover(c *C) {
{&DeleteStmt{TableRefs: tableRefsClause, Tables: &DeleteTableList{}, Where: ce,
Order: &OrderByClause{}, Limit: &Limit{Count: ce, Offset: ce}}, 4, 4},
{&ShowStmt{Table: &TableName{}, Column: &ColumnName{}, Pattern: &PatternLikeExpr{Expr: ce, Pattern: ce}, Where: ce}, 3, 3},
{&LoadDataStmt{Table: &TableName{}, Columns: []*ColumnName{{}}, FieldsInfo: &FieldsClause{}, LinesInfo: &LinesClause{}}, 0, 0},
{&LoadDataStmt{Table: &TableName{}, ColumnOrVars: []ExprNode{}, SetList: []*Assignment{{}}, FieldsInfo: &FieldsClause{}, LinesInfo: &LinesClause{}}, 0, 0},
{&Assignment{Column: &ColumnName{}, Expr: ce}, 1, 1},
{&ByItem{Expr: ce}, 1, 1},
{&GroupByClause{Items: []*ByItem{{Expr: ce}, {Expr: ce}}}, 2, 2},
Expand Down
66 changes: 62 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,63 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
return insert
}

// getColumnInfo4LoadData gets column info for load data.
// return:
// - cols: column info lists, include column in brackets and in set lists.
// - pivot: the pivot point to divide columns in brackets or in sets lists, before pivot is brackets's columns
// - err: error
func getColumnInfo4LoadData(e *InsertValues, tableCols []*table.Column, colOrVars []ast.ExprNode) (cols []*table.Column, pivot int, err error) {
colOrVarLen := len(e.colDefaultVals)
if colOrVarLen == 0 {
colOrVarLen = len(tableCols)
}
cols = make([]*table.Column, 0, colOrVarLen+len(e.SetList))
if len(colOrVars) == 0 {
cols = append(cols, tableCols...)
} else {
columns := make([]string, 0, len(colOrVars))
for _, colOrVar := range colOrVars {
if colName, isCol := colOrVar.(*ast.ColumnNameExpr); isCol {
columns = append(columns, colName.Name.Name.O)
} else {
// an variable in brackets is not a column and no column info,
// but we append empty placeholder string to keep other column's array index "right",
// later eval SetList expression will use those idx.
columns = append(columns, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table.FindCols(tableCols, "", ...) 

What's the purpose of this line ?

Copy link
Contributor Author

@lysu lysu Oct 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

had added some comments to this line in code :)

}
}
colsBind, err := table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle)
if err != nil {
return nil, 0, errors.Errorf("%s in field list", err)
}
cols = append(cols, colsBind...)
}

pivot = len(cols)
if len(e.SetList) > 0 {
columns := make([]string, 0, len(e.SetList))
for _, v := range e.SetList {
columns = append(columns, v.Col.ColName.O)
}
colSet, err := table.FindCols(tableCols, columns, e.Table.Meta().PKIsHandle)
if err != nil {
return nil, 0, errors.Errorf("%s in field list", err)
}
cols = append(cols, colSet...)
}

for _, col := range cols {
if col == nil {
continue
}
if col.Name.L == model.ExtraHandleName.L {
e.hasExtraHandle = true
break
}
}
return cols, pivot, nil
}

func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
tbl, ok := b.is.TableByID(v.Table.TableInfo.ID)
if !ok {
Expand All @@ -568,12 +625,12 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
insertVal := &InsertValues{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
Table: tbl,
Columns: v.Columns,
SetList: v.SetList,
GenColumns: v.GenCols.Columns,
GenExprs: v.GenCols.Exprs,
}
tableCols := tbl.Cols()
columns, err := insertVal.getColumns(tableCols)
columns, pivot, err := getColumnInfo4LoadData(insertVal, tableCols, v.ColumnOrVars)
if err != nil {
b.err = errors.Trace(err)
return nil
Expand All @@ -590,10 +647,11 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
LinesInfo: v.LinesInfo,
IgnoreLines: v.IgnoreLines,
Ctx: b.ctx,
columns: columns,
colOrVar: v.ColumnOrVars,
colInfo: columns,
colPivot: pivot,
},
}

return loadDataExec
}

Expand Down
73 changes: 59 additions & 14 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ type LoadDataExec struct {
loadDataInfo *LoadDataInfo
}

// NewLoadDataInfo returns a LoadDataInfo structure, and it's only used for tests now.
func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table, cols []*table.Column) *LoadDataInfo {
insertVal := &InsertValues{baseExecutor: newBaseExecutor(ctx, nil, "InsertValues"), Table: tbl}
return &LoadDataInfo{
row: row,
InsertValues: insertVal,
Table: tbl,
Ctx: ctx,
columns: cols,
}
}

// Next implements the Executor Next interface.
func (e *LoadDataExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.GrowAndReset(e.maxChunkSize)
Expand Down Expand Up @@ -95,7 +83,9 @@ type LoadDataInfo struct {
LinesInfo *ast.LinesClause
IgnoreLines uint64
Ctx sessionctx.Context
columns []*table.Column
colOrVar []ast.ExprNode
colInfo []*table.Column
colPivot int
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
Expand Down Expand Up @@ -274,7 +264,7 @@ func (e *LoadDataInfo) colsToRow(cols []field) []types.Datum {
e.row[i].SetString(string(cols[i].str))
}
}
row, err := e.getRow(e.columns, e.row)
row, err := e.getRow4LoadData(e.row)
if err != nil {
e.handleWarning(err,
fmt.Sprintf("Load Data: insert data:%v failed:%v", e.row, errors.ErrorStack(err)))
Expand All @@ -283,6 +273,61 @@ func (e *LoadDataInfo) colsToRow(cols []field) []types.Datum {
return row
}

func (e *LoadDataInfo) getRow4LoadData(vals []types.Datum) ([]types.Datum, error) {
row := make([]types.Datum, len(e.Table.Cols()))
hasValue := make([]bool, len(e.Table.Cols()))
i := 0
for _, v := range vals {
colInfo := e.colInfo[i]
if colInfo == nil {
if !v.IsNull() {
vs, err := v.ToString()
if err != nil {
return nil, err
}
e.ctx.GetSessionVars().Users[(e.colOrVar[i]).(*ast.VariableExpr).Name] = vs
}
i++
if i == e.colPivot {
break
}
continue
}
casted, err := table.CastValue(e.ctx, v, colInfo.ToInfo())
if e.filterErr(err) != nil {
return nil, errors.Trace(err)
}
offset := colInfo.Offset
row[offset] = casted
hasValue[offset] = true
i++
if i == e.colPivot {
break
}
}

if len(e.SetList) > 0 {
mRow := chunk.MutRowFromDatums(row).ToRow()
for _, setExpr := range e.SetList {
colInfo := e.colInfo[i]
v, err := setExpr.Expr.Eval(mRow)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here need wait #7935 , can reuse one buffer instead renew row by row.

if err != nil {
return nil, err
}
casted, err := table.CastValue(e.ctx, v, colInfo.ToInfo())
if e.filterErr(err) != nil {
return nil, errors.Trace(err)
}
offset := colInfo.Offset
row[offset] = casted
hasValue[offset] = true
i++
}
}

return e.fillRow(row, hasValue)
}

func (e *LoadDataInfo) addRecordLD(row []types.Datum) (int64, error) {
if row == nil {
return 0, nil
Expand Down
66 changes: 50 additions & 16 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,10 @@ import (
ColumnNameList "column name list"
ColumnList "column list"
ColumnNameListOpt "column name list opt"
ColumnNameListOptWithBrackets "column name list opt with brackets"
ColumnOrVarListOptWithBrackets "column or variable list opt with brackets"
ColumnOrVarList "column or variable list"
ColumnOrVar "column or variable"
LoadDataSetOpt "load data statement set"
ColumnSetValue "insert statement set value by column name"
ColumnSetValueList "insert statement set value by column name list"
CompareOp "Compare opcode"
Expand Down Expand Up @@ -1384,16 +1387,6 @@ ColumnNameListOpt:
$$ = $1.([]*ast.ColumnName)
}

ColumnNameListOptWithBrackets:
/* EMPTY */
{
$$ = []*ast.ColumnName{}
}
| '(' ColumnNameListOpt ')'
{
$$ = $2.([]*ast.ColumnName)
}

CommitStmt:
"COMMIT"
{
Expand Down Expand Up @@ -6993,13 +6986,14 @@ RevokeStmt:
* See https://dev.mysql.com/doc/refman/5.7/en/load-data.html
*******************************************************************************************/
LoadDataStmt:
"LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines IgnoreLines ColumnNameListOptWithBrackets
"LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines IgnoreLines ColumnOrVarListOptWithBrackets LoadDataSetOpt
{
x := &ast.LoadDataStmt{
Path: $5,
Table: $8.(*ast.TableName),
Columns: $13.([]*ast.ColumnName),
IgnoreLines:$12.(uint64),
Path: $5,
Table: $8.(*ast.TableName),
ColumnOrVars:$13.([]ast.ExprNode),
IgnoreLines: $12.(uint64),
SetList: $14.([]*ast.Assignment),
}
if $3 != nil {
x.IsLocal = true
Expand All @@ -7013,6 +7007,46 @@ LoadDataStmt:
$$ = x
}

ColumnOrVarListOptWithBrackets:
/* EMPTY */
{
$$ = []ast.ExprNode{}
}
| '(' ColumnOrVarList ')'
{
$$ = $2.([]ast.ExprNode)
}

ColumnOrVarList:
ColumnOrVar
{
$$ = []ast.ExprNode{$1.(ast.ExprNode)}
}
| ColumnOrVarList ',' ColumnOrVar
{
$$ = append($1.([]ast.ExprNode), $3.(ast.ExprNode))
}

ColumnOrVar:
UserVariable
{
$$ = $1.(ast.ExprNode)
}
| ColumnName
{
$$ = &ast.ColumnNameExpr{Name: $1.(*ast.ColumnName)}
}

LoadDataSetOpt:
/* EMPTY */
{
$$ = []*ast.Assignment{}
}
| "SET" AssignmentList
{
$$ = $2
}

IgnoreLines:
{
$$ = uint64(0)
Expand Down
19 changes: 10 additions & 9 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,15 +378,16 @@ type Analyze struct {
type LoadData struct {
baseSchemaProducer

IsLocal bool
Path string
Table *ast.TableName
Columns []*ast.ColumnName
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
IgnoreLines uint64

GenCols InsertGeneratedColumns
IsLocal bool
Path string
Table *ast.TableName
ColumnOrVars []ast.ExprNode
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
IgnoreLines uint64
SetList []*expression.Assignment
tableSchema *expression.Schema
GenCols InsertGeneratedColumns
}

// LoadStats represents a load stats plan.
Expand Down
Loading