Skip to content

Commit

Permalink
eorm: ShardingInserter 修改为表维度执行
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash committed Jun 10, 2023
1 parent 312a927 commit d918ba0
Show file tree
Hide file tree
Showing 17 changed files with 466 additions and 209 deletions.
1 change: 1 addition & 0 deletions .CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- [eorm: 补充 NULL 语义和基本类型之间转化的测试用例](https://github.com/ecodeclub/eorm/pull/198)
- [eorm: 分库分表: ShardingSelector GetMulti 使用 merge](https://github.com/ecodeclub/eorm/pull/199)
- [eorm: 分库分表:Inserter 支持分库分表](https://github.com/ecodeclub/eorm/pull/200)
- [eorm: ShardingInserter 修改为表维度执行](https://github.com/ecodeclub/eorm/pull/211)
- [eorm: 分库分表:ShardingUpdater 实现](https://github.com/ecodeclub/eorm/pull/201)

## v0.0.1:
Expand Down
4 changes: 1 addition & 3 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"context"
"database/sql"

"github.com/ecodeclub/eorm/internal/datasource"

"github.com/ecodeclub/eorm/internal/errs"
"github.com/ecodeclub/eorm/internal/model"
"github.com/ecodeclub/eorm/internal/query"
Expand Down Expand Up @@ -74,7 +72,7 @@ func newQuerier[T any](sess Session, q Query, meta *model.TableMeta, typ string)
// Exec 执行 SQL
func (q Querier[T]) Exec(ctx context.Context) Result {
var handler HandleFunc = func(ctx context.Context, qc *QueryContext) *QueryResult {
res, err := q.Session.execContext(ctx, datasource.Query(qc.q))
res, err := q.Session.execContext(ctx, qc.q)
return &QueryResult{Result: res, Err: err}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/ecodeclub/ekit v0.0.4-0.20230511055406-b4540a020e0c
github.com/ecodeclub/ekit v0.0.4-0.20230530053225-e671c5fdd2d1
github.com/go-sql-driver/mysql v1.6.0
github.com/gotomicro/ekit v0.0.0-20230224040531-869798da3c4d
github.com/mattn/go-sqlite3 v1.14.15
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/ecodeclub/ekit v0.0.4-0.20230511055406-b4540a020e0c h1:DlQL/N9QFYdWpLJSsw/7QRUTU3gGrJfR9c+X8wakPjA=
github.com/ecodeclub/ekit v0.0.4-0.20230511055406-b4540a020e0c/go.mod h1:q/cMifDy7CygsCz9NZNgFS6lksEo5tWxsb7RjMoZv00=
github.com/ecodeclub/ekit v0.0.4-0.20230530053225-e671c5fdd2d1 h1:a1Dbg0zZOQPfG3pgFqZjkQM2ty1ZABewjzRK970OQ8w=
github.com/ecodeclub/ekit v0.0.4-0.20230530053225-e671c5fdd2d1/go.mod h1:OqTojKeKFTxeeAAUwNIPKu339SRkX6KAuoK/8A5BCEs=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/gotomicro/ekit v0.0.0-20230224040531-869798da3c4d h1:kmDgYRZ06UifBqAfew+cj02juQQ3Ko349NzsDIZ0QPw=
Expand Down
6 changes: 3 additions & 3 deletions internal/datasource/masterslave/master_slave_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"database/sql"
"fmt"

slaves2 "github.com/ecodeclub/eorm/internal/datasource/masterslave/slaves"
"github.com/ecodeclub/eorm/internal/datasource/masterslave/slaves"
"go.uber.org/multierr"

"github.com/ecodeclub/eorm/internal/datasource"
Expand All @@ -31,7 +31,7 @@ var _ datasource.DataSource = &MasterSlavesDB{}

type MasterSlavesDB struct {
master *sql.DB
slaves slaves2.Slaves
slaves slaves.Slaves
}

type key string
Expand Down Expand Up @@ -90,7 +90,7 @@ func (m *MasterSlavesDB) Close() error {

type MasterSlavesDBOption func(db *MasterSlavesDB)

func MasterSlavesWithSlaves(s slaves2.Slaves) MasterSlavesDBOption {
func MasterSlavesWithSlaves(s slaves.Slaves) MasterSlavesDBOption {
return func(db *MasterSlavesDB) {
db.slaves = s
}
Expand Down
113 changes: 11 additions & 102 deletions internal/integration/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,16 @@ package integration
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"log"
"time"

"github.com/ecodeclub/eorm/internal/datasource/masterslave/slaves/roundrobin"

"github.com/ecodeclub/eorm/internal/datasource/masterslave"
slaves2 "github.com/ecodeclub/eorm/internal/datasource/masterslave/slaves"
"github.com/ecodeclub/eorm/internal/datasource/masterslave/slaves"
"github.com/ecodeclub/eorm/internal/datasource/masterslave/slaves/dns"

"github.com/ecodeclub/eorm/internal/datasource/shardingsource"
"github.com/ecodeclub/eorm/internal/datasource/masterslave/slaves/roundrobin"
"github.com/stretchr/testify/require"

"github.com/ecodeclub/eorm/internal/datasource/single"

"github.com/stretchr/testify/require"

"github.com/ecodeclub/eorm"
"github.com/ecodeclub/eorm/internal/datasource"
"github.com/ecodeclub/eorm/internal/datasource/cluster"
"github.com/ecodeclub/eorm/internal/sharding"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -79,72 +68,6 @@ type masterSalvesDriver struct {
slavedsns []string
}

type ShardingSuite struct {
suite.Suite
slaves slaves2.Slaves
clusters *clusterDrivers
shardingDB *eorm.DB
algorithm sharding.Algorithm
dataSources map[string]datasource.DataSource
driver string
DBPattern string
DsPattern string
}

func (s *ShardingSuite) openDB(dvr, dsn string) (*sql.DB, error) {
db, err := sql.Open(dvr, dsn)
err = db.Ping()
for err == driver.ErrBadConn {
log.Printf("等待数据库启动...")
err = db.Ping()
time.Sleep(time.Second)
}
return db, err
}

func (s *ShardingSuite) initDB() (*eorm.DB, error) {
clDrivers := s.clusters.clDrivers
sourceMap := make(map[string]datasource.DataSource, len(clDrivers))
for i, clus := range clDrivers {
msMap := make(map[string]*masterslave.MasterSlavesDB, 8)
for j, d := range clus.msDrivers {
master, err := s.openDB(s.driver, d.masterdsn)
if err != nil {
return nil, err
}
ss := make([]*sql.DB, 0, len(d.slavedsns))
for _, slavedsn := range d.slavedsns {
slave, err := s.openDB(s.driver, slavedsn)
if err != nil {
return nil, err
}
ss = append(ss, slave)
}
sl, err := roundrobin.NewSlaves(ss...)
require.NoError(s.T(), err)
s.slaves = &testBaseSlaves{Slaves: sl}
masterSlaveDB := masterslave.NewMasterSlavesDB(
master, masterslave.MasterSlavesWithSlaves(s.slaves))
dbName := fmt.Sprintf(s.DBPattern, j)
msMap[dbName] = masterSlaveDB
}
sourceName := fmt.Sprintf(s.DsPattern, i)
sourceMap[sourceName] = cluster.NewClusterDB(msMap)
}
s.dataSources = sourceMap
dataSource := shardingsource.NewShardingDataSource(sourceMap)
return eorm.OpenDS(s.driver, dataSource)
}

func (s *ShardingSuite) SetupSuite() {
t := s.T()
db, err := s.initDB()
if err != nil {
t.Fatal(err)
}
s.shardingDB = db
}

type MasterSlaveSuite struct {
suite.Suite
driver string
Expand All @@ -158,9 +81,7 @@ type MasterSlaveSuite struct {
func (s *MasterSlaveSuite) SetupSuite() {
t := s.T()
orm, err := s.initDb()
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
s.orm = orm
}

Expand All @@ -178,23 +99,11 @@ func (s *MasterSlaveSuite) initDb() (*eorm.DB, error) {

}

//type baseSlaveNamegeter struct {
// slaves.Slaves
//}
//
//func (s *baseSlaveNamegeter) Next(ctx context.Context) (slaves.Slave, error) {
// slave, err := s.Slaves.Next(ctx)
// if err != nil {
// return slave, err
// }
// return slave, err
//}

type testBaseSlaves struct {
slaves2.Slaves
slaves.Slaves
}

func (s *testBaseSlaves) Next(ctx context.Context) (slaves2.Slave, error) {
func (s *testBaseSlaves) Next(ctx context.Context) (slaves.Slave, error) {
slave, err := s.Slaves.Next(ctx)
if err != nil {
return slave, err
Expand All @@ -207,7 +116,7 @@ type testSlaves struct {
ch chan string
}

func newTestSlaves(s slaves2.Slaves) *testSlaves {
func newTestSlaves(s slaves.Slaves) *testSlaves {
return &testSlaves{
testBaseSlaves: &testBaseSlaves{
Slaves: s,
Expand All @@ -216,7 +125,7 @@ func newTestSlaves(s slaves2.Slaves) *testSlaves {
}
}

func (s *testSlaves) Next(ctx context.Context) (slaves2.Slave, error) {
func (s *testSlaves) Next(ctx context.Context) (slaves.Slave, error) {
slave, err := s.Slaves.Next(ctx)
if err != nil {
return slave, err
Expand All @@ -225,13 +134,13 @@ func (s *testSlaves) Next(ctx context.Context) (slaves2.Slave, error) {
return slave, err
}

type initSlaves func(driver string, slaveDsns ...string) (slaves2.Slaves, error)
type initSlaves func(driver string, slaveDsns ...string) (slaves.Slaves, error)

func newDnsSlaves(driver string, slaveDsns ...string) (slaves2.Slaves, error) {
func newDnsSlaves(driver string, slaveDsns ...string) (slaves.Slaves, error) {
return dns.NewSlaves(slaveDsns[0])
}

func newRoundRobinSlaves(driver string, slaveDsns ...string) (slaves2.Slaves, error) {
func newRoundRobinSlaves(driver string, slaveDsns ...string) (slaves.Slaves, error) {
ss := make([]*sql.DB, 0, len(slaveDsns))
for _, slaveDsn := range slaveDsns {
slave, err := sql.Open(driver, slaveDsn)
Expand Down
27 changes: 14 additions & 13 deletions internal/integration/select_masterslave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func (s *MasterSlaveSelectTestSuite) SetupSuite() {
if res.Err() != nil {
s.T().Fatal(res.Err())
}
// 避免主从延迟
time.Sleep(time.Second * 10)
}

func (s *MasterSlaveSelectTestSuite) TearDownSuite() {
res := eorm.RawQuery[any](s.orm, "DELETE FROM `simple_struct`").Exec(context.Background())
res := eorm.RawQuery[any](s.orm, "TRUNCATE TABLE `simple_struct`").Exec(context.Background())
if res.Err() != nil {
s.T().Fatal(res.Err())
}
Expand All @@ -67,8 +69,7 @@ func (s *MasterSlaveSelectTestSuite) TestMasterSlave() {
wantRes: s.data,
ctx: func() context.Context {
c := context.Background()
c = masterslave.UseMaster(c)
return c
return masterslave.UseMaster(c)
},
},
// TODO 从库测试目前有查不到数据的bug
Expand All @@ -85,7 +86,6 @@ func (s *MasterSlaveSelectTestSuite) TestMasterSlave() {
for _, tc := range testcases {
s.T().Run(tc.name, func(t *testing.T) {
ctx := tc.ctx()
time.Sleep(time.Second)
res, err := tc.i.GetMulti(ctx)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand Down Expand Up @@ -135,15 +135,17 @@ func (m *MasterSlaveDNSTestSuite) SetupSuite() {
if res.Err() != nil {
m.T().Fatal(res.Err())
}
// 避免主从延迟
time.Sleep(time.Second * 10)
}
func (s *MasterSlaveDNSTestSuite) TearDownSuite() {
res := eorm.RawQuery[any](s.orm, "DELETE FROM `simple_struct`").Exec(context.Background())
func (m *MasterSlaveDNSTestSuite) TearDownSuite() {
res := eorm.RawQuery[any](m.orm, "TRUNCATE TABLE `simple_struct`").Exec(context.Background())
if res.Err() != nil {
s.T().Fatal(res.Err())
m.T().Fatal(res.Err())
}
}

func (s *MasterSlaveDNSTestSuite) TestDNSMasterSlave() {
func (m *MasterSlaveDNSTestSuite) TestDNSMasterSlave() {
testcases := []struct {
name string
i *eorm.Selector[test.SimpleStruct]
Expand All @@ -155,18 +157,17 @@ func (s *MasterSlaveDNSTestSuite) TestDNSMasterSlave() {
// TODO 从库测试目前有查不到数据的bug
{
name: "get slave with dns",
i: eorm.NewSelector[test.SimpleStruct](s.orm).Where(eorm.C("Id").LT(4)),
i: eorm.NewSelector[test.SimpleStruct](m.orm).Where(eorm.C("Id").LT(4)),
wantSlave: "0",
wantRes: s.data,
wantRes: m.data,
ctx: func() context.Context {
return context.Background()
},
},
}
for _, tc := range testcases {
s.T().Run(tc.name, func(t *testing.T) {
m.T().Run(tc.name, func(t *testing.T) {
ctx := tc.ctx()
time.Sleep(time.Second)
res, err := tc.i.GetMulti(ctx)
assert.Equal(t, tc.wantErr, err)
if err != nil {
Expand All @@ -175,7 +176,7 @@ func (s *MasterSlaveDNSTestSuite) TestDNSMasterSlave() {
assert.Equal(t, tc.wantRes, res)
slaveName := ""
select {
case slaveName = <-s.testSlaves.ch:
case slaveName = <-m.testSlaves.ch:
default:
}
assert.Equal(t, tc.wantSlave, slaveName)
Expand Down
8 changes: 4 additions & 4 deletions internal/integration/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ type SelectTestSuite struct {
func (s *SelectTestSuite) SetupSuite() {
s.Suite.SetupSuite()
s.data = test.NewSimpleStruct(1)
s.data.Int32Ptr = nil
res := eorm.NewInserter[test.SimpleStruct](s.orm).Values(s.data).Exec(context.Background())
if res.Err() != nil {
s.T().Fatal(res.Err())
}
}

func (s *SelectTestSuite) TearDownSuite() {
res := eorm.RawQuery[any](s.orm, "DELETE FROM `simple_struct`").Exec(context.Background())
res := eorm.RawQuery[any](s.orm, "TRUNCATE TABLE `simple_struct`").Exec(context.Background())
if res.Err() != nil {
s.T().Fatal(res.Err())
}
Expand Down Expand Up @@ -204,7 +205,6 @@ func (s *SelectTestSuite) TestSelectorGetBaseType() {
return &res
}(),
},
// TODO 测试出问题
{
name: "res *int accept NULL",
queryRes: func() (any, error) {
Expand All @@ -220,12 +220,12 @@ func (s *SelectTestSuite) TestSelectorGetBaseType() {
{
name: "res int accept NULL",
queryRes: func() (any, error) {
queryer := eorm.NewSelector[*int](s.orm).Select(eorm.C("Int32Ptr")).
queryer := eorm.NewSelector[int](s.orm).Select(eorm.C("Int32Ptr")).
From(eorm.TableOf(&test.SimpleStruct{}, "t1")).
Where(eorm.C("Id").EQ(1))
return queryer.Get(context.Background())
},
wantErr: "abc",
wantErr: "sql: Scan error on column index 0, name \"int32_ptr\": converting NULL to int is unsupported",
},
}

Expand Down
Loading

0 comments on commit d918ba0

Please sign in to comment.