-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature add sharding selector #145
Conversation
Codecov Report
@@ Coverage Diff @@
## dev #145 +/- ##
==========================================
- Coverage 84.35% 78.63% -5.73%
==========================================
Files 24 25 +1
Lines 1687 2045 +358
==========================================
+ Hits 1423 1608 +185
- Misses 226 377 +151
- Partials 38 60 +22
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
sharding 内部使用的放进去 internal 里面,如果是暴露给用户的接口,那么可以直接放在顶级目录下面,也可以在顶级目录下面创建一个 sharding 包; 分开其实也可以,只不过分开之后的设计可能会很像,因为理论上来说,任何可以被用来分库的算法,也可以用来分表。这个你随意,我不介意; Get 那里你先直接粗暴判断,后面我们再来重构 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
思路是没什么问题的,你继续把 Build 写下去,然后补充单元测试。单元测试先试试 Build 方法就行。
sharding selector 与 selector 存在大量的重复方法,这个是否需要共同的实现一个接口,以及 添加一个基础的抽象 如 BaseSelector, 不过泛型的话这里会很丑 |
可以将一部分数据结构下沉到 internal 里面,而后 ShardingSelector 和普通的 Selector 共同依赖于这部分 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
全部注释什么的用中文。之前我用英文是因为那时候脑子进水了
Codecov Report
📣 This organization is not using Codecov’s GitHub App Integration. We recommend you install it so Codecov can continue to function properly for your repositories. Learn more @@ Coverage Diff @@
## dev #145 +/- ##
==========================================
- Coverage 85.30% 79.95% -5.36%
==========================================
Files 26 28 +2
Lines 1797 2265 +468
==========================================
+ Hits 1533 1811 +278
- Misses 219 376 +157
- Partials 45 78 +33
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我先合并了,它还很多 BUG,但是这算是我的 锅,我应该拆分得更加细一点。不过说实在的,我也很难拆了=。=
return orm, nil | ||
} | ||
|
||
func ShardingDBOptionWithMetaRegistry(r model.MetaRegistry) ShardingDBOption { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不需要 Option 这个中缀,其它也是
|
||
type ShardingDBOption func(db *ShardingDB) | ||
|
||
func CtxWithDBName(ctx context.Context, dbName string) context.Context { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个暂时不要暴露出去。后面我们是要设计强制走某个库,某个表,到时候再说。其实我是很反对用户自己强制指定的,要么是他们代码有问题,要么是他们设计有问题
val := ctx.Value(dbNameKey{}) | ||
dbName, ok := val.(string) | ||
if !ok { | ||
return "", errs.ErrCtxGetDBName |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ErrDBNotSpecified
return nil, err | ||
} | ||
res = append(res, query) | ||
s.args = make([]any, 0, 8) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
看起来有点奇怪,因为对应的 buffer 也是需要重置的
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
把它和 buffer 重置放一起,然后定义一个新的方法 reset()
if len(s.where) > 0 { | ||
s.writeString(" WHERE ") | ||
p := s.where[0] | ||
for i := 1; i < len(s.where); i++ { | ||
p = p.And(s.where[i]) | ||
} | ||
if err = s.buildExpr(p); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
// group by | ||
if len(s.groupBy) > 0 { | ||
err = s.buildGroupBy() | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
// order by | ||
if len(s.orderBy) > 0 { | ||
err = s.buildOrderBy() | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
// having | ||
if len(s.having) > 0 { | ||
s.writeString(" HAVING ") | ||
p := s.having[0] | ||
for i := 1; i < len(s.having); i++ { | ||
p = p.And(s.having[i]) | ||
} | ||
if err = s.buildExpr(p); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
if s.offset > 0 { | ||
s.writeString(" OFFSET ") | ||
s.parameter(s.offset) | ||
} | ||
|
||
if s.limit > 0 { | ||
s.writeString(" LIMIT ") | ||
s.parameter(s.limit) | ||
} | ||
s.end() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
之前我说过引入一个 selectorBuilder。然后把 Selector 和 ShardingSelector 共用的东西丢进去
return sess, nil | ||
} | ||
|
||
func (s *ShardingDB) broadcast() []Dst { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
所以在广播的时候你会出现问题,就是比如说我这个数据库虽然分库分表了,但是我其实 Order 表和 OrderDetail 表都在这个库上,那么你这里的广播就会导致,原本我只查询 Order 表的,结果你会广播过去 OrderDetail。这也就是我说,应该定义在分库分表规则上的
}, | ||
{ | ||
name: "miss sharding key err", | ||
builder: func() ShardingQueryBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
你这个不是已经传入了 UserId 吗?
"order_tab_3": true, | ||
})) | ||
require.NoError(t, err) | ||
s := NewShardingSelector[Order](shardingDB).Where(C("UserId").EQ(int64(123))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个 Selector 和前面一个测试用例不是一样的吗?
}, ShardingDBOptionWithMetaRegistry(r), | ||
ShardingDBOptionWithTables(map[string]bool{ | ||
"order_tab_3": true, | ||
})) | ||
require.NoError(t, err) | ||
s := NewShardingSelector[Order](shardingDB). | ||
Select(Columns("Content", "OrderId")).Where(C("UserId").EQ(int64(123))) | ||
return s | ||
}(), | ||
qs: []*ShardingQuery{ | ||
{ | ||
SQL: "SELECT `content`,`order_id` FROM `order_db_1`.`order_tab_3` WHERE `user_id`=?;", | ||
Args: []any{int64(123)}, | ||
DB: "order_db_1", | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "invalid columns", | ||
builder: func() ShardingQueryBuilder { | ||
shardingDB, err := OpenShardingDB("sqlite3", map[string]*MasterSlavesDB{ | ||
"order_db_1": masterSlaveDB, | ||
}, ShardingDBOptionWithMetaRegistry(r), | ||
ShardingDBOptionWithTables(map[string]bool{ | ||
"order_tab_3": true, | ||
})) | ||
require.NoError(t, err) | ||
s := NewShardingSelector[Order](shardingDB). | ||
Select(C("Invalid")).Where(C("UserId").EQ(int64(123))) | ||
return s | ||
}(), | ||
wantErr: errs.NewInvalidFieldError("Invalid"), | ||
}, | ||
{ | ||
name: "order by", | ||
builder: func() ShardingQueryBuilder { | ||
shardingDB, err := OpenShardingDB("sqlite3", map[string]*MasterSlavesDB{ | ||
"order_db_1": masterSlaveDB, | ||
}, ShardingDBOptionWithMetaRegistry(r), | ||
ShardingDBOptionWithTables(map[string]bool{ | ||
"order_tab_3": true, | ||
})) | ||
require.NoError(t, err) | ||
s := NewShardingSelector[Order](shardingDB).Select(C("OrderId"), C("Content")). | ||
Where(C("UserId").EQ(int64(123))).OrderBy(ASC("UserId"), DESC("OrderId")) | ||
return s | ||
}(), | ||
qs: []*ShardingQuery{ | ||
{ | ||
SQL: "SELECT `order_id`,`content` FROM `order_db_1`.`order_tab_3` WHERE `user_id`=? ORDER BY `user_id` ASC,`order_id` DESC;", | ||
Args: []any{int64(123)}, | ||
DB: "order_db_1", | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "group by", | ||
builder: func() ShardingQueryBuilder { | ||
shardingDB, err := OpenShardingDB("sqlite3", map[string]*MasterSlavesDB{ | ||
"order_db_1": masterSlaveDB, | ||
}, ShardingDBOptionWithMetaRegistry(r), | ||
ShardingDBOptionWithTables(map[string]bool{ | ||
"order_tab_3": true, | ||
})) | ||
require.NoError(t, err) | ||
s := NewShardingSelector[Order](shardingDB).Select(C("OrderId"), C("Content")). | ||
Where(C("UserId").EQ(int64(123))).GroupBy("UserId", "OrderId") | ||
return s | ||
}(), | ||
qs: []*ShardingQuery{ | ||
{ | ||
SQL: "SELECT `order_id`,`content` FROM `order_db_1`.`order_tab_3` WHERE `user_id`=? GROUP BY `user_id`,`order_id`;", | ||
Args: []any{int64(123)}, | ||
DB: "order_db_1", | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "having", | ||
builder: func() ShardingQueryBuilder { | ||
shardingDB, err := OpenShardingDB("sqlite3", map[string]*MasterSlavesDB{ | ||
"order_db_1": masterSlaveDB, | ||
}, ShardingDBOptionWithMetaRegistry(r), | ||
ShardingDBOptionWithTables(map[string]bool{ | ||
"order_tab_3": true, | ||
})) | ||
require.NoError(t, err) | ||
s := NewShardingSelector[Order](shardingDB). | ||
Select(C("OrderId"), C("Content")). | ||
Where(C("UserId").EQ(int64(123))).GroupBy("OrderId").Having(C("OrderId").EQ(int64(18))) | ||
return s | ||
}(), | ||
qs: []*ShardingQuery{ | ||
{ | ||
SQL: "SELECT `order_id`,`content` FROM `order_db_1`.`order_tab_3` WHERE `user_id`=? GROUP BY `order_id` HAVING `order_id`=?;", | ||
Args: []any{int64(123), int64(18)}, | ||
DB: "order_db_1", | ||
}, | ||
}, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这几个不是测试 WHERE 部分写法的,挪出去单独作为一个测试,因为它们不太重要=。=后面等我支持聚合函数的时候再考虑弄回来
require.NoError(t, err) | ||
s := NewShardingSelector[Order](shardingDB). | ||
Select(C("OrderId"), C("Content")). | ||
Where(C("OrderId").EQ(int64(12)).And(C("UserId").EQ(int64(123)))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我在 opEQ 那里评价太苛刻就是类似这种用法,例如 C("OrderId").Eq(Raw("now()"))
这是初步草稿思路: