Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

分库分表:结果集处理——聚合函数(不含 Group By 子句) #179

Closed
Tracked by #182
flycash opened this issue Mar 27, 2023 · 7 comments
Closed
Tracked by #182
Labels

Comments

@flycash
Copy link
Contributor

flycash commented Mar 27, 2023

仅限中文

使用场景

聚合函数常见的有:

  • MIN:则是在所有的结果集里面挑出来最小的那个;
  • MAX:则是在所有的结果集里面挑出来最大的那个;
  • SUM:则是将所有的结果集的值加在一起;
  • COUNT:则是将所有的结果集的值加在一起;
  • AVG:平均值,平均值是一个比较与众不同的话题。例如说我们有一个逻辑上的查询语句 SELECT AVG(age) FROM xx,那么实际上生成的 SQL 语句应该是 SELECT SUM(AVG), COUNT(ID) FROM xxx。所以实际上结果集处理的时候,平均值应该是各个结果集的 SUM 加在一起,再除以 COUNT(ID)。

Screenshot 2023-03-27 at 5 52 16 PM

当然在这个例子里面,COUNT(ID) 只是一个例子,只是为了计算有多少条数据。

聚合函数和 GROUP BY

聚合函数在有 GROUP BY 和没有 GROUP BY 子句下是很不一样的。在有 GROUP BY 的场景下,聚合函数计算的是组内的聚合函数。
例如说 SELECT AVG(age) FROM user GROUP BY gender,这个 SQL 是计算按照性别划分的平均年龄,也就是说会有两个结果:男性平均年龄和女性平均年龄。

在我们社区,是没有政治正确这回事的

所以实际上在处理有 GROUP BY 子句的时候,比如说 SELECT AVG(age) FROM user GROUP BY gender 会变成:

  • SELECT SUM(age), COUNT(ID), gender FROM user GROUP BY gender

注意,这个时候在 SELECT 子句里面必须带上 GROUP BY 的列,也就是 gender,不然我们不知道如何处理。

那么假如说我们在两张表里面拿到四条数据:

  • tab_0: (sum=123, count = 3, gender = male), (sum=125, count = 2, gender = female)
  • tab_1: (sum=233, count = 5, gender = male), (sum=225, count = 7, gender = female)

那么实际上计算的结果是 <(123+233)/(3+5), male> 和 <(125+225)/(2+7), female>。

这里我们可以总结出来:

  • 在带 GROUP BY 子句的场景下,SELECT 出来的必须也带上 GROUP BY 中的列;
  • 在处理结果集的时候,需要手动进行 GROUP BY

第二个点,手动进行分组是一个很麻烦的点。它分成两种情况:

  • 如果 GROUP BY 的列,恰好就是 ORDER BY 的列,那么就可以采用 分库分表:merger 排序实现 #141 中的归并策略
  • 如果没有 ORDER BY 的列,那么我们必须将所有的数据都读过来,然后在内存里面排序

因此 GROUP BY 会带来很多额外的复杂度。因此在当前 issue 下,认为它必然是不带 GROUP BY 的,等后面我们再创建 issue 来支持带 GROUP BY 的。

多个聚合函数合并使用

很显然,用户可以在一个查询语句里面查询多个聚合函数的值,比如说 SELECT MIN(age), MAX(age) FROM xxx。那么我们也需要考虑支持这种场景。

设计

MIN、MAX、SUM和COUNT

这四个的实现非常简单额,每一个都是一个单独的列。在我们

其它

@juniaoshaonian
Copy link
Collaborator

设计

  1. 聚合函数的抽象
type Aggregator interface {
	// Aggregate 将多个列信息聚合,返回聚合后的数据。
	Aggregate(...[]any) (any, error)

	// Match 返回需要进行聚合函数计算的列的信息
	Match() []MatchInfo
}

type MatchInfo struct {
	name string
	typ  reflect.Type
}

  1. 整体逻辑是通过match返回的列信息,去sql.Rows里面拿数据。然后通过Aggragate方法完成Max,Min这些操作。
// rowsInfo存放sql.Rows列表中所有元素进行一次Next并且Scan后的数据,字段数据由[]byte接收
	rowsInfo := make([]*rowInfo, len(r.rowsList))
	for _, row := range r.rowsList {
		rowInfo, err := r.getRowsInfo(row)
		if err != nil {
			r.lastErr = err
			r.mu.Unlock()
			_ = r.Close()
			return false
		}
		rowsInfo = append(rowsInfo, rowInfo)
	}

	// 构建Column对象,通过聚合函数的match方法拿到匹配信息,拿到所有sqlRows列表中所有匹配的列,然后进行聚合函数的合并
	for _, aggregator := range r.aggregators {
		matchInfo := aggregator.Match()
		cols := make([][]any, 0, len(rowsInfo))
		for _, rowInfo := range rowsInfo {
			col := make([]any, 0, len(matchInfo))
			for _, colInfo := range matchInfo {
				ok, val := rowInfo.Get(colInfo.name)
				if !ok {
					r.lastErr = errs.ErrMergerAggregateColumnNotFound
					r.mu.Unlock()
					_ = r.Close()
					return false
				}
				newVal := reflect.New(colInfo.typ).Elem().Interface()
				convertAssign(&newVal, val)
				col = append(col, newVal)
			}
			cols = append(cols, col)
		}
		aggregator.Aggregate(cols...)
	}

  1. 具体聚合函数的实现,,以max为例
type Max[T aggregatemerger.AggregateElement] struct {
	matchInfo aggregatemerger.MatchInfo
}

func (m *Max[T]) Aggregate(columns ...[]any) (any, error) {
	ans := make([]T, 0, 1)
	for _, col := range columns {
		data, ok := col[0].(T)
		if !ok {
			return nil, errs.ErrMergerInvalidAggregateElement
		}
		if len(ans) == 0 {
			ans = append(ans, data)
		} else if ans[0] < data {
			ans[0] = data
		}
	}
	return ans[0], nil
}

func (m *Max[T]) Match() []aggregatemerger.MatchInfo {
	return []aggregatemerger.MatchInfo{m.matchInfo}
}

func NewMax[T aggregatemerger.AggregateElement](matchInfo aggregatemerger.MatchInfo) *Max[T] {
	return &Max[T]{
		matchInfo: matchInfo,
	}
}

type AggregateElement interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~float32 | ~float64
}

@juniaoshaonian
Copy link
Collaborator

还有一个问题,如果是avg我们Aggregator之后用什么类型记录直接使用float64吗

@flycash
Copy link
Contributor Author

flycash commented Mar 28, 2023

关于数字方面的问题,我的看法是=。=直接用 Decimal,一了百了

@flycash
Copy link
Contributor Author

flycash commented Mar 28, 2023

只有 AVG 用 decimal 吧。其它好像都不涉及精度之类的问题

@flycash
Copy link
Contributor Author

flycash commented Mar 29, 2023

这里我不是很明白 Match 方法和 MatchInfo 的意义。我倾向于认为,用户在初始化 Merger 的时候要告诉你所有的列对应的聚合函数是什么,比如说:

m := NewMerger(AggrInfoList)
type AggrInfo struct {
    Func // 标记是 MIN,MAX 还是啥
    name // 列名。比如说在 SELECT AVG(age) as avg_age 的时候应该传入 age。可以考虑在没有 AVG 的时候就直接使用第一个结果集的列名
    typ reflect.Type
}

这意味着 Merger 能够知道每一列的含义。这里要考虑两个点:在没有 GROUP BY 的时候,所有的列都是聚合函数,但是在有 GROUP BY 的时候,还可以是 GROUP BY 的列。
我计划在同一个包下面同时提供支持GROUP BY 和不支持 GROUP BY 的两种实现,所以这边设计的时候稍微考虑一下GROUP BY 能不能复用这里的结构。

Aggregator 的抽象我觉得是合适的。那么就近似于,遍历 AggreInfo,然后根据 Func 来调用对应的 Aggregator 实现来处理。但是有一个地方 Aggregator 的设计可能不太好用,就是 AVG。

基本逻辑是,如果你看到了FUNC 为 AVG,那么就要找到对应的 SUM 和 COUNT,然后计算一个平均值。这里会有问题,就是如果用户 SELECT AVG(age), COUNT(name),那么生成的实际SQL就得是 SELECT COUNT(ID), SUM(age), COUNT(name)。或者说你拿到的结果集就是这三个列。

但是用户在调用 Columns() 的时候,返回的应该是 AVG(age), COUNT(name);在进行 Next 或者 Scan 的时候应该也只有这两个。也就是说,NewMerger 传入的是预期的聚合函数,而结果集拿到的是分库分表之后实际的聚合函数。

只有 AVG 会有这种恶心的问题。那么进一步,在 AVG 的时候,用户还需要指定 COUNT 和 SUM 究竟在哪个列。比如说 AggreInfo { Func: AVG, name: avg_age, type: float64, countIndex: 2, sumIndex: 4}, 也就是说,计算这个 AVG 的COUNT 在 cols[2],SUM 在 cols[4]。在计算 AVG 的时候,我倾向于在发生 Scan 之前都是用 Decimal 来计算的,Scan 的时候再利用 ConvertAssign 来转类型。甚至于,如果为了简化代码,我认为所有的聚合函数都可以用 Decimal 来计算,然后在 Scan 的时候再转换为对应的类型。只不过因为别的聚合函数处理方式比较简单,所以不用 decimal 也可以。

从这里可以看到,AggreInfo 可能需要是接口,有三种实现:

  • 普通列:将来 GROUP BY 引入
  • AVG:万恶之源
  • 其它聚合函数

那么对应的 Aggregator 也不是很合适了。Aggregate(..any) 这种设计处理不了 AVG 的情况。

@juniaoshaonian
Copy link
Collaborator

type AggrInfo struct {
	AggregatorV1
	//  以Avg为例
	// colInfos = map[string]ColInfo{
	//		"Sum": 对应sum列的信息
	// 		“Count": 对应count的信息
	//}
//这样我在进行聚合函数计算的时候就可以通过,sum,count这些索引到对应的列信息,再通过列信息的名字,拿到我们的列数据
	colInfos map[string]ColInfo
}

type AggregatorV1 interface {
	// Aggregate 将多个列聚合
	// 第一个参数为聚合函数所需要的参数,第二个参数表示列名和其对应的值
	Aggregate(map[string]ColInfo, []map[string]any) (any, error)
}

type ColInfo struct {
	name string
	typ  reflect.Type
}

有一个问题我们从sql.Rows拿到对应的聚合函数的列名是怎么样的,以max(id)的形态在没有别名的情况下是id,还是说是Max(id)这种

@flycash
Copy link
Contributor Author

flycash commented Mar 29, 2023

是 Max(id) 这种。这里其实我觉得依赖于列名可能不是特别好。

我倾向于认为用顺序会比较好。也就是你在 []map[string]any 这里,不如就直接按照顺序传下去 [][]any。

之前我记得我们在排序里面用的也是下标而不是列名,所以这里可以考虑保持一致。不然的话就连排序那边也改过来用列名。

这个设计我觉得问题不大,不过在 AggreInfo 里面,我不建议这么组合,直接实现这个接口就可以。

你可以有 AggreInfo, AvgInfo, ColumnInfo 什么的

@flycash flycash changed the title 分库分表:结果集处理——聚合函数(不含 Group By 子句)(WIP) 分库分表:结果集处理——聚合函数(不含 Group By 子句) Mar 31, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants