Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add builtin aggregate function VAR_POP #14101

Merged
merged 25 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
517a566
add varpop based on new branch new_master
githubFZX Dec 16, 2019
7edb475
Merge branch 'master' of https://github.com/pingcap/tidb into new_varpop
githubFZX Dec 16, 2019
3609ff0
modify format of code
githubFZX Dec 16, 2019
077d325
modify format of file func_varpop.go
githubFZX Dec 16, 2019
73ec1fa
Merge branch 'master' of https://github.com/pingcap/tidb into new_varpop
githubFZX Dec 16, 2019
5b8784e
Merge branch 'master' into new_varpop
zz-jason Dec 18, 2019
6c515ec
Merge branch 'new_varpop' of https://github.com/githubFZX/tidb into n…
githubFZX Dec 18, 2019
5838cca
format code of the file func_varpop.go
githubFZX Dec 18, 2019
5e7b04f
format code of file varpop.go secondly
githubFZX Dec 18, 2019
ad9a487
Merge branch 'master' into new_varpop
zz-jason Dec 18, 2019
10bbd96
fix the error in ResetPartialResult for aggregate function varpop
githubFZX Dec 18, 2019
57f037f
Merge branch 'new_varpop' of https://github.com/githubFZX/tidb into n…
githubFZX Dec 18, 2019
9694271
fix some bug, add test case about variance and support distinct in va…
githubFZX Dec 25, 2019
36e0ed7
format code
githubFZX Dec 25, 2019
a07314e
add test for window function
githubFZX Dec 30, 2019
5ecc2c0
Merge branch 'master' of https://github.com/pingcap/tidb into new_varpop
githubFZX Dec 30, 2019
1a28d8b
update go.mod and go.sum
githubFZX Dec 30, 2019
91e4547
Merge branch 'master' into new_varpop
wshwsh12 Dec 30, 2019
1c20fae
Merge branch 'master' into new_varpop
zz-jason Dec 31, 2019
764db15
Merge branch 'master' of https://github.com/pingcap/tidb into new_varpop
githubFZX Jan 6, 2020
74bc798
clean go.mod, go.sum and resolve conflicts
githubFZX Jan 6, 2020
1d83461
Merge branch 'new_varpop' of https://github.com/githubFZX/tidb into n…
githubFZX Jan 6, 2020
70c8aac
Merge branch 'master' of https://github.com/pingcap/tidb into new_varpop
githubFZX Jan 7, 2020
62d784b
update go.sum
githubFZX Jan 7, 2020
b4ef0ec
make test and make dev
githubFZX Jan 7, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func Build(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDesc, ordinal
return buildBitXor(aggFuncDesc, ordinal)
case ast.AggFuncBitAnd:
return buildBitAnd(aggFuncDesc, ordinal)
case ast.AggFuncVarPop:
return buildVarPop(aggFuncDesc, ordinal)
}
return nil
}
Expand Down Expand Up @@ -350,6 +352,25 @@ func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
return &bitAndUint64{baseBitAggFunc{base}}
}

// buildVarPop builds the AggFunc implementation for function "VAR_POP".
func buildVarPop(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
base := baseVarPopAggFunc{
baseAggFunc{
args: aggFuncDesc.Args,
ordinal: ordinal,
},
}
switch aggFuncDesc.Mode {
case aggregation.DedupMode:
return nil
default:
if aggFuncDesc.HasDistinct {
return &varPop4DistinctFloat64{base}
}
return &varPop4Float64{base}
}
}

// buildRowNumber builds the AggFunc implementation for function "ROW_NUMBER".
func buildRowNumber(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
base := baseAggFunc{
Expand Down
169 changes: 169 additions & 0 deletions executor/aggfuncs/func_varpop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/set"
)

type baseVarPopAggFunc struct {
baseAggFunc
}

type varPop4Float64 struct {
baseVarPopAggFunc
}

type partialResult4VarPopFloat64 struct {
count int64
sum float64
variance float64
}

func (e *varPop4Float64) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4VarPopFloat64{})
}

func (e *varPop4Float64) ResetPartialResult(pr PartialResult) {
p := (*partialResult4VarPopFloat64)(pr)
p.count = 0
wshwsh12 marked this conversation as resolved.
Show resolved Hide resolved
p.sum = 0
p.variance = 0
}

func (e *varPop4Float64) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4VarPopFloat64)(pr)
if p.count == 0 {
chk.AppendNull(e.ordinal)
return nil
}
varicance := p.variance / float64(p.count)
chk.AppendFloat64(e.ordinal, varicance)
return nil
}

func calculateIntermediate(count int64, sum float64, input float64, variance float64) float64 {
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
t := float64(count)*input - sum
variance += (t * t) / (float64(count * (count - 1)))
return variance
}

func (e *varPop4Float64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4VarPopFloat64)(pr)
for _, row := range rowsInGroup {
input, isNull, err := e.args[0].EvalReal(sctx, row)
if err != nil {
return errors.Trace(err)
}
if isNull {
continue
}
p.count++
p.sum += input
if p.count > 1 {
p.variance = calculateIntermediate(p.count, p.sum, input, p.variance)
}
}
return nil
}

func calculateMerge(srcCount, dstCount int64, srcSum, dstSum, srcVariance, dstVariance float64) float64 {
srcCountFloat64 := float64(srcCount)
dstCountFloat64 := float64(dstCount)

t := (srcCountFloat64/dstCountFloat64)*dstSum - srcSum
dstVariance += srcVariance + ((dstCountFloat64/srcCountFloat64)/(dstCountFloat64+srcCountFloat64))*t*t
return dstVariance
}

func (e *varPop4Float64) MergePartialResult(sctx sessionctx.Context, src PartialResult, dst PartialResult) error {
p1, p2 := (*partialResult4VarPopFloat64)(src), (*partialResult4VarPopFloat64)(dst)
if p1.count == 0 {
return nil
}
if p2.count == 0 {
p2.count = p1.count
p2.sum = p1.sum
p2.variance = p1.variance
return nil
}
if p2.count != 0 && p1.count != 0 {
p2.variance = calculateMerge(p1.count, p2.count, p1.sum, p2.sum, p1.variance, p2.variance)
p2.count += p1.count
p2.sum += p1.sum
}
return nil
}

type varPop4DistinctFloat64 struct {
baseVarPopAggFunc
}

type partialResult4VarPopDistinctFloat64 struct {
count int64
sum float64
variance float64
valSet set.Float64Set
}

func (e *varPop4DistinctFloat64) AllocPartialResult() PartialResult {
p := new(partialResult4VarPopDistinctFloat64)
p.count = 0
p.sum = 0
p.variance = 0
p.valSet = set.NewFloat64Set()
return PartialResult(p)
}

func (e *varPop4DistinctFloat64) ResetPartialResult(pr PartialResult) {
p := (*partialResult4VarPopDistinctFloat64)(pr)
p.count = 0
p.sum = 0
p.variance = 0
p.valSet = set.NewFloat64Set()
}

func (e *varPop4DistinctFloat64) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4VarPopDistinctFloat64)(pr)
if p.count == 0 {
chk.AppendNull(e.ordinal)
return nil
}
varicance := p.variance / float64(p.count)
chk.AppendFloat64(e.ordinal, varicance)
return nil
}

func (e *varPop4DistinctFloat64) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4VarPopDistinctFloat64)(pr)
for _, row := range rowsInGroup {
input, isNull, err := e.args[0].EvalReal(sctx, row)
if err != nil {
return errors.Trace(err)
}
if isNull || p.valSet.Exist(input) {
continue
}
p.valSet.Insert(input)
p.count++
p.sum += input
if p.count > 1 {
p.variance = calculateIntermediate(p.count, p.sum, input, p.variance)
}
}
return nil
}
26 changes: 26 additions & 0 deletions executor/aggfuncs/func_varpop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package aggfuncs_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/types"
)

func (s *testSuite) TestMergePartialResult4Varpop(c *C) {
tests := []aggTest{
buildAggTester(ast.AggFuncVarPop, mysql.TypeDouble, 5, types.NewFloat64Datum(float64(2)), types.NewFloat64Datum(float64(2)/float64(3)), types.NewFloat64Datum(float64(59)/float64(8)-float64(19*19)/float64(8*8))),
}
for _, test := range tests {
s.testMergePartialResult(c, test)
}
}

func (s *testSuite) TestVarpop(c *C) {
tests := []aggTest{
buildAggTester(ast.AggFuncVarPop, mysql.TypeDouble, 5, nil, types.NewFloat64Datum(float64(2))),
}
for _, test := range tests {
s.testAggFunc(c, test)
}
}
35 changes: 30 additions & 5 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,36 @@ func (s *testSuiteAgg) TestAggregation(c *C) {
tk.MustExec("insert into t value(0), (-0.9871), (-0.9871)")
tk.MustQuery("select 10 from t group by a").Check(testkit.Rows("10", "10"))
tk.MustQuery("select sum(a) from (select a from t union all select a from t) tmp").Check(testkit.Rows("-3.9484"))

tk.MustExec("drop table t")
tk.MustExec("create table t(a tinyint, b smallint, c mediumint, d int, e bigint, f float, g double, h decimal)")
tk.MustExec("insert into t values(1, 2, 3, 4, 5, 6.1, 7.2, 8.3), (1, 3, 4, 5, 6, 7.1, 8.2, 9.3)")
result = tk.MustQuery("select var_pop(b), var_pop(c), var_pop(d), var_pop(e), var_pop(f), var_pop(g), var_pop(h) from t group by a")
result.Check(testkit.Rows("0.25 0.25 0.25 0.25 0.25 0.25 0.25"))
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
zz-jason marked this conversation as resolved.
Show resolved Hide resolved

tk.MustExec("insert into t values(2, 3, 4, 5, 6, 7.2, 8.3, 9)")
result = tk.MustQuery("select a, var_pop(b) over w, var_pop(c) over w from t window w as (partition by a)")
result.Check(testkit.Rows("1 0.25 0.25", "1 0.25 0.25", "2 0 0"))

tk.MustExec("delete from t where t.a = 2")
tk.MustExec("insert into t values(1, 2, 4, 5, 6, 6.1, 7.2, 9)")
result = tk.MustQuery("select a, var_pop(distinct b), var_pop(distinct c), var_pop(distinct d), var_pop(distinct e), var_pop(distinct f), var_pop(distinct g), var_pop(distinct h) from t group by a")
result.Check(testkit.Rows("1 0.25 0.25 0.25 0.25 0.25 0.25 0.25"))

tk.MustExec("drop table t")
tk.MustExec("create table t(a int, b bigint, c float, d double, e decimal)")
tk.MustExec("insert into t values(1, 1000, 6.8, 3.45, 8.3), (1, 3998, -3.4, 5.12, 9.3),(1, 288, 9.2, 6.08, 1)")
result = tk.MustQuery("select variance(b), variance(c), variance(d), variance(e) from t group by a")
result.Check(testkit.Rows("2584338.6666666665 29.840000178019228 1.1808222222222229 12.666666666666666"))

tk.MustExec("insert into t values(1, 255, 6.8, 6.08, 1)")
result = tk.MustQuery("select variance(distinct b), variance(distinct c), variance(distinct d), variance(distinct e) from t group by a")
result.Check(testkit.Rows("2364075.6875 29.840000178019228 1.1808222222222229 12.666666666666666"))

tk.MustExec("insert into t values(2, 322, 0.8, 2.22, 6)")
result = tk.MustQuery("select a, variance(b) over w from t window w as (partition by a)")
result.Check(testkit.Rows("1 2364075.6875", "1 2364075.6875", "1 2364075.6875", "1 2364075.6875", "2 0"))

_, err = tk.Exec("select std(a) from t")
c.Assert(errors.Cause(err).Error(), Equals, "unsupported agg function: std")
_, err = tk.Exec("select stddev(a) from t")
Expand All @@ -362,11 +392,6 @@ func (s *testSuiteAgg) TestAggregation(c *C) {
_, err = tk.Exec("select std_samp(a) from t")
// TODO: Fix this error message.
c.Assert(errors.Cause(err).Error(), Equals, "[expression:1305]FUNCTION test.std_samp does not exist")
_, err = tk.Exec("select variance(a) from t")
wshwsh12 marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Fix this error message.
c.Assert(errors.Cause(err).Error(), Equals, "unsupported agg function: var_pop")
_, err = tk.Exec("select var_pop(a) from t")
c.Assert(errors.Cause(err).Error(), Equals, "unsupported agg function: var_pop")
_, err = tk.Exec("select var_samp(a) from t")
c.Assert(errors.Cause(err).Error(), Equals, "unsupported agg function: var_samp")

Expand Down
2 changes: 2 additions & 0 deletions expression/aggregation/agg_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func AggFuncToPBExpr(sc *stmtctx.StatementContext, client kv.Client, aggFunc *Ag
tp = tipb.ExprType_Agg_BitXor
case ast.AggFuncBitAnd:
tp = tipb.ExprType_Agg_BitAnd
case ast.AggFuncVarPop:
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
tp = tipb.ExprType_VarPop
}
if !client.IsRequestTypeSupported(kv.ReqTypeSelect, int64(tp)) {
return nil
Expand Down
8 changes: 8 additions & 0 deletions expression/aggregation/base_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (a *baseFuncDesc) typeInfer(ctx sessionctx.Context) error {
a.typeInfer4PercentRank()
case ast.WindowFuncLead, ast.WindowFuncLag:
a.typeInfer4LeadLag(ctx)
case ast.AggFuncVarPop:
a.typeInfer4VarPop(ctx)
default:
return errors.Errorf("unsupported agg function: %s", a.Name)
}
Expand Down Expand Up @@ -234,6 +236,12 @@ func (a *baseFuncDesc) typeInfer4LeadLag(ctx sessionctx.Context) {
}
}

func (a *baseFuncDesc) typeInfer4VarPop(ctx sessionctx.Context) {
//var_pop's return value type is double
a.RetTp = types.NewFieldType(mysql.TypeDouble)
a.RetTp.Flen, a.RetTp.Decimal = mysql.MaxRealWidth, types.UnspecifiedLength
}

// GetDefaultValue gets the default value when the function's input is null.
// According to MySQL, default values of the function are listed as follows:
// e.g.
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/pingcap/pd v1.1.0-beta.0.20191219054547-4d65bbefbc6d
github.com/pingcap/sysutil v0.0.0-20191216090214-5f9620d22b3b
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33
github.com/pingcap/tipb v0.0.0-20191227083941-3996eff010dc
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.4.1
Expand All @@ -58,15 +58,16 @@ require (
github.com/uber/jaeger-lib v1.5.0 // indirect
github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/atomic v1.5.0
go.uber.org/atomic v1.5.1
go.uber.org/automaxprocs v1.2.0
go.uber.org/zap v1.12.0
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 // indirect
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b
golang.org/x/sys v0.0.0-20191210023423-ac6580df4449
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
golang.org/x/tools v0.0.0-20191107010934-f79515f33823
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4
google.golang.org/genproto v0.0.0-20190905072037-92dd089d5514 // indirect
google.golang.org/grpc v1.25.1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
Expand Down
Loading