Skip to content

Commit

Permalink
分库分表:聚合函数含groupby字句 (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
juniaoshaonian authored Apr 24, 2023
1 parent c79a850 commit 400c80e
Show file tree
Hide file tree
Showing 23 changed files with 1,331 additions and 95 deletions.
1 change: 1 addition & 0 deletions .CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- [eorm: 分库分表: 范围查询支持](https://github.com/ecodeclub/eorm/pull/178)
- [eorm: 分库分表: 结果集处理--聚合函数(不含GroupBy子句)](https://github.com/ecodeclub/eorm/pull/187)
- [eorm: 修复单条查询时连接泄露问题](https://github.com/ecodeclub/eorm/pull/188)
- [eorm: 分库分表: 结果集处理--聚合函数(含GroupBy子句)](https://github.com/ecodeclub/eorm/pull/193)
- [eorm: 分库分表: NOT 支持](https://github.com/ecodeclub/eorm/pull/191)

## v0.0.1:
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ require (
github.com/go-sql-driver/mysql v1.6.0
github.com/gotomicro/ekit v0.0.6
github.com/mattn/go-sqlite3 v1.14.15
github.com/stretchr/testify v1.7.1
github.com/stretchr/testify v1.8.1
github.com/valyala/bytebufferpool v1.0.0
go.uber.org/multierr v1.9.0
golang.org/x/sync v0.1.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
16 changes: 14 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/gotomicro/ekit v0.0.6 h1:Tw3vcx8hltUzFmK7zkp6/5OGlE+ceuq6wha7KxBfpaA=
github.com/gotomicro/ekit v0.0.6/go.mod h1:LpstTheKiI/j532rejAlTwPRemwFQXhyqdH6lpzr4wk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI=
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
Expand All @@ -23,8 +34,9 @@ go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
8 changes: 5 additions & 3 deletions internal/merger/aggregatemerger/aggregator/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@ package aggregator
import (
"reflect"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"
)

// AVG 用于求平均值,通过sum/count求得。
// AVG 我们并不能预期在不同的数据库上,精度会不会损失,以及损失的话会有多少的损失。这很大程度上跟数据库类型,数据库驱动实现都有关
type AVG struct {
sumColumnInfo ColumnInfo
countColumnInfo ColumnInfo
sumColumnInfo merger.ColumnInfo
countColumnInfo merger.ColumnInfo
avgName string
}

// NewAVG sumInfo是sum的信息,countInfo是count的信息,avgName用于Column方法
func NewAVG(sumInfo ColumnInfo, countInfo ColumnInfo, avgName string) *AVG {
func NewAVG(sumInfo merger.ColumnInfo, countInfo merger.ColumnInfo, avgName string) *AVG {
return &AVG{
sumColumnInfo: sumInfo,
countColumnInfo: countInfo,
Expand Down
4 changes: 3 additions & 1 deletion internal/merger/aggregatemerger/aggregator/avg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package aggregator
import (
"testing"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -82,7 +84,7 @@ func TestAvg_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
avg := NewAVG(NewColumnInfo(tc.index[0], "SUM(grade)"), NewColumnInfo(tc.index[1], "COUNT(grade)"), "AVG(grade)")
avg := NewAVG(merger.NewColumnInfo(tc.index[0], "SUM(grade)"), merger.NewColumnInfo(tc.index[1], "COUNT(grade)"), "AVG(grade)")
val, err := avg.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions internal/merger/aggregatemerger/aggregator/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package aggregator
import (
"reflect"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"
)

type Count struct {
countInfo ColumnInfo
countInfo merger.ColumnInfo
}

func (s *Count) Aggregate(cols [][]any) (any, error) {
Expand Down Expand Up @@ -49,7 +51,7 @@ func (s *Count) ColumnName() string {
return s.countInfo.Name
}

func NewCount(info ColumnInfo) *Count {
func NewCount(info merger.ColumnInfo) *Count {
return &Count{
countInfo: info,
}
Expand Down
4 changes: 3 additions & 1 deletion internal/merger/aggregatemerger/aggregator/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package aggregator
import (
"testing"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -78,7 +80,7 @@ func TestCount_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
count := NewCount(NewColumnInfo(tc.countIndex, "COUNT(id)"))
count := NewCount(merger.NewColumnInfo(tc.countIndex, "COUNT(id)"))
val, err := count.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions internal/merger/aggregatemerger/aggregator/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package aggregator
import (
"reflect"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"
)

type Max struct {
maxColumnInfo ColumnInfo
maxColumnInfo merger.ColumnInfo
}

func (m *Max) Aggregate(cols [][]any) (any, error) {
Expand Down Expand Up @@ -49,7 +51,7 @@ func (m *Max) ColumnName() string {
return m.maxColumnInfo.Name
}

func NewMax(info ColumnInfo) *Max {
func NewMax(info merger.ColumnInfo) *Max {
return &Max{
maxColumnInfo: info,
}
Expand Down
4 changes: 3 additions & 1 deletion internal/merger/aggregatemerger/aggregator/max_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package aggregator
import (
"testing"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -77,7 +79,7 @@ func TestMax_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
max := NewMax(NewColumnInfo(tc.maxIndex, "MAX(id)"))
max := NewMax(merger.NewColumnInfo(tc.maxIndex, "MAX(id)"))
val, err := max.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions internal/merger/aggregatemerger/aggregator/min.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package aggregator
import (
"reflect"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"
)

type Min struct {
minColumnInfo ColumnInfo
minColumnInfo merger.ColumnInfo
}

func (m *Min) Aggregate(cols [][]any) (any, error) {
Expand Down Expand Up @@ -50,7 +52,7 @@ func (m *Min) ColumnName() string {
return m.minColumnInfo.Name
}

func NewMin(info ColumnInfo) *Min {
func NewMin(info merger.ColumnInfo) *Min {
return &Min{
minColumnInfo: info,
}
Expand Down
4 changes: 3 additions & 1 deletion internal/merger/aggregatemerger/aggregator/min_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package aggregator
import (
"testing"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -77,7 +79,7 @@ func TestMin_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
min := NewMin(NewColumnInfo(tc.minIndex, "MIN(id)"))
min := NewMin(merger.NewColumnInfo(tc.minIndex, "MIN(id)"))
val, err := min.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions internal/merger/aggregatemerger/aggregator/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package aggregator
import (
"reflect"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"
)

type Sum struct {
sumColumnInfo ColumnInfo
sumColumnInfo merger.ColumnInfo
}

func (s *Sum) Aggregate(cols [][]any) (any, error) {
Expand Down Expand Up @@ -50,7 +52,7 @@ func (s *Sum) ColumnName() string {
return s.sumColumnInfo.Name
}

func NewSum(info ColumnInfo) *Sum {
func NewSum(info merger.ColumnInfo) *Sum {
return &Sum{
sumColumnInfo: info,
}
Expand Down
4 changes: 3 additions & 1 deletion internal/merger/aggregatemerger/aggregator/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package aggregator
import (
"testing"

"github.com/ecodeclub/eorm/internal/merger"

"github.com/ecodeclub/eorm/internal/merger/internal/errs"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -79,7 +81,7 @@ func TestSum_Aggregate(t *testing.T) {
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
sum := NewSum(NewColumnInfo(tc.sumIndex, "SUM(id)"))
sum := NewSum(merger.NewColumnInfo(tc.sumIndex, "SUM(id)"))
val, err := sum.Aggregate(tc.input)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down
12 changes: 0 additions & 12 deletions internal/merger/aggregatemerger/aggregator/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,3 @@ type Aggregator interface {
// ColumnName 聚合函数的别名
ColumnName() string
}

type ColumnInfo struct {
Index int
Name string
}

func NewColumnInfo(index int, name string) ColumnInfo {
return ColumnInfo{
Index: index,
Name: name,
}
}
34 changes: 7 additions & 27 deletions internal/merger/aggregatemerger/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@ package aggregatemerger
import (
"context"
"database/sql"
"reflect"
"sync"
_ "unsafe"

"github.com/ecodeclub/eorm/internal/merger/utils"

"github.com/ecodeclub/eorm/internal/merger"
"github.com/ecodeclub/eorm/internal/merger/aggregatemerger/aggregator"
"github.com/ecodeclub/eorm/internal/merger/internal/errs"
"go.uber.org/multierr"
)

//go:linkname convertAssign database/sql.convertAssign
func convertAssign(dest, src any) error

// Merger 该实现不支持group by操作,并且聚合函数查询应该只返回一行数据。
type Merger struct {
aggregators []aggregator.Aggregator
Expand Down Expand Up @@ -151,32 +149,14 @@ func (r *Rows) getSqlRowsData() ([][]any, error) {
return rowsData, nil
}
func (r *Rows) getSqlRowData(row *sql.Rows) ([]any, error) {
colsInfo, err := row.ColumnTypes()
if err != nil {
return nil, err
}
// colsData 表示一个sql.Rows的数据
colsData := make([]any, 0, len(colsInfo))

var colsData []any
var err error
if row.Next() {
// 拿到sql.Rows字段的类型然后初始化
for _, colInfo := range colsInfo {
typ := colInfo.ScanType()
// sqlite3的驱动返回的是指针。循环的去除指针
for typ.Kind() == reflect.Pointer {
typ = typ.Elem()
}
newData := reflect.New(typ).Interface()
colsData = append(colsData, newData)
}
// 通过Scan赋值
err = row.Scan(colsData...)
colsData, err = utils.Scan(row)
if err != nil {
return nil, err
}
// 去掉reflect.New的指针
for i := 0; i < len(colsData); i++ {
colsData[i] = reflect.ValueOf(colsData[i]).Elem().Interface()
}
} else {
// sql.Rows迭代过程中发生报错,返回报错
if row.Err() != nil {
Expand All @@ -201,7 +181,7 @@ func (r *Rows) Scan(dest ...any) error {
return errs.ErrMergerScanNotNext
}
for i := 0; i < len(dest); i++ {
err := convertAssign(dest[i], r.cur[i])
err := utils.ConvertAssign(dest[i], r.cur[i])
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 400c80e

Please sign in to comment.