Skip to content

Commit

Permalink
integration: 调整主从的集成测试
Browse files Browse the repository at this point in the history
  • Loading branch information
flycash committed Mar 2, 2023
1 parent 2b3a9b3 commit 8634707
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 244 deletions.
9 changes: 5 additions & 4 deletions internal/errs/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ var (
ErrNotFoundTargetDB = errors.New("eorm: 未发现目标 DB")
ErrNotFoundTargetTable = errors.New("eorm: 未发现目标 Table")
ErrSlaveNotFound = errors.New("eorm: slave不存在")
// ErrGetSlavesFromDNS 从dns获取slave列表失败
ErrGetSlavesFromDNS = errors.New("eorm: 从DNS获取slaves失败")
ErrMergerEmptyRows = errors.New("eorm: sql.Rows列表为空")
ErrMergerRowsIsNull = errors.New("eorm: sql.Rows列表中有元素为nil")
ErrMergerEmptyRows = errors.New("eorm: sql.Rows列表为空")
ErrMergerRowsIsNull = errors.New("eorm: sql.Rows列表中有元素为nil")
)

func NewFieldConflictError(field string) error {
Expand Down Expand Up @@ -90,5 +88,8 @@ func NewUnsupportedOperatorError(op string) error {

func NewInvalidDSNError(dsn string) error {
return fmt.Errorf("eorm: 不正确的 DSN %s", dsn)
}

func NewFailedToGetSlavesFromDNS(err error) error {
return fmt.Errorf("eorm: 从DNS中解析从库失败 %w", err)
}
67 changes: 33 additions & 34 deletions internal/integration/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"database/sql"
"database/sql/driver"
"github.com/stretchr/testify/require"
"log"
"time"

Expand Down Expand Up @@ -58,7 +59,7 @@ type masterSalvesDriver struct {

type ShardingSuite struct {
suite.Suite
*baseSlaveNamegeter
slaves slaves.Slaves
driver string
tbSet map[string]bool
driverMap map[string]*masterSalvesDriver
Expand All @@ -85,19 +86,19 @@ func (s *ShardingSuite) initDB() (*eorm.ShardingDB, error) {
if err != nil {
return nil, err
}
slaves := make([]*sql.DB, 0, len(v.slavedsns))
ss := make([]*sql.DB, 0, len(v.slavedsns))
for _, slavedsn := range v.slavedsns {
slave, err := s.openDB(s.driver, slavedsn)
if err != nil {
return nil, err
}
slaves = append(slaves, slave)
}
s.baseSlaveNamegeter = &baseSlaveNamegeter{
Slaves: roundrobin.NewSlaves(slaves...),
ss = append(ss, slave)
}
sl, err := roundrobin.NewSlaves(ss...)
require.NoError(s.T(), err)
s.slaves = newTestSlaves(sl)
masterSlaveDB, err := eorm.OpenMasterSlaveDB(
s.driver, master, eorm.MasterSlaveWithSlaves(s.baseSlaveNamegeter))
s.driver, master, eorm.MasterSlaveWithSlaves(s.slaves))
if err != nil {
return nil, err
}
Expand All @@ -122,7 +123,7 @@ type MasterSlaveSuite struct {
slaveDsns []string
orm *eorm.MasterSlavesDB
initSlaves initSlaves
*slaveNamegeter
*testSlaves
}

func (s *MasterSlaveSuite) SetupSuite() {
Expand All @@ -139,42 +140,40 @@ func (s *MasterSlaveSuite) initDb() (*eorm.MasterSlavesDB, error) {
if err != nil {
return nil, err
}
getter, err := s.initSlaves(s.driver, s.slaveDsns...)
ss, err := s.initSlaves(s.driver, s.slaveDsns...)
if err != nil {
return nil, err
}
s.slaveNamegeter = newSlaveNameGet(getter)
return eorm.OpenMasterSlaveDB(s.driver, master, eorm.MasterSlaveWithSlaves(s.slaveNamegeter))
s.testSlaves = newTestSlaves(ss)
return eorm.OpenMasterSlaveDB(s.driver, master, eorm.MasterSlaveWithSlaves(s.testSlaves))

}

type baseSlaveNamegeter struct {
//type baseSlaveNamegeter struct {
// slaves.Slaves
//}
//
//func (s *baseSlaveNamegeter) Next(ctx context.Context) (slaves.Slave, error) {
// slave, err := s.Slaves.Next(ctx)
// if err != nil {
// return slave, err
// }
// return slave, err
//}

type testSlaves struct {
slaves.Slaves
}

func (s *baseSlaveNamegeter) Next(ctx context.Context) (slaves.Slave, error) {
slave, err := s.Slaves.Next(ctx)
if err != nil {
return slave, err
}
return slave, err
}

type slaveNamegeter struct {
*baseSlaveNamegeter
ch chan string
}

func newSlaveNameGet(geter slaves.Slaves) *slaveNamegeter {
return &slaveNamegeter{
baseSlaveNamegeter: &baseSlaveNamegeter{
Slaves: geter,
},
ch: make(chan string, 1),
func newTestSlaves(s slaves.Slaves) *testSlaves {
return &testSlaves{
Slaves: s,
ch: make(chan string, 1),
}
}

func (s *slaveNamegeter) Next(ctx context.Context) (slaves.Slave, error) {
func (s *testSlaves) Next(ctx context.Context) (slaves.Slave, error) {
slave, err := s.Slaves.Next(ctx)
if err != nil {
return slave, err
Expand All @@ -190,13 +189,13 @@ func newDnsSlaves(driver string, slaveDsns ...string) (slaves.Slaves, error) {
}

func newRoundRobinSlaves(driver string, slaveDsns ...string) (slaves.Slaves, error) {
slaves := make([]*sql.DB, 0, len(slaveDsns))
ss := make([]*sql.DB, 0, len(slaveDsns))
for _, slaveDsn := range slaveDsns {
slave, err := sql.Open(driver, slaveDsn)
if err != nil {
return nil, err
}
slaves = append(slaves, slave)
ss = append(ss, slave)
}
return roundrobin.NewSlaves(slaves...), nil
return roundrobin.NewSlaves(ss...)
}
2 changes: 1 addition & 1 deletion internal/integration/delete_masterslave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *DeleteMasterSlaveTestSuite) TestDeleter() {
affected, err := res.RowsAffected()
slaveName := ""
select {
case slaveName = <-s.slaveNamegeter.ch:
case slaveName = <-s.testSlaves.ch:
default:
}
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/insert_masterslave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (i *InsertMasterSlaveTestSuite) TestInsert() {
}
slaveName := ""
select {
case slaveName = <-i.slaveNamegeter.ch:
case slaveName = <-i.testSlaves.ch:
default:
}
affected, err := res.RowsAffected()
Expand Down
31 changes: 20 additions & 11 deletions internal/integration/select_masterslave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ type MasterSlaveSelectTestSuite struct {
data []*test.SimpleStruct
}

func (m *MasterSlaveSelectTestSuite) SetupSuite() {
m.MasterSlaveSuite.SetupSuite()
m.data = append(m.data, test.NewSimpleStruct(1))
m.data = append(m.data, test.NewSimpleStruct(2))
m.data = append(m.data, test.NewSimpleStruct(3))
res := eorm.NewInserter[test.SimpleStruct](m.orm).Values(m.data...).Exec(context.Background())
func (s *MasterSlaveSelectTestSuite) SetupSuite() {
s.MasterSlaveSuite.SetupSuite()
s.data = append(s.data, test.NewSimpleStruct(1))
s.data = append(s.data, test.NewSimpleStruct(2))
s.data = append(s.data, test.NewSimpleStruct(3))
res := eorm.NewInserter[test.SimpleStruct](s.orm).Values(s.data...).Exec(context.Background())
if res.Err() != nil {
m.T().Fatal(res.Err())
m.T()
s.T().Fatal(res.Err())
}
}

Expand Down Expand Up @@ -93,7 +92,7 @@ func (s *MasterSlaveSelectTestSuite) TestMasterSlave() {
assert.Equal(t, tc.wantRes, res)
slaveName := ""
select {
case slaveName = <-s.slaveNamegeter.ch:
case slaveName = <-s.testSlaves.ch:
default:
}
assert.Equal(t, tc.wantSlave, slaveName)
Expand Down Expand Up @@ -133,7 +132,6 @@ func (m *MasterSlaveDNSTestSuite) SetupSuite() {
res := eorm.NewInserter[test.SimpleStruct](m.orm).Values(m.data...).Exec(context.Background())
if res.Err() != nil {
m.T().Fatal(res.Err())
m.T()
}
}
func (s *MasterSlaveDNSTestSuite) TearDownSuite() {
Expand Down Expand Up @@ -174,10 +172,21 @@ func (s *MasterSlaveDNSTestSuite) TestDNSMasterSlave() {
assert.Equal(t, tc.wantRes, res)
slaveName := ""
select {
case slaveName = <-s.slaveNamegeter.ch:
case slaveName = <-s.testSlaves.ch:
default:
}
assert.Equal(t, tc.wantSlave, slaveName)
})
}
}

type Hash struct {
// 有占位符就是要分集群,没有就不分
DatasoucePattern string
// 有占位符就分库,没有就不分
DatabasePattern string
// 有占位符就分表,没有就不分
TablePattern string

Base int
}
2 changes: 1 addition & 1 deletion internal/integration/sharding_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *ShardingSelectTestSuite) SetupSuite() {
t.Fatal(res.Err())
}
}
// TODO 防止®️主从延迟
// 防止主从延迟
time.Sleep(1)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/integration/update_masterslave_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (u *UpdateMasterSlaveTestSuite) TestUpdate() {
}
slaveName := ""
select {
case slaveName = <-u.slaveNamegeter.ch:
case slaveName = <-u.testSlaves.ch:
default:
}
affected, err := res.RowsAffected()
Expand Down
9 changes: 4 additions & 5 deletions internal/slaves/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Slaves struct {
once sync.Once
driver string
interval time.Duration
mu *sync.RWMutex
mu sync.RWMutex
timeout time.Duration
}

Expand Down Expand Up @@ -122,7 +122,6 @@ func NewSlaves(dsn string, opts ...SlaveOption) (*Slaves, error) {
resolver: net.DefaultResolver,
driver: "mysql",
interval: time.Second,
mu: &sync.RWMutex{},
timeout: time.Second,
}
for _, opt := range opts {
Expand All @@ -144,12 +143,12 @@ func NewSlaves(dsn string, opts ...SlaveOption) (*Slaves, error) {
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
err := s.getSlaves(ctx)
ctx, cancel = context.WithTimeout(context.Background(), s.timeout)
err = s.getSlaves(ctx)
cancel()
// 尽最大努力重试,拿到dns的响应
if err != nil {
log.Println(errs.ErrGetSlavesFromDNS)
log.Println(errs.NewFailedToGetSlavesFromDNS(err))
continue
}
case <-s.closeCh:
Expand Down
15 changes: 6 additions & 9 deletions internal/slaves/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,23 @@ func (r *Slaves) Next(ctx context.Context) (slaves.Slave, error) {
if ctx.Err() != nil {
return slaves.Slave{}, ctx.Err()
}
if r == nil {
return slaves.Slave{}, errs.ErrSlaveNotFound
}
if len(r.slaves) == 0 {
if r == nil || len(r.slaves) == 0 {
return slaves.Slave{}, errs.ErrSlaveNotFound
}
cnt := atomic.AddUint32(&r.cnt, 1)
index := int(cnt) % len(r.slaves)
return r.slaves[index], nil
}

func NewSlaves(slavedbs ...*sql.DB) *Slaves {
func NewSlaves(dbs ...*sql.DB) (*Slaves, error) {
r := &Slaves{}
r.slaves = make([]slaves.Slave, 0, len(slavedbs))
for idx, slavedb := range slavedbs {
r.slaves = make([]slaves.Slave, 0, len(dbs))
for idx, db := range dbs {
s := slaves.Slave{
SlaveName: strconv.Itoa(idx),
DB: slavedb,
DB: db,
}
r.slaves = append(r.slaves, s)
}
return r
return r, nil
}
Loading

0 comments on commit 8634707

Please sign in to comment.