Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expression: add json_arrayagg and json_objectagg #7824

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ast/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ const (
AggFuncBitXor = "bit_xor"
// AggFuncBitAnd is the name of bit_and function.
AggFuncBitAnd = "bit_and"
// AggFuncJsonArrayAgg is the name of json_arrayagg function.
AggFuncJsonArrayAgg = "json_arrayagg"
// AggFuncJsonObjectAgg is the name of json_objectagg function.
AggFuncJsonObjectAgg = "json_objectagg"
)

// AggregateFuncExpr represents aggregate function expression.
Expand Down
8 changes: 8 additions & 0 deletions executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ var (

// All the AggFunc implementations for "BIT_AND" are listed here.
_ AggFunc = (*bitAndUint64)(nil)

// All the AggFunc implementations for "JSON_ARRAYAGG" are listed here
_ AggFunc = (*original4JsonArrayAgg)(nil)
_ AggFunc = (*partial4JsonArrayAgg)(nil)

// All the AggFunc implementations for "JSON_OBJECTAGG" are listed here
_ AggFunc = (*original4JsonObjectAgg)(nil)
_ AggFunc = (*partial4JsonObjectAgg)(nil)
)

// PartialResult represents data structure to store the partial result for the
Expand Down
44 changes: 44 additions & 0 deletions executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func Build(ctx sessionctx.Context, aggFuncDesc *aggregation.AggFuncDesc, ordinal
return buildBitXor(aggFuncDesc, ordinal)
case ast.AggFuncBitAnd:
return buildBitAnd(aggFuncDesc, ordinal)
case ast.AggFuncJsonArrayAgg:
return buildJsonArrayAgg(aggFuncDesc, ordinal)
case ast.AggFuncJsonObjectAgg:
return buildJsonObjectAgg(aggFuncDesc, ordinal)
}
return nil
}
Expand Down Expand Up @@ -313,3 +317,43 @@ func buildBitAnd(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
}
return &bitAndUint64{baseBitAggFunc{base}}
}

func buildJsonArrayAgg(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
base := baseAggFunc{
args: aggFuncDesc.Args,
ordinal: ordinal,
}
switch aggFuncDesc.Mode {
// Build stddev functions which consume the original data and remove the
// duplicated input of the same group.
case aggregation.DedupMode:
return nil // not implemented yet.
// partial results.
case aggregation.CompleteMode, aggregation.Partial1Mode:
return &original4JsonArrayAgg{baseJsonArrayAgg{base}}
// functions and update their partial results.
case aggregation.Partial2Mode, aggregation.FinalMode:
return &partial4JsonArrayAgg{baseJsonArrayAgg{base}}
}
return nil
}

func buildJsonObjectAgg(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
base := baseAggFunc{
args: aggFuncDesc.Args,
ordinal: ordinal,
}
switch aggFuncDesc.Mode {
// Build stddev functions which consume the original data and remove the
// duplicated input of the same group.
case aggregation.DedupMode:
return nil // not implemented yet.
// partial results.
case aggregation.CompleteMode, aggregation.Partial1Mode:
return &original4JsonObjectAgg{baseJsonObjectAgg{base}}
// functions and update their partial results.
case aggregation.Partial2Mode, aggregation.FinalMode:
return &partial4JsonObjectAgg{baseJsonObjectAgg{base}}
}
return nil
}
144 changes: 144 additions & 0 deletions executor/aggfuncs/func_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
"github.com/pkg/errors"
)

type baseJsonArrayAgg struct {
baseAggFunc
}

type partialResult4JsonArrayAgg struct {
array []interface{}
}

func (e *baseJsonArrayAgg) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4JsonArrayAgg{})
}

func (e *baseJsonArrayAgg) ResetPartialResult(pr PartialResult) {
p := (*partialResult4JsonArrayAgg)(pr)
p.array = make([]interface{}, 0, 0)
}

func (e *baseJsonArrayAgg) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4JsonArrayAgg)(pr)
if len(p.array) == 0 {
chk.AppendNull(e.ordinal)
return nil
}

chk.AppendJSON(e.ordinal, json.CreateBinary(p.array))
return nil
}

func (e *baseJsonArrayAgg) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4JsonArrayAgg)(pr)
result := make([]interface{}, 0, len(rowsInGroup))
for _, row := range rowsInGroup {
res, err := e.args[0].Eval(row)
if err != nil {
return errors.Trace(err)
}

result = append(result, res.GetValue())
}
p.array = append(p.array, result...)
return nil
}

type original4JsonArrayAgg struct {
baseJsonArrayAgg
}

type partial4JsonArrayAgg struct {
baseJsonArrayAgg
}

func (e *partial4JsonArrayAgg) MergePartialResult(sctx sessionctx.Context, src PartialResult, dst PartialResult) error {
p1, p2 := (*partialResult4JsonArrayAgg)(src), (*partialResult4JsonArrayAgg)(dst)
p2.array = append(p2.array, p1.array...)
return nil
}

type baseJsonObjectAgg struct {
baseAggFunc
}

type partialResult4JsonObjectAgg struct {
entries map[string]interface{}
}

func (e *baseJsonObjectAgg) AllocPartialResult() PartialResult {
res := partialResult4JsonObjectAgg{}
res.entries = make(map[string]interface{})
return PartialResult(&res)
}

func (e *baseJsonObjectAgg) ResetPartialResult(pr PartialResult) {
p := (*partialResult4JsonObjectAgg)(pr)
p.entries = make(map[string]interface{})
}

func (e *baseJsonObjectAgg) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4JsonObjectAgg)(pr)
if len(p.entries) == 0 {
chk.AppendNull(e.ordinal)
return nil
}

chk.AppendJSON(e.ordinal, json.CreateBinary(p.entries))
return nil
}

func (e *baseJsonObjectAgg) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error {
p := (*partialResult4JsonObjectAgg)(pr)
for _, row := range rowsInGroup {
key, err := e.args[0].Eval(row)
if err != nil {
return errors.Trace(err)
}

var value types.Datum
value, err = e.args[1].Eval(row)
if err != nil {
return errors.Trace(err)
}

p.entries[key.GetString()] = value.GetValue()
}
return nil
}

type original4JsonObjectAgg struct {
baseJsonObjectAgg
}

type partial4JsonObjectAgg struct {
baseJsonObjectAgg
}

func (e *partial4JsonObjectAgg) MergePartialResult(sctx sessionctx.Context, src PartialResult, dst PartialResult) error {
p1, p2 := (*partialResult4JsonObjectAgg)(src), (*partialResult4JsonObjectAgg)(dst)
for k, v := range p1.entries {
p2.entries[k] = v
}
return nil
}
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func (b *executorBuilder) wrapCastForAggArgs(funcs []*aggregation.AggFuncDesc) {
for _, f := range funcs {
// We do not need to wrap cast upon these functions,
// since the EvalXXX method called by the arg is determined by the corresponding arg type.
if f.Name == ast.AggFuncCount || f.Name == ast.AggFuncMin || f.Name == ast.AggFuncMax || f.Name == ast.AggFuncFirstRow {
if f.Name == ast.AggFuncCount || f.Name == ast.AggFuncMin || f.Name == ast.AggFuncMax || f.Name == ast.AggFuncFirstRow || f.Name == ast.AggFuncJsonArrayAgg || f.Name == ast.AggFuncJsonObjectAgg {
continue
}
var castFunc func(ctx sessionctx.Context, expr expression.Expression) expression.Expression
Expand Down
4 changes: 4 additions & 0 deletions expression/aggregation/agg_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func AggFuncToPBExpr(sc *stmtctx.StatementContext, client kv.Client, aggFunc *Ag
tp = tipb.ExprType_Agg_BitXor
case ast.AggFuncBitAnd:
tp = tipb.ExprType_Agg_BitAnd
case ast.AggFuncJsonArrayAgg:
tp = tipb.ExprType_JsonArrayAgg
case ast.AggFuncJsonObjectAgg:
tp = tipb.ExprType_JsonObjectAgg
}
if !client.IsRequestTypeSupported(kv.ReqTypeSelect, int64(tp)) {
return nil
Expand Down
4 changes: 4 additions & 0 deletions expression/aggregation/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func NewDistAggFunc(expr *tipb.Expr, fieldTps []*types.FieldType, sc *stmtctx.St
return &bitXorFunction{aggFunction: newAggFunc(ast.AggFuncBitXor, args, false)}, nil
case tipb.ExprType_Agg_BitAnd:
return &bitAndFunction{aggFunction: newAggFunc(ast.AggFuncBitAnd, args, false)}, nil
case tipb.ExprType_JsonArrayAgg:
return &jsonArrayAggFunction{newAggFunc(ast.AggFuncJsonArrayAgg, args, false)}, nil
case tipb.ExprType_JsonObjectAgg:
return &jsonObjectAggFunction{newAggFunc(ast.AggFuncJsonObjectAgg, args, false)}, nil
}
return nil, errors.Errorf("Unknown aggregate function type %v", expr.Tp)
}
Expand Down
11 changes: 11 additions & 0 deletions expression/aggregation/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ func (a *AggFuncDesc) typeInfer(ctx sessionctx.Context) {
a.typeInfer4MaxMin(ctx)
case ast.AggFuncBitAnd, ast.AggFuncBitOr, ast.AggFuncBitXor:
a.typeInfer4BitFuncs(ctx)
case ast.AggFuncJsonArrayAgg, ast.AggFuncJsonObjectAgg:
a.typeInfer4JsonFuncs(ctx)
default:
panic("unsupported agg function: " + a.Name)
}
Expand Down Expand Up @@ -276,6 +278,10 @@ func (a *AggFuncDesc) GetAggFunc(ctx sessionctx.Context) Aggregation {
return &bitXorFunction{aggFunction: aggFunc}
case ast.AggFuncBitAnd:
return &bitAndFunction{aggFunction: aggFunc}
case ast.AggFuncJsonArrayAgg:
return &jsonArrayAggFunction{aggFunction: aggFunc}
case ast.AggFuncJsonObjectAgg:
return &jsonObjectAggFunction{aggFunction: aggFunc}
default:
panic("unsupported agg function")
}
Expand Down Expand Up @@ -365,6 +371,11 @@ func (a *AggFuncDesc) typeInfer4BitFuncs(ctx sessionctx.Context) {
// TODO: a.Args[0] = expression.WrapWithCastAsInt(ctx, a.Args[0])
}

func (a *AggFuncDesc) typeInfer4JsonFuncs(ctx sessionctx.Context) {
a.RetTp = types.NewFieldType(mysql.TypeJSON)
types.SetBinChsClnFlag(a.RetTp)
}

func (a *AggFuncDesc) evalNullValueInOuterJoin4Count(ctx sessionctx.Context, schema *expression.Schema) (types.Datum, bool) {
for _, arg := range a.Args {
result := expression.EvaluateExprWithNull(ctx, schema, arg)
Expand Down
86 changes: 86 additions & 0 deletions expression/aggregation/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregation

import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pkg/errors"
)

type jsonArrayAggFunction struct {
aggFunction
}

// Update implements Aggregation interface.
func (sf *jsonArrayAggFunction) Update(evalCtx *AggEvaluateContext, sc *stmtctx.StatementContext, row chunk.Row) (err error) {
a := sf.Args[0]
value, err := a.Eval(row)
if err != nil {
return errors.Trace(err)
}
if value.IsNull() {
return nil
}

evalCtx.Value, err = ConcatBinaryJSONArray(evalCtx.Value, value)
if err != nil {
return errors.Trace(err)
}
return nil
}

// GetResult implements Aggregation interface.
func (sf *jsonArrayAggFunction) GetResult(evalCtx *AggEvaluateContext) (d types.Datum) {
d.SetValue(evalCtx.Value)
return
}

// GetPartialResult implements Aggregation interface.
func (sf *jsonArrayAggFunction) GetPartialResult(evalCtx *AggEvaluateContext) []types.Datum {
return []types.Datum{evalCtx.Value}
}

type jsonObjectAggFunction struct {
aggFunction
}

// Update implements Aggregation interface.
func (sf *jsonObjectAggFunction) Update(evalCtx *AggEvaluateContext, sc *stmtctx.StatementContext, row chunk.Row) (err error) {
a := sf.Args[0]
value, err := a.Eval(row)
if err != nil {
return errors.Trace(err)
}
if value.IsNull() {
return nil
}

evalCtx.Value, err = MergeBinaryJSONObject(evalCtx.Value, value)
if err != nil {
return errors.Trace(err)
}
return nil
}

// GetResult implements Aggregation interface.
func (sf *jsonObjectAggFunction) GetResult(evalCtx *AggEvaluateContext) (d types.Datum) {
d.SetValue(evalCtx.Value)
return
}

// GetPartialResult implements Aggregation interface.
func (sf *jsonObjectAggFunction) GetPartialResult(evalCtx *AggEvaluateContext) []types.Datum {
return []types.Datum{evalCtx.Value}
}
Loading