Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: 分库分表: NOT 支持 #192

Merged
merged 7 commits into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
- [eorm: 分库分表: Merger抽象与批量查询实现](https://github.com/ecodeclub/eorm/pull/160)
- [eorm: 增强的 ShardingAlgorithm 设计与实现](https://github.com/ecodeclub/eorm/pull/161)
- [eorm: 分库分表: Merger排序实现](https://github.com/ecodeclub/eorm/pull/166)
- [eorm: 分库分表: hash、shadow_hash算法不符合预期](https://github.com/ecodeclub/eorm/pull/174)
- [eorm: 分库分表: Merger分页实现](https://github.com/ecodeclub/eorm/pull/175)
- [eorm: BasicTypeValue重命名](https://github.com/ecodeclub/eorm/pull/177)
- [eorm: Datasource 抽象](https://github.com/ecodeclub/eorm/pull/167)
- [eorm: 分库分表: hash、shadow_hash算法不符合预期](https://github.com/ecodeclub/eorm/pull/174)
- [eorm: 分库分表: Merger分页实现](https://github.com/ecodeclub/eorm/pull/175)
- [eorm: 分库分表: 结果集处理--聚合函数(不含GroupBy子句)](https://github.com/ecodeclub/eorm/pull/187)
- [eorm: BasicTypeValue重命名](https://github.com/ecodeclub/eorm/pull/177)
- [eorm: 分库分表: 范围查询支持](https://github.com/ecodeclub/eorm/pull/178)
- [eorm: 分库分表: 结果集处理--聚合函数(不含GroupBy子句)](https://github.com/ecodeclub/eorm/pull/187)
- [eorm: 修复单条查询时连接泄露问题](https://github.com/ecodeclub/eorm/pull/188)
- [eorm: 分库分表: NOT 支持](https://github.com/ecodeclub/eorm/pull/191)

## v0.0.1:
- [Init Project](https://github.com/ecodeclub/eorm/pull/1)
Expand Down
138 changes: 138 additions & 0 deletions internal/integration/sharding_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,144 @@ func (s *ShardingSelectTestSuite) TestSardingSelectorGetMulti() {
{OrderId: 253, ItemId: 8, UsingCol1: "Stephen", UsingCol2: "Curry"},
},
},
{
name: "where not in",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.C("OrderId").NotIn(8, 123, 253))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 11, ItemId: 8, UsingCol1: "James", UsingCol2: "Harden"},
{OrderId: 234, ItemId: 12, UsingCol1: "Kevin", UsingCol2: "Durant"},
{OrderId: 181, ItemId: 11, UsingCol1: "Kawhi", UsingCol2: "Leonard"},
},
},
{
name: "where not in and eq",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.C("OrderId").NotIn(8, 123, 253).
And(eorm.C("OrderId").EQ(11)))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 11, ItemId: 8, UsingCol1: "James", UsingCol2: "Harden"},
},
},
{
name: "where not in or eq",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.C("OrderId").NotIn(8, 123, 253).
Or(eorm.C("OrderId").EQ(11)))

return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 11, ItemId: 8, UsingCol1: "James", UsingCol2: "Harden"},
{OrderId: 234, ItemId: 12, UsingCol1: "Kevin", UsingCol2: "Durant"},
{OrderId: 181, ItemId: 11, UsingCol1: "Kawhi", UsingCol2: "Leonard"},
},
},
{
name: "where not in or gt",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.C("OrderId").NotIn(8, 123, 253).
Or(eorm.C("OrderId").GT(11)))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 11, ItemId: 8, UsingCol1: "James", UsingCol2: "Harden"},
{OrderId: 123, ItemId: 10, UsingCol1: "LeBron", UsingCol2: "James"},
{OrderId: 234, ItemId: 12, UsingCol1: "Kevin", UsingCol2: "Durant"},
{OrderId: 253, ItemId: 8, UsingCol1: "Stephen", UsingCol2: "Curry"},
{OrderId: 181, ItemId: 11, UsingCol1: "Kawhi", UsingCol2: "Leonard"},
},
},
{
name: "where not gt",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.Not(eorm.C("OrderId").GT(181)))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 8, ItemId: 6, UsingCol1: "Kobe", UsingCol2: "Bryant"},
{OrderId: 11, ItemId: 8, UsingCol1: "James", UsingCol2: "Harden"},
{OrderId: 123, ItemId: 10, UsingCol1: "LeBron", UsingCol2: "James"},
{OrderId: 181, ItemId: 11, UsingCol1: "Kawhi", UsingCol2: "Leonard"},
},
},
{
name: "where not lt",
Stone-afk marked this conversation as resolved.
Show resolved Hide resolved
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.Not(eorm.C("OrderId").LT(181)))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 181, ItemId: 11, UsingCol1: "Kawhi", UsingCol2: "Leonard"},
{OrderId: 234, ItemId: 12, UsingCol1: "Kevin", UsingCol2: "Durant"},
{OrderId: 253, ItemId: 8, UsingCol1: "Stephen", UsingCol2: "Curry"},
},
},
{
name: "where not (gt and lt)",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.Not(eorm.C("OrderId").GT(11).
And(eorm.C("OrderId").LT(230))))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 8, ItemId: 6, UsingCol1: "Kobe", UsingCol2: "Bryant"},
{OrderId: 11, ItemId: 8, UsingCol1: "James", UsingCol2: "Harden"},
{OrderId: 234, ItemId: 12, UsingCol1: "Kevin", UsingCol2: "Durant"},
{OrderId: 253, ItemId: 8, UsingCol1: "Stephen", UsingCol2: "Curry"},
},
},
{
name: "where not (gt eq and lt eq)",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.Not(eorm.C("OrderId").GTEQ(11).
And(eorm.C("OrderId").LTEQ(240))))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 8, ItemId: 6, UsingCol1: "Kobe", UsingCol2: "Bryant"},
{OrderId: 253, ItemId: 8, UsingCol1: "Stephen", UsingCol2: "Curry"},
},
},
{
name: "where not (in or gt)",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.Not(eorm.C("OrderId").In(8, 123, 253).
Or(eorm.C("OrderId").GT(200))))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 11, ItemId: 8, UsingCol1: "James", UsingCol2: "Harden"},
{OrderId: 181, ItemId: 11, UsingCol1: "Kawhi", UsingCol2: "Leonard"},
},
},
{
name: "where not (in or eq)",
s: func() *eorm.ShardingSelector[test.OrderDetail] {
builder := eorm.NewShardingSelector[test.OrderDetail](s.shardingDB).
Where(eorm.Not(eorm.C("OrderId").In(8, 123).
Or(eorm.C("OrderId").EQ(253))))
return builder
}(),
wantRes: []*test.OrderDetail{
{OrderId: 11, ItemId: 8, UsingCol1: "James", UsingCol2: "Harden"},
{OrderId: 181, ItemId: 11, UsingCol1: "Kawhi", UsingCol2: "Leonard"},
{OrderId: 234, ItemId: 12, UsingCol1: "Kevin", UsingCol2: "Durant"},
},
},
}

for _, tc := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion internal/sharding/hash/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (h *Hash) Sharding(ctx context.Context, req sharding.Request) (sharding.Res
return sharding.Result{
Dsts: []sharding.Dst{{Name: dsName, DB: dbName, Table: tbName}},
}, nil
case operator.OpGT, operator.OpLT, operator.OpGTEQ, operator.OpLTEQ:
case operator.OpGT, operator.OpLT, operator.OpGTEQ, operator.OpLTEQ, operator.OpNEQ:
return sharding.Result{Dsts: h.Broadcast(ctx)}, nil
default:
return sharding.EmptyResult, errs.NewUnsupportedOperatorError(req.Op.Text)
Expand Down
2 changes: 2 additions & 0 deletions predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
// Predicate will be used in Where Or Having
type Predicate binaryExpr

var EmptyPredicate = Predicate{}

func (Predicate) expr() (string, error) {
return "", nil
}
Expand Down
87 changes: 86 additions & 1 deletion sharding_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,26 @@ func (s *ShardingSelector[T]) findDstByPredicate(ctx context.Context, pre Predic
results = append(results, res)
}
return s.mergeIN(results), nil
case opEQ, opGT, opLT, opGTEQ, opLTEQ:
case opNotIN:
col := pre.left.(Column)
right := pre.right.(values)
var results []sharding.Result
for _, val := range right.data {
res, err := s.meta.ShardingAlgorithm.Sharding(ctx,
sharding.Request{Op: opNEQ, SkValues: map[string]any{col.name: val}})
if err != nil {
return sharding.EmptyResult, err
}
results = append(results, res)
}
return s.mergeNotIN(results), nil
Stone-afk marked this conversation as resolved.
Show resolved Hide resolved
case opNot:
nPre, err := s.negatePredicate(pre.right.(Predicate))
if err != nil {
return sharding.EmptyResult, err
}
return s.findDstByPredicate(ctx, nPre)
case opEQ, opGT, opLT, opGTEQ, opLTEQ, opNEQ:
col, isCol := pre.left.(Column)
right, isVals := pre.right.(valueExpr)
if !isCol || !isVals {
Expand All @@ -206,6 +225,61 @@ func (s *ShardingSelector[T]) findDstByPredicate(ctx context.Context, pre Predic
}
}

func (s *ShardingSelector[T]) negatePredicate(pre Predicate) (Predicate, error) {
switch pre.op {
case opAnd:
left, err := s.negatePredicate(pre.left.(Predicate))
if err != nil {
return EmptyPredicate, err
}
right, err := s.negatePredicate(pre.right.(Predicate))
if err != nil {
return EmptyPredicate, err
Stone-afk marked this conversation as resolved.
Show resolved Hide resolved
}
return Predicate{
left: left, op: opOr, right: right,
}, nil
case opOr:
Stone-afk marked this conversation as resolved.
Show resolved Hide resolved
left, err := s.negatePredicate(pre.left.(Predicate))
if err != nil {
return EmptyPredicate, err
}
right, err := s.negatePredicate(pre.right.(Predicate))
if err != nil {
return EmptyPredicate, err
}
return Predicate{
left: left, op: opAnd, right: right,
}, nil
case opEQ:
return Predicate{
left: pre.left, op: opNEQ, right: pre.right,
}, nil
case opIn:
return Predicate{
left: pre.left, op: opNotIN, right: pre.right,
}, nil
case opGT:
return Predicate{
left: pre.left, op: opLTEQ, right: pre.right,
}, nil
case opLT:
return Predicate{
left: pre.left, op: opGTEQ, right: pre.right,
}, nil
case opGTEQ:
return Predicate{
left: pre.left, op: opLT, right: pre.right,
}, nil
case opLTEQ:
return Predicate{
left: pre.left, op: opGT, right: pre.right,
}, nil
default:
return EmptyPredicate, errs.NewUnsupportedOperatorError(pre.op.Text)
}
}

// mergeAnd 两个分片结果的交集
func (*ShardingSelector[T]) mergeAnd(left, right sharding.Result) sharding.Result {
dsts := slice.IntersectSetFunc[sharding.Dst](left.Dsts, right.Dsts, func(src, dst sharding.Dst) bool {
Expand Down Expand Up @@ -233,6 +307,17 @@ func (*ShardingSelector[T]) mergeIN(vals []sharding.Result) sharding.Result {
return sharding.Result{Dsts: dsts}
}

// mergeNotIN 多个分片结果的交集
func (*ShardingSelector[T]) mergeNotIN(vals []sharding.Result) sharding.Result {
dsts := vals[0].Dsts
for i := 0; i < len(vals); i++ {
dsts = slice.IntersectSetFunc[sharding.Dst](dsts, vals[i].Dsts, func(src, dst sharding.Dst) bool {
return src.Equals(dst)
})
}
return sharding.Result{Dsts: dsts}
}

func (s *ShardingSelector[T]) buildAllColumns() error {
for i, cMeta := range s.meta.Columns {
_ = s.buildColumns(i, cMeta.FieldName)
Expand Down
Loading