Skip to content

Commit

Permalink
executor: support Chunk for SelectionExec (pingcap#5211)
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason authored and XuHuaiyu committed Nov 30, 2017
1 parent d815ce1 commit ba1979c
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 6 deletions.
14 changes: 10 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,11 +628,17 @@ func (b *executorBuilder) buildAggregation(v *plan.PhysicalAggregation) Executor
}

func (b *executorBuilder) buildSelection(v *plan.PhysicalSelection) Executor {
exec := &SelectionExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
Conditions: v.Conditions,
childExec := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
return exec
e := &SelectionExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, childExec),
filters: v.Conditions,
}
e.supportChk = true
return e
}

func (b *executorBuilder) buildProjection(v *plan.Projection) Executor {
Expand Down
91 changes: 89 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,32 @@ func (e *TableDualExec) Next(goCtx goctx.Context) (Row, error) {
type SelectionExec struct {
baseExecutor

Conditions []expression.Expression
batched bool
filters []expression.Expression
selected []bool
inputRow chunk.Row
}

// Open implements the Executor Open interface.
func (e *SelectionExec) Open(goCtx goctx.Context) error {
if err := e.baseExecutor.Open(goCtx); err != nil {
return errors.Trace(err)
}
e.batched = expression.Vectorizable(e.filters)
if e.batched {
e.selected = make([]bool, 0, chunk.InitialCapacity)
}
e.inputRow = e.childrenResults[0].End()
return nil
}

// Close implements plan.Plan Close interface.
func (e *SelectionExec) Close() error {
if err := e.baseExecutor.Close(); err != nil {
return errors.Trace(err)
}
e.selected = nil
return nil
}

// Next implements the Executor Next interface.
Expand All @@ -561,7 +586,7 @@ func (e *SelectionExec) Next(goCtx goctx.Context) (Row, error) {
if srcRow == nil {
return nil, nil
}
match, err := expression.EvalBool(e.Conditions, srcRow, e.ctx)
match, err := expression.EvalBool(e.filters, srcRow, e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -571,6 +596,68 @@ func (e *SelectionExec) Next(goCtx goctx.Context) (Row, error) {
}
}

// NextChunk implements the Executor NextChunk interface.
func (e *SelectionExec) NextChunk(chk *chunk.Chunk) error {
chk.Reset()

if !e.batched {
return errors.Trace(e.unBatchedNextChunk(chk))
}

for {
for ; e.inputRow != e.childrenResults[0].End(); e.inputRow = e.inputRow.Next() {
if !e.selected[e.inputRow.Idx()] {
continue
}
if chk.NumRows() == e.ctx.GetSessionVars().MaxChunkSize {
return nil
}
chk.AppendRow(0, e.inputRow)
}
err := e.children[0].NextChunk(e.childrenResults[0])
if err != nil {
return errors.Trace(err)
}
e.inputRow = e.childrenResults[0].Begin()
// no more data.
if e.childrenResults[0].NumRows() == 0 {
return nil
}
e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.childrenResults[0], e.selected)
if err != nil {
return errors.Trace(err)
}
}
}

// unBatchedNextChunk filters input rows one by one and returns once an input row is selected.
// For sql with "SETVAR" in filter and "GETVAR" in projection, for example: "SELECT @a FROM t WHERE (@a := 2) > 0",
// we have to set batch size to 1 to do the evaluation of filter and projection.
func (e *SelectionExec) unBatchedNextChunk(chk *chunk.Chunk) error {
for {
for ; e.inputRow != e.childrenResults[0].End(); e.inputRow = e.inputRow.Next() {
selected, err := expression.EvalBool(e.filters, e.inputRow, e.ctx)
if err != nil {
return errors.Trace(err)
}
if selected {
chk.AppendRow(0, e.inputRow)
e.inputRow = e.inputRow.Next()
return nil
}
}
err := e.children[0].NextChunk(e.childrenResults[0])
if err != nil {
return errors.Trace(err)
}
e.inputRow = e.childrenResults[0].Begin()
// no more data.
if e.childrenResults[0].NumRows() == 0 {
return nil
}
}
}

// TableScanExec is a table scan executor without result fields.
type TableScanExec struct {
baseExecutor
Expand Down
22 changes: 22 additions & 0 deletions expression/chunk_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,25 @@ func executeToString(sc *stmtctx.StatementContext, expr Expression, fieldType *t
}
return nil
}

// VectorizedFilter applys a list of filters to a Chunk and returns a bool slice, which indicates whether a row is passed the filters.
// Filters is executed vectorized.
func VectorizedFilter(ctx context.Context, filters []Expression, input *chunk.Chunk, selected []bool) ([]bool, error) {
selected = selected[:0]
for i, numRows := 0, input.NumRows(); i < numRows; i++ {
selected = append(selected, true)
}
for _, filter := range filters {
for row := input.Begin(); row != input.End(); row = row.Next() {
if !selected[row.Idx()] {
continue
}
filterResult, isNull, err := filter.EvalInt(row, ctx.GetSessionVars().StmtCtx)
if err != nil {
return nil, errors.Trace(err)
}
selected[row.Idx()] = selected[row.Idx()] && !isNull && (filterResult != 0)
}
}
return selected, nil
}
5 changes: 5 additions & 0 deletions util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ type Row struct {
idx int
}

// Idx returns the row index of Chunk.
func (r Row) Idx() int {
return r.idx
}

// Len returns the number of values in the row.
func (r Row) Len() int {
return r.c.NumCols()
Expand Down

0 comments on commit ba1979c

Please sign in to comment.