Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: remove childrenResult from baseExecutor #7076

Merged
merged 7 commits into from
Jul 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ type HashAggExec struct {
defaultVal *chunk.Chunk
// isChildReturnEmpty indicates whether the child executor only returns an empty input.
isChildReturnEmpty bool

childResult *chunk.Chunk
}

// HashAggInput indicates the input of hash agg exec.
Expand Down Expand Up @@ -201,6 +203,7 @@ func (d *HashAggIntermData) ToRows(sc *stmtctx.StatementContext, rows []types.Da
// Close implements the Executor Close interface.
func (e *HashAggExec) Close() error {
if e.isUnparallelExec {
e.childResult = nil
e.groupMap = nil
e.groupIterator = nil
e.aggCtxsMap = nil
Expand Down Expand Up @@ -247,6 +250,7 @@ func (e *HashAggExec) initForUnparallelExec() {
e.rowBuffer = make([]types.Datum, 0, e.Schema().Len())
e.groupKey = make([]byte, 0, 8)
e.groupVals = make([][]byte, 0, 8)
e.childResult = e.children[0].newChunk()
}

func (e *HashAggExec) initForParallelExec() {
Expand Down Expand Up @@ -676,14 +680,14 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro

// execute fetches Chunks from src and update each aggregate function for each row in Chunk.
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childrenResults[0])
inputIter := chunk.NewIterator4Chunk(e.childResult)
for {
err := e.children[0].Next(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
// no more data.
if e.childrenResults[0].NumRows() == 0 {
if e.childResult.NumRows() == 0 {
return nil
}
for row := inputIter.Begin(); row != inputIter.End(); row = inputIter.Next() {
Expand Down Expand Up @@ -765,17 +769,19 @@ type StreamAggExec struct {
newAggFuncs []aggfuncs.AggFunc
partialResults []aggfuncs.PartialResult
groupRows []chunk.Row

childResult *chunk.Chunk
}

// Open implements the Executor Open interface.
func (e *StreamAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}

e.childResult = e.children[0].newChunk()
e.executed = false
e.isChildReturnEmpty = true
e.inputIter = chunk.NewIterator4Chunk(e.childrenResults[0])
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
e.inputRow = e.inputIter.End()
e.mutableRow = chunk.MutRowFromTypes(e.retTypes())
e.rowBuffer = make([]types.Datum, 0, e.Schema().Len())
Expand All @@ -795,6 +801,12 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
return nil
}

// Close implements the Executor Close interface.
func (e *StreamAggExec) Close() error {
e.childResult = nil
return errors.Trace(e.baseExecutor.Close())
}

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
Expand Down Expand Up @@ -876,13 +888,13 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
}
}

err = e.children[0].Next(ctx, e.childrenResults[0])
err = e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}

// No more data.
if e.childrenResults[0].NumRows() == 0 {
if e.childResult.NumRows() == 0 {
if !e.isChildReturnEmpty {
err = e.appendResult2Chunk(chk)
} else if e.defaultVal != nil {
Expand Down
81 changes: 49 additions & 32 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,12 @@ var (
)

type baseExecutor struct {
ctx sessionctx.Context
id string
schema *expression.Schema
maxChunkSize int
children []Executor
childrenResults []*chunk.Chunk
retFieldTypes []*types.FieldType
ctx sessionctx.Context
id string
schema *expression.Schema
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
Expand All @@ -80,10 +79,6 @@ func (e *baseExecutor) Open(ctx context.Context) error {
return errors.Trace(err)
}
}
e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
for _, child := range e.children {
e.childrenResults = append(e.childrenResults, child.newChunk())
}
return nil
}

Expand All @@ -95,7 +90,6 @@ func (e *baseExecutor) Close() error {
return errors.Trace(err)
}
}
e.childrenResults = nil
return nil
}

Expand Down Expand Up @@ -512,6 +506,8 @@ type LimitExec struct {

// meetFirstBatch represents whether we have met the first valid Chunk from child.
meetFirstBatch bool

childResult *chunk.Chunk
}

// Next implements the Executor Next interface.
Expand All @@ -521,11 +517,11 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}
for !e.meetFirstBatch {
err := e.children[0].Next(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
batchSize := uint64(e.childrenResults[0].NumRows())
batchSize := uint64(e.childResult.NumRows())
// no more data.
if batchSize == 0 {
return nil
Expand All @@ -540,7 +536,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if begin == end {
break
}
chk.Append(e.childrenResults[0], int(begin), int(end))
chk.Append(e.childResult, int(begin), int(end))
return nil
}
e.cursor += batchSize
Expand All @@ -567,11 +563,18 @@ func (e *LimitExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.childResult = e.children[0].newChunk()
e.cursor = 0
e.meetFirstBatch = e.begin == 0
return nil
}

// Close implements the Executor Close interface.
func (e *LimitExec) Close() error {
e.childResult = nil
return errors.Trace(e.baseExecutor.Close())
}

func init() {
// While doing optimization in the plan package, we need to execute uncorrelated subquery,
// but the plan package cannot import the executor package because of the dependency cycle.
Expand Down Expand Up @@ -646,34 +649,34 @@ func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
type SelectionExec struct {
baseExecutor

batched bool
filters []expression.Expression
selected []bool
inputIter *chunk.Iterator4Chunk
inputRow chunk.Row
batched bool
filters []expression.Expression
selected []bool
inputIter *chunk.Iterator4Chunk
inputRow chunk.Row
childResult *chunk.Chunk
}

// Open implements the Executor Open interface.
func (e *SelectionExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.childResult = e.children[0].newChunk()
e.batched = expression.Vectorizable(e.filters)
if e.batched {
e.selected = make([]bool, 0, chunk.InitialCapacity)
}
e.inputIter = chunk.NewIterator4Chunk(e.childrenResults[0])
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
e.inputRow = e.inputIter.End()
return nil
}

// Close implements plan.Plan Close interface.
func (e *SelectionExec) Close() error {
if err := e.baseExecutor.Close(); err != nil {
return errors.Trace(err)
}
e.childResult = nil
e.selected = nil
return nil
return errors.Trace(e.baseExecutor.Close())
}

// Next implements the Executor Next interface.
Expand All @@ -694,12 +697,12 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
chk.AppendRow(e.inputRow)
}
err := e.children[0].Next(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
// no more data.
if e.childrenResults[0].NumRows() == 0 {
if e.childResult.NumRows() == 0 {
return nil
}
e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.inputIter, e.selected)
Expand All @@ -726,13 +729,13 @@ func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) err
return nil
}
}
err := e.children[0].Next(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
e.inputRow = e.inputIter.Begin()
// no more data.
if e.childrenResults[0].NumRows() == 0 {
if e.childResult.NumRows() == 0 {
return nil
}
}
Expand Down Expand Up @@ -838,14 +841,16 @@ func (e *TableScanExec) Open(ctx context.Context) error {
type ExistsExec struct {
baseExecutor

evaluated bool
evaluated bool
childResult *chunk.Chunk
}

// Open implements the Executor Open interface.
func (e *ExistsExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.childResult = e.children[0].newChunk()
e.evaluated = false
return nil
}
Expand All @@ -855,11 +860,11 @@ func (e *ExistsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if !e.evaluated {
e.evaluated = true
err := e.children[0].Next(ctx, e.childrenResults[0])
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
if e.childrenResults[0].NumRows() > 0 {
if e.childResult.NumRows() > 0 {
chk.AppendInt64(0, 1)
} else {
chk.AppendInt64(0, 0)
Expand All @@ -868,6 +873,12 @@ func (e *ExistsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}

// Close implements the Executor Close interface.
func (e *ExistsExec) Close() error {
e.childResult = nil
return errors.Trace(e.baseExecutor.Close())
}

// MaxOneRowExec checks if the number of rows that a query returns is at maximum one.
// It's built from subquery expression.
type MaxOneRowExec struct {
Expand Down Expand Up @@ -937,6 +948,8 @@ type UnionExec struct {
resourcePools []chan *chunk.Chunk
resultPool chan *unionWorkerResult
initialized bool

childrenResults []*chunk.Chunk
}

// unionWorkerResult stores the result for a union worker.
Expand All @@ -959,6 +972,9 @@ func (e *UnionExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
for _, child := range e.children {
e.childrenResults = append(e.childrenResults, child.newChunk())
}
e.stopFetchData.Store(false)
e.initialized = false
e.finished = make(chan struct{})
Expand Down Expand Up @@ -1039,6 +1055,7 @@ func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
// Close implements the Executor Close interface.
func (e *UnionExec) Close() error {
close(e.finished)
e.childrenResults = nil
if e.resultPool != nil {
for range e.resultPool {
}
Expand Down
8 changes: 8 additions & 0 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type MergeJoinExec struct {
innerRows []chunk.Row
innerIter4Row chunk.Iterator

childrenResults []*chunk.Chunk

memTracker *memory.Tracker
}

Expand Down Expand Up @@ -177,6 +179,7 @@ func (t *mergeJoinInnerTable) reallocReaderResult() {
// Close implements the Executor Close interface.
func (e *MergeJoinExec) Close() error {
e.memTracker.Detach()
e.childrenResults = nil
e.memTracker = nil

return errors.Trace(e.baseExecutor.Close())
Expand All @@ -192,6 +195,11 @@ func (e *MergeJoinExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaMergeJoin)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
for _, child := range e.children {
e.childrenResults = append(e.childrenResults, child.newChunk())
}

e.innerTable.memTracker = memory.NewTracker("innerTable", -1)
e.innerTable.memTracker.AttachTo(e.memTracker)

Expand Down
Loading