Skip to content

Commit

Permalink
executor: copy row data instead of refereeing chunk.Row in some windo…
Browse files Browse the repository at this point in the history
…w functions. (#11678)
  • Loading branch information
SunRunAway authored Aug 22, 2019
1 parent 6f76bbe commit 8d165f0
Show file tree
Hide file tree
Showing 16 changed files with 654 additions and 136 deletions.
2 changes: 1 addition & 1 deletion executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func buildLeadLag(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) baseLeadLag
args: aggFuncDesc.Args,
ordinal: ordinal,
}
return baseLeadLag{baseAggFunc: base, offset: offset, defaultExpr: defaultExpr, valueEvaluator: buildValueEvaluator(aggFuncDesc.RetTp)}
return baseLeadLag{baseAggFunc: base, offset: offset, defaultExpr: defaultExpr, retTp: aggFuncDesc.RetTp}
}

func buildLead(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
Expand Down
24 changes: 11 additions & 13 deletions executor/aggfuncs/func_cume_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,33 @@ type cumeDist struct {
}

type partialResult4CumeDist struct {
curIdx int
lastRank int
rows []chunk.Row
partialResult4Rank
cum int64
}

func (r *cumeDist) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Rank{})
return PartialResult(&partialResult4CumeDist{})
}

func (r *cumeDist) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Rank)(pr)
p.curIdx = 0
p.lastRank = 0
p.rows = p.rows[:0]
p := (*partialResult4CumeDist)(pr)
p.partialResult4Rank.reset()
p.cum = 0
}

func (r *cumeDist) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4CumeDist)(pr)
p.rows = append(p.rows, rowsInGroup...)
p.partialResult4Rank.updatePartialResult(rowsInGroup, false, r.compareRows)
return nil
}

func (r *cumeDist) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4CumeDist)(pr)
numRows := len(p.rows)
for p.lastRank < numRows && r.compareRows(p.rows[p.curIdx], p.rows[p.lastRank]) == 0 {
p.lastRank++
numRows := int64(len(p.results))
for p.cum < numRows && p.results[p.cum] == p.results[p.curIdx] {
p.cum++
}
p.curIdx++
chk.AppendFloat64(r.ordinal, float64(p.lastRank)/float64(numRows))
chk.AppendFloat64(r.ordinal, float64(p.cum)/float64(numRows))
return nil
}
188 changes: 156 additions & 32 deletions executor/aggfuncs/func_lead_lag.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,76 +14,200 @@
package aggfuncs

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

type baseLeadLag struct {
baseAggFunc
valueEvaluator // TODO: move it to partial result when parallel execution is supported.

defaultExpr expression.Expression
offset uint64
retTp *types.FieldType
}

type partialResult4LeadLag struct {
rows []chunk.Row
curIdx uint64
type circleBuf struct {
buf []valueExtractor
head, tail int
size int
}

func (v *baseLeadLag) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4LeadLag{})
func (cb *circleBuf) reset() {
cb.buf = cb.buf[:0]
cb.head, cb.tail = 0, 0
}

func (v *baseLeadLag) ResetPartialResult(pr PartialResult) {
p := (*partialResult4LeadLag)(pr)
p.rows = p.rows[:0]
p.curIdx = 0
func (cb *circleBuf) append(e valueExtractor) {
if len(cb.buf) < cb.size {
cb.buf = append(cb.buf, e)
cb.tail++
} else {
if cb.tail >= cb.size {
cb.tail = 0
}
cb.buf[cb.tail] = e
cb.tail++
}
}

func (v *baseLeadLag) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4LeadLag)(pr)
p.rows = append(p.rows, rowsInGroup...)
return nil
func (cb *circleBuf) get() (e valueExtractor) {
if len(cb.buf) < cb.size {
e = cb.buf[cb.head]
cb.head++
} else {
if cb.tail >= cb.size {
cb.tail = 0
}
e = cb.buf[cb.tail]
cb.tail++
}
return e
}

type partialResult4Lead struct {
seenRows uint64
curIdx int
extractors []valueExtractor
defaultExtractors circleBuf
defaultConstExtractor valueExtractor
}

const maxDefaultExtractorBufferSize = 1000

type lead struct {
baseLeadLag
}

func (v *lead) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Lead{
defaultExtractors: circleBuf{
// Do not use v.offset directly since v.offset is defined by user
// and may larger than a table size.
buf: make([]valueExtractor, 0, mathutil.MinUint64(v.offset, maxDefaultExtractorBufferSize)),
size: int(v.offset),
},
})
}

func (v *lead) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Lead)(pr)
p.seenRows = 0
p.curIdx = 0
p.extractors = p.extractors[:0]
p.defaultExtractors.reset()
p.defaultConstExtractor = nil
}

func (v *lead) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4Lead)(pr)
for _, row := range rowsInGroup {
p.seenRows++
if p.seenRows > v.offset {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.args[0], row)
if err != nil {
return err
}
p.extractors = append(p.extractors, e)
}
if v.offset > 0 {
if !v.defaultExpr.ConstItem() {
// We must cache the results of last v.offset lines.
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, row)
if err != nil {
return err
}
p.defaultExtractors.append(e)
} else if p.defaultConstExtractor == nil {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, chunk.Row{})
if err != nil {
return err
}
p.defaultConstExtractor = e
}
}
}
return nil
}

func (v *lead) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4LeadLag)(pr)
var err error
if p.curIdx+v.offset < uint64(len(p.rows)) {
err = v.evaluateRow(sctx, v.args[0], p.rows[p.curIdx+v.offset])
p := (*partialResult4Lead)(pr)
var e valueExtractor
if p.curIdx < len(p.extractors) {
e = p.extractors[p.curIdx]
} else {
err = v.evaluateRow(sctx, v.defaultExpr, p.rows[p.curIdx])
if !v.defaultExpr.ConstItem() {
e = p.defaultExtractors.get()
} else {
e = p.defaultConstExtractor
}
}
if err != nil {
return err
}
v.appendResult(chk, v.ordinal)
e.appendResult(chk, v.ordinal)
p.curIdx++
return nil
}

type partialResult4Lag struct {
seenRows uint64
curIdx uint64
extractors []valueExtractor
defaultExtractors []valueExtractor
}

type lag struct {
baseLeadLag
}

func (v *lag) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Lag{
defaultExtractors: make([]valueExtractor, 0, mathutil.MinUint64(v.offset, maxDefaultExtractorBufferSize)),
})
}

func (v *lag) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Lag)(pr)
p.seenRows = 0
p.curIdx = 0
p.extractors = p.extractors[:0]
p.defaultExtractors = p.defaultExtractors[:0]
}

func (v *lag) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4Lag)(pr)
for _, row := range rowsInGroup {
p.seenRows++
if p.seenRows <= v.offset {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, row)
if err != nil {
return err
}
p.defaultExtractors = append(p.defaultExtractors, e)
}
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.args[0], row)
if err != nil {
return err
}
p.extractors = append(p.extractors, e)
}
return nil
}

func (v *lag) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4LeadLag)(pr)
var err error
if p.curIdx >= v.offset {
err = v.evaluateRow(sctx, v.args[0], p.rows[p.curIdx-v.offset])
p := (*partialResult4Lag)(pr)
var e valueExtractor
if p.curIdx < v.offset {
e = p.defaultExtractors[p.curIdx]
} else {
err = v.evaluateRow(sctx, v.defaultExpr, p.rows[p.curIdx])
}
if err != nil {
return err
e = p.extractors[p.curIdx-v.offset]
}
v.appendResult(chk, v.ordinal)
e.appendResult(chk, v.ordinal)
p.curIdx++
return nil
}
Loading

0 comments on commit 8d165f0

Please sign in to comment.