From b22d20b69e26011fdd0fd9677b6a2f36448da343 Mon Sep 17 00:00:00 2001 From: zwl <73632785+juniaoshaonian@users.noreply.github.com> Date: Tue, 28 Feb 2023 18:59:44 +0800 Subject: [PATCH] =?UTF-8?q?slaves/dns:=20=E9=9B=86=E6=88=90=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E6=B7=BB=E5=8A=A0=20(#158)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/errs/error.go | 2 + internal/integration/base_test.go | 43 +++++++---- .../integration/delete_masterslave_test.go | 7 +- .../integration/insert_masterslave_test.go | 7 +- .../integration/select_masterslave_test.go | 77 ++++++++++++++++++- .../integration/update_masterslave_test.go | 7 +- internal/slaves/dns/dns.go | 26 +++++-- internal/slaves/dns/dns_test.go | 2 +- script/integrate_test.sh | 1 + script/mysql/master/init.sql | 0 script/mysql/slave/init.sql | 0 11 files changed, 141 insertions(+), 31 deletions(-) delete mode 100755 script/mysql/master/init.sql delete mode 100755 script/mysql/slave/init.sql diff --git a/internal/errs/error.go b/internal/errs/error.go index 2e33450..c045acd 100644 --- a/internal/errs/error.go +++ b/internal/errs/error.go @@ -38,6 +38,8 @@ var ( ErrNotFoundTargetDB = errors.New("eorm: 未发现目标 DB") ErrNotFoundTargetTable = errors.New("eorm: 未发现目标 Table") ErrSlaveNotFound = errors.New("eorm: slave不存在") + // ErrGetSlavesFromDNS 从dns获取slave列表失败 + ErrGetSlavesFromDNS = errors.New("eorm: 从DNS获取slaves失败") ) func NewFieldConflictError(field string) error { diff --git a/internal/integration/base_test.go b/internal/integration/base_test.go index 555819c..e3a6729 100644 --- a/internal/integration/base_test.go +++ b/internal/integration/base_test.go @@ -23,6 +23,8 @@ import ( "log" "time" + "github.com/ecodeclub/eorm/internal/slaves/dns" + "github.com/ecodeclub/eorm" "github.com/ecodeclub/eorm/internal/model" "github.com/ecodeclub/eorm/internal/slaves" @@ -115,10 +117,11 @@ func (s *ShardingSuite) SetupSuite() { type MasterSlaveSuite struct { suite.Suite - driver string - masterdsn string - slavedsns []string - orm *eorm.MasterSlavesDB + driver string + masterDsn string + slaveDsns []string + orm *eorm.MasterSlavesDB + initSlaves initSlaves *slaveNamegeter } @@ -132,19 +135,15 @@ func (s *MasterSlaveSuite) SetupSuite() { } func (s *MasterSlaveSuite) initDb() (*eorm.MasterSlavesDB, error) { - master, err := sql.Open(s.driver, s.masterdsn) + master, err := sql.Open(s.driver, s.masterDsn) if err != nil { return nil, err } - slaves := make([]*sql.DB, 0, len(s.slavedsns)) - for _, slavedsn := range s.slavedsns { - slave, err := sql.Open(s.driver, slavedsn) - if err != nil { - return nil, err - } - slaves = append(slaves, slave) + getter, err := s.initSlaves(s.driver, s.slaveDsns...) + if err != nil { + return nil, err } - s.slaveNamegeter = newSlaveNameGet(roundrobin.NewSlaves(slaves...)) + s.slaveNamegeter = newSlaveNameGet(getter) return eorm.OpenMasterSlaveDB(s.driver, master, eorm.MasterSlaveWithSlaves(s.slaveNamegeter)) } @@ -183,3 +182,21 @@ func (s *slaveNamegeter) Next(ctx context.Context) (slaves.Slave, error) { s.ch <- slave.SlaveName return slave, err } + +type initSlaves func(driver string, slaveDsns ...string) (slaves.Slaves, error) + +func newDnsSlaves(driver string, slaveDsns ...string) (slaves.Slaves, error) { + return dns.NewSlaves(slaveDsns[0]) +} + +func newRoundRobinSlaves(driver string, slaveDsns ...string) (slaves.Slaves, error) { + slaves := make([]*sql.DB, 0, len(slaveDsns)) + for _, slaveDsn := range slaveDsns { + slave, err := sql.Open(driver, slaveDsn) + if err != nil { + return nil, err + } + slaves = append(slaves, slave) + } + return roundrobin.NewSlaves(slaves...), nil +} diff --git a/internal/integration/delete_masterslave_test.go b/internal/integration/delete_masterslave_test.go index 58b08c8..5a9160a 100644 --- a/internal/integration/delete_masterslave_test.go +++ b/internal/integration/delete_masterslave_test.go @@ -84,9 +84,10 @@ func (s *DeleteMasterSlaveTestSuite) TestDeleter() { func TestMasterSlaveDelete(t *testing.T) { suite.Run(t, &DeleteMasterSlaveTestSuite{ MasterSlaveSuite: MasterSlaveSuite{ - driver: "mysql", - masterdsn: "root:root@tcp(localhost:13307)/integration_test", - slavedsns: []string{"root:root@tcp(localhost:13308)/integration_test"}, + driver: "mysql", + masterDsn: "root:root@tcp(localhost:13307)/integration_test", + slaveDsns: []string{"root:root@tcp(localhost:13308)/integration_test"}, + initSlaves: newRoundRobinSlaves, }, }) } diff --git a/internal/integration/insert_masterslave_test.go b/internal/integration/insert_masterslave_test.go index 36a7479..68af665 100644 --- a/internal/integration/insert_masterslave_test.go +++ b/internal/integration/insert_masterslave_test.go @@ -74,9 +74,10 @@ func (i *InsertMasterSlaveTestSuite) TestInsert() { func TestMasterSlaveInsert(t *testing.T) { suite.Run(t, &InsertMasterSlaveTestSuite{ MasterSlaveSuite: MasterSlaveSuite{ - driver: "mysql", - masterdsn: "root:root@tcp(localhost:13307)/integration_test", - slavedsns: []string{"root:root@tcp(localhost:13308)/integration_test"}, + driver: "mysql", + masterDsn: "root:root@tcp(localhost:13307)/integration_test", + slaveDsns: []string{"root:root@tcp(localhost:13308)/integration_test"}, + initSlaves: newRoundRobinSlaves, }, }) } diff --git a/internal/integration/select_masterslave_test.go b/internal/integration/select_masterslave_test.go index 5b50c63..1262ec4 100644 --- a/internal/integration/select_masterslave_test.go +++ b/internal/integration/select_masterslave_test.go @@ -104,9 +104,80 @@ func (s *MasterSlaveSelectTestSuite) TestMasterSlave() { func TestMasterSlaveSelect(t *testing.T) { suite.Run(t, &MasterSlaveSelectTestSuite{ MasterSlaveSuite: MasterSlaveSuite{ - driver: "mysql", - masterdsn: "root:root@tcp(localhost:13307)/integration_test", - slavedsns: []string{"root:root@tcp(localhost:13308)/integration_test"}, + driver: "mysql", + masterDsn: "root:root@tcp(localhost:13307)/integration_test", + slaveDsns: []string{"root:root@tcp(localhost:13308)/integration_test"}, + initSlaves: newRoundRobinSlaves, }, }) + suite.Run(t, &MasterSlaveDNSTestSuite{ + MasterSlaveSuite: MasterSlaveSuite{ + driver: "mysql", + masterDsn: "root:root@tcp(localhost:13307)/integration_test", + slaveDsns: []string{"root:root@tcp(slave.a.com:13308)/integration_test"}, + initSlaves: newDnsSlaves, + }, + }) +} + +type MasterSlaveDNSTestSuite struct { + MasterSlaveSuite + data []*test.SimpleStruct +} + +func (m *MasterSlaveDNSTestSuite) SetupSuite() { + m.MasterSlaveSuite.SetupSuite() + m.data = append(m.data, test.NewSimpleStruct(1)) + m.data = append(m.data, test.NewSimpleStruct(2)) + m.data = append(m.data, test.NewSimpleStruct(3)) + res := eorm.NewInserter[test.SimpleStruct](m.orm).Values(m.data...).Exec(context.Background()) + if res.Err() != nil { + m.T().Fatal(res.Err()) + m.T() + } +} +func (s *MasterSlaveDNSTestSuite) TearDownSuite() { + res := eorm.RawQuery[any](s.orm, "DELETE FROM `simple_struct`").Exec(context.Background()) + if res.Err() != nil { + s.T().Fatal(res.Err()) + } +} + +func (s *MasterSlaveDNSTestSuite) TestDNSMasterSlave() { + testcases := []struct { + name string + i *eorm.Selector[test.SimpleStruct] + wantErr error + wantRes []*test.SimpleStruct + wantSlave string + ctx func() context.Context + }{ + { + name: "get slave with dns", + i: eorm.NewSelector[test.SimpleStruct](s.orm).Where(eorm.C("Id").LT(4)), + wantSlave: "0", + wantRes: s.data, + ctx: func() context.Context { + return context.Background() + }, + }, + } + for _, tc := range testcases { + s.T().Run(tc.name, func(t *testing.T) { + ctx := tc.ctx() + time.Sleep(time.Second) + res, err := tc.i.GetMulti(ctx) + assert.Equal(t, tc.wantErr, err) + if err != nil { + return + } + assert.Equal(t, tc.wantRes, res) + slaveName := "" + select { + case slaveName = <-s.slaveNamegeter.ch: + default: + } + assert.Equal(t, tc.wantSlave, slaveName) + }) + } } diff --git a/internal/integration/update_masterslave_test.go b/internal/integration/update_masterslave_test.go index 3f87fcb..963de3c 100644 --- a/internal/integration/update_masterslave_test.go +++ b/internal/integration/update_masterslave_test.go @@ -85,9 +85,10 @@ func (u *UpdateMasterSlaveTestSuite) TestUpdate() { func TestMasterSlaveUpdate(t *testing.T) { suite.Run(t, &UpdateMasterSlaveTestSuite{ MasterSlaveSuite: MasterSlaveSuite{ - driver: "mysql", - masterdsn: "root:root@tcp(localhost:13307)/integration_test", - slavedsns: []string{"root:root@tcp(localhost:13308)/integration_test"}, + driver: "mysql", + masterDsn: "root:root@tcp(localhost:13307)/integration_test", + slaveDsns: []string{"root:root@tcp(localhost:13308)/integration_test"}, + initSlaves: newRoundRobinSlaves, }, }) } diff --git a/internal/slaves/dns/dns.go b/internal/slaves/dns/dns.go index feca43d..74c9f2f 100644 --- a/internal/slaves/dns/dns.go +++ b/internal/slaves/dns/dns.go @@ -17,6 +17,7 @@ package dns import ( "context" "database/sql" + "log" "net" "strconv" "sync" @@ -26,6 +27,7 @@ import ( "github.com/ecodeclub/eorm/internal/errs" "github.com/ecodeclub/eorm/internal/slaves" "github.com/ecodeclub/eorm/internal/slaves/dns/mysql" + _ "github.com/go-sql-driver/mysql" ) @@ -42,7 +44,7 @@ type Dsn interface { } type netResolver interface { - LookupAddr(ctx context.Context, domain string) ([]string, error) + LookupHost(ctx context.Context, domain string) ([]string, error) } var _ netResolver = (*net.Resolver)(nil) @@ -60,6 +62,7 @@ type Slaves struct { driver string interval time.Duration mu *sync.RWMutex + timeout time.Duration } func (s *Slaves) Next(ctx context.Context) (slaves.Slave, error) { @@ -92,6 +95,13 @@ func WithDriver(driver string) SlaveOption { } } +// WithTimeout 指定查询 DNS 服务器的超时时间 +func WithTimeout(timeout time.Duration) SlaveOption { + return func(s *Slaves) { + s.timeout = timeout + } +} + // WithInterval 指定轮询 DNS 服务器的间隔 func WithInterval(interval time.Duration) SlaveOption { return func(s *Slaves) { @@ -113,6 +123,7 @@ func NewSlaves(dsn string, opts ...SlaveOption) (*Slaves, error) { driver: "mysql", interval: time.Second, mu: &sync.RWMutex{}, + timeout: time.Second, } for _, opt := range opts { opt(s) @@ -122,7 +133,9 @@ func NewSlaves(dsn string, opts ...SlaveOption) (*Slaves, error) { return nil, err } s.domain = s.dsn.Domain() - err = s.getSlaves(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), s.timeout) + err = s.getSlaves(ctx) + cancel() if err != nil { return nil, err } @@ -131,9 +144,12 @@ func NewSlaves(dsn string, opts ...SlaveOption) (*Slaves, error) { for { select { case <-ticker.C: - err := s.getSlaves(context.Background()) - // 错误处理还没有想好怎么搞 + ctx, cancel := context.WithTimeout(context.Background(), s.timeout) + err := s.getSlaves(ctx) + cancel() + // 尽最大努力重试,拿到dns的响应 if err != nil { + log.Println(errs.ErrGetSlavesFromDNS) continue } case <-s.closeCh: @@ -145,7 +161,7 @@ func NewSlaves(dsn string, opts ...SlaveOption) (*Slaves, error) { } func (s *Slaves) getSlaves(ctx context.Context) error { - slavesip, err := s.resolver.LookupAddr(ctx, s.domain) + slavesip, err := s.resolver.LookupHost(ctx, s.domain) if err != nil { return err } diff --git a/internal/slaves/dns/dns_test.go b/internal/slaves/dns/dns_test.go index 8969078..b7aeebc 100644 --- a/internal/slaves/dns/dns_test.go +++ b/internal/slaves/dns/dns_test.go @@ -34,7 +34,7 @@ type mockResolver struct { m map[string][]string } -func (m *mockResolver) LookupAddr(ctx context.Context, domain string) ([]string, error) { +func (m *mockResolver) LookupHost(ctx context.Context, domain string) ([]string, error) { if ctx.Err() != nil { return []string{}, ctx.Err() } diff --git a/script/integrate_test.sh b/script/integrate_test.sh index 0c2a463..5c1fa40 100644 --- a/script/integrate_test.sh +++ b/script/integrate_test.sh @@ -2,5 +2,6 @@ docker compose -f script/integration_test_compose.yml down docker compose -f script/integration_test_compose.yml up -d +echo "127.0.0.1 slave.a.com" >> /etc/hosts go test -race ./... -tags=e2e docker compose -f script/integration_test_compose.yml down diff --git a/script/mysql/master/init.sql b/script/mysql/master/init.sql deleted file mode 100755 index e69de29..0000000 diff --git a/script/mysql/slave/init.sql b/script/mysql/slave/init.sql deleted file mode 100755 index e69de29..0000000