Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: revert mostly changes in #11678 #12481

Merged
merged 4 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func buildLeadLag(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) baseLeadLag
args: aggFuncDesc.Args,
ordinal: ordinal,
}
return baseLeadLag{baseAggFunc: base, offset: offset, defaultExpr: defaultExpr, retTp: aggFuncDesc.RetTp}
return baseLeadLag{baseAggFunc: base, offset: offset, defaultExpr: defaultExpr, valueEvaluator: buildValueEvaluator(aggFuncDesc.RetTp)}
}

func buildLead(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
Expand Down
20 changes: 11 additions & 9 deletions executor/aggfuncs/func_cume_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ type cumeDist struct {
}

type partialResult4CumeDist struct {
partialResult4Rank
cum int64
curIdx int
lastRank int
rows []chunk.Row
}

func (r *cumeDist) AllocPartialResult() PartialResult {
Expand All @@ -34,23 +35,24 @@ func (r *cumeDist) AllocPartialResult() PartialResult {

func (r *cumeDist) ResetPartialResult(pr PartialResult) {
p := (*partialResult4CumeDist)(pr)
p.partialResult4Rank.reset()
p.cum = 0
p.curIdx = 0
p.lastRank = 0
p.rows = p.rows[:0]
}

func (r *cumeDist) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4CumeDist)(pr)
p.partialResult4Rank.updatePartialResult(rowsInGroup, false, r.compareRows)
p.rows = append(p.rows, rowsInGroup...)
return nil
}

func (r *cumeDist) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4CumeDist)(pr)
numRows := int64(len(p.results))
for p.cum < numRows && p.results[p.cum] == p.results[p.curIdx] {
p.cum++
numRows := len(p.rows)
for p.lastRank < numRows && r.compareRows(p.rows[p.curIdx], p.rows[p.lastRank]) == 0 {
p.lastRank++
}
p.curIdx++
chk.AppendFloat64(r.ordinal, float64(p.cum)/float64(numRows))
chk.AppendFloat64(r.ordinal, float64(p.lastRank)/float64(numRows))
return nil
}
188 changes: 32 additions & 156 deletions executor/aggfuncs/func_lead_lag.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,200 +14,76 @@
package aggfuncs

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

type baseLeadLag struct {
baseAggFunc
valueEvaluator // TODO: move it to partial result when parallel execution is supported.

defaultExpr expression.Expression
offset uint64
retTp *types.FieldType
}

type circleBuf struct {
buf []valueExtractor
head, tail int
size int
type partialResult4LeadLag struct {
rows []chunk.Row
curIdx uint64
}

func (cb *circleBuf) reset() {
cb.buf = cb.buf[:0]
cb.head, cb.tail = 0, 0
func (v *baseLeadLag) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4LeadLag{})
}

func (cb *circleBuf) append(e valueExtractor) {
if len(cb.buf) < cb.size {
cb.buf = append(cb.buf, e)
cb.tail++
} else {
if cb.tail >= cb.size {
cb.tail = 0
}
cb.buf[cb.tail] = e
cb.tail++
}
}

func (cb *circleBuf) get() (e valueExtractor) {
if len(cb.buf) < cb.size {
e = cb.buf[cb.head]
cb.head++
} else {
if cb.tail >= cb.size {
cb.tail = 0
}
e = cb.buf[cb.tail]
cb.tail++
}
return e
func (v *baseLeadLag) ResetPartialResult(pr PartialResult) {
p := (*partialResult4LeadLag)(pr)
p.rows = p.rows[:0]
p.curIdx = 0
}

type partialResult4Lead struct {
seenRows uint64
curIdx int
extractors []valueExtractor
defaultExtractors circleBuf
defaultConstExtractor valueExtractor
func (v *baseLeadLag) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4LeadLag)(pr)
p.rows = append(p.rows, rowsInGroup...)
return nil
}

const maxDefaultExtractorBufferSize = 1000

type lead struct {
baseLeadLag
}

func (v *lead) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Lead{
defaultExtractors: circleBuf{
// Do not use v.offset directly since v.offset is defined by user
// and may larger than a table size.
buf: make([]valueExtractor, 0, mathutil.MinUint64(v.offset, maxDefaultExtractorBufferSize)),
size: int(v.offset),
},
})
}

func (v *lead) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Lead)(pr)
p.seenRows = 0
p.curIdx = 0
p.extractors = p.extractors[:0]
p.defaultExtractors.reset()
p.defaultConstExtractor = nil
}

func (v *lead) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4Lead)(pr)
for _, row := range rowsInGroup {
p.seenRows++
if p.seenRows > v.offset {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.args[0], row)
if err != nil {
return err
}
p.extractors = append(p.extractors, e)
}
if v.offset > 0 {
if !v.defaultExpr.ConstItem() {
// We must cache the results of last v.offset lines.
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, row)
if err != nil {
return err
}
p.defaultExtractors.append(e)
} else if p.defaultConstExtractor == nil {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, chunk.Row{})
if err != nil {
return err
}
p.defaultConstExtractor = e
}
}
}
return nil
}

func (v *lead) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4Lead)(pr)
var e valueExtractor
if p.curIdx < len(p.extractors) {
e = p.extractors[p.curIdx]
p := (*partialResult4LeadLag)(pr)
var err error
if p.curIdx+v.offset < uint64(len(p.rows)) {
err = v.evaluateRow(sctx, v.args[0], p.rows[p.curIdx+v.offset])
} else {
if !v.defaultExpr.ConstItem() {
e = p.defaultExtractors.get()
} else {
e = p.defaultConstExtractor
}
err = v.evaluateRow(sctx, v.defaultExpr, p.rows[p.curIdx])
}
e.appendResult(chk, v.ordinal)
if err != nil {
return err
}
v.appendResult(chk, v.ordinal)
p.curIdx++
return nil
}

type partialResult4Lag struct {
seenRows uint64
curIdx uint64
extractors []valueExtractor
defaultExtractors []valueExtractor
}

type lag struct {
baseLeadLag
}

func (v *lag) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4Lag{
defaultExtractors: make([]valueExtractor, 0, mathutil.MinUint64(v.offset, maxDefaultExtractorBufferSize)),
})
}

func (v *lag) ResetPartialResult(pr PartialResult) {
p := (*partialResult4Lag)(pr)
p.seenRows = 0
p.curIdx = 0
p.extractors = p.extractors[:0]
p.defaultExtractors = p.defaultExtractors[:0]
}

func (v *lag) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4Lag)(pr)
for _, row := range rowsInGroup {
p.seenRows++
if p.seenRows <= v.offset {
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.defaultExpr, row)
if err != nil {
return err
}
p.defaultExtractors = append(p.defaultExtractors, e)
}
e := buildValueExtractor(v.retTp)
err = e.extractRow(sctx, v.args[0], row)
if err != nil {
return err
}
p.extractors = append(p.extractors, e)
}
return nil
}

func (v *lag) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4Lag)(pr)
var e valueExtractor
if p.curIdx < v.offset {
e = p.defaultExtractors[p.curIdx]
p := (*partialResult4LeadLag)(pr)
var err error
if p.curIdx >= v.offset {
err = v.evaluateRow(sctx, v.args[0], p.rows[p.curIdx-v.offset])
} else {
e = p.extractors[p.curIdx-v.offset]
err = v.evaluateRow(sctx, v.defaultExpr, p.rows[p.curIdx])
}
if err != nil {
return err
}
e.appendResult(chk, v.ordinal)
v.appendResult(chk, v.ordinal)
p.curIdx++
return nil
}
22 changes: 15 additions & 7 deletions executor/aggfuncs/func_percent_rank.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,31 @@ func (pr *percentRank) AllocPartialResult() PartialResult {

func (pr *percentRank) ResetPartialResult(partial PartialResult) {
p := (*partialResult4Rank)(partial)
p.reset()
p.curIdx = 0
p.lastRank = 0
p.rows = p.rows[:0]
}

func (pr *percentRank) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, partial PartialResult) error {
p := (*partialResult4Rank)(partial)
p.updatePartialResult(rowsInGroup, false, pr.compareRows)
p.rows = append(p.rows, rowsInGroup...)
return nil
}

func (pr *percentRank) AppendFinalResult2Chunk(sctx sessionctx.Context, partial PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4Rank)(partial)
numRows := len(p.results)
if numRows == 1 {
numRows := int64(len(p.rows))
p.curIdx++
if p.curIdx == 1 {
p.lastRank = 1
chk.AppendFloat64(pr.ordinal, 0)
} else {
chk.AppendFloat64(pr.ordinal, float64(p.results[p.curIdx]-1)/float64(numRows-1))
return nil
}
p.curIdx++
if pr.compareRows(p.rows[p.curIdx-2], p.rows[p.curIdx-1]) == 0 {
chk.AppendFloat64(pr.ordinal, float64(p.lastRank-1)/float64(numRows-1))
return nil
}
p.lastRank = p.curIdx
chk.AppendFloat64(pr.ordinal, float64(p.lastRank-1)/float64(numRows-1))
return nil
}
Loading