From 4389ad52335934f805002792b0bffe5ca0b18f84 Mon Sep 17 00:00:00 2001 From: chen quan Date: Thu, 6 Jun 2024 21:24:40 +0800 Subject: [PATCH] feat(follower-db): allows watching for changes in the follower db (#53) * feat(follower-db): allows watching for changes in the follower db * x * test: fixed more tests * x * x --- multiple.go | 94 +++++++++++++++++++++++----------------- multiple_test.go | 69 +++++++++++++++++++++--------- opts.go | 38 +++++++++++++++++ picker.go | 109 ++++++++++++++++++++++++++++++----------------- 4 files changed, 212 insertions(+), 98 deletions(-) create mode 100644 opts.go diff --git a/multiple.go b/multiple.go index 35130fe..0c84f9e 100644 --- a/multiple.go +++ b/multiple.go @@ -19,8 +19,11 @@ package sqlx import ( "context" "database/sql" + "fmt" + "strconv" "strings" + "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" "github.com/zeromicro/go-zero/core/trace" "go.opentelemetry.io/otel/attribute" @@ -40,24 +43,24 @@ var ( var _ sqlx.SqlConn = (*multipleSqlConn)(nil) type ( - DBConf struct { - Leader string - Followers []string `json:",optional"` + FollowerDB struct { + Name string + Datasource string + Added bool } - SqlOption func(*sqlOptions) - - sqlOptions struct { - accept func(error) bool + DBConf struct { + Leader string + Followers []string `json:",optional"` + BackToOrigin bool `json:",optional"` } + multipleSqlConn struct { - leader sqlx.SqlConn - enableFollower bool - p2cPicker picker // picker - followers []sqlx.SqlConn - conf DBConf - accept func(error) bool - driveName string + leader sqlx.SqlConn + p2cPicker picker // picker + conf DBConf + driveName string + sqlOptions *sqlOptions } ) @@ -69,21 +72,35 @@ func NewMultipleSqlConn(driverName string, conf DBConf, opts ...SqlOption) sqlx. } leader := sqlx.NewSqlConn(driverName, conf.Leader, sqlx.WithAcceptable(sqlOpt.accept)) - followers := make([]sqlx.SqlConn, 0, len(conf.Followers)) - for _, datasource := range conf.Followers { - followers = append(followers, sqlx.NewSqlConn(driverName, datasource, sqlx.WithAcceptable(sqlOpt.accept))) - } conn := &multipleSqlConn{ - leader: leader, - enableFollower: len(followers) != 0, - followers: followers, - conf: conf, - driveName: driverName, - accept: sqlOpt.accept, + leader: leader, + conf: conf, + driveName: driverName, + sqlOptions: &sqlOpt, + } + + p2cPickerObj := newP2cPicker(driverName, sqlOpt.accept) + for i, datasource := range conf.Followers { + p2cPickerObj.add(strconv.Itoa(i), datasource) } + go func() { + if sqlOpt.watcher == nil { + return + } + + for follow := range sqlOpt.watcher { + logx.Infow("watcher", logx.Field("follow", follow)) + + if follow.Added { + p2cPickerObj.add(follow.Name, follow.Datasource) + } else { + p2cPickerObj.del(follow.Datasource) + } + } + }() - conn.p2cPicker = newP2cPicker(followers, conn.accept) + conn.p2cPicker = p2cPickerObj return conn } @@ -178,10 +195,6 @@ func (m *multipleSqlConn) getQueryDB(ctx context.Context, query string) queryDB return queryDB{conn: m.leader} } - if !m.enableFollower { - return queryDB{conn: m.leader} - } - if !m.containSelect(query) { return queryDB{conn: m.leader} } @@ -196,6 +209,10 @@ func (m *multipleSqlConn) getQueryDB(ctx context.Context, query string) queryDB } } + if !m.conf.BackToOrigin { + return queryDB{error: err} + } + return queryDB{conn: m.leader} } @@ -212,10 +229,10 @@ func (m *multipleSqlConn) startSpanWithLeader(ctx context.Context) (context.Cont return ctx, span } -func (m *multipleSqlConn) startSpanWithFollower(ctx context.Context, db int) (context.Context, oteltrace.Span) { +func (m *multipleSqlConn) startSpanWithFollower(ctx context.Context, dbName string) (context.Context, oteltrace.Span) { ctx, span := m.startSpan(ctx) span.SetAttributes(followerTypeAttributeKey) - span.SetAttributes(followerDBSqlAttributeKey.Int(db)) + span.SetAttributes(followerDBSqlAttributeKey.String(dbName)) return ctx, span } @@ -239,13 +256,14 @@ type queryDB struct { error error done func(err error) follower bool - followerDB int + followerDB string } func (q *queryDB) query(ctx context.Context, query func(ctx context.Context, conn sqlx.SqlConn) error) (err error) { if q.error != nil { return q.error } + defer func() { if q.done != nil { q.done(err) @@ -255,12 +273,6 @@ func (q *queryDB) query(ctx context.Context, query func(ctx context.Context, con return query(ctx, q.conn) } -func WithAccept(accept func(err error) bool) SqlOption { - return func(opts *sqlOptions) { - opts.accept = accept - } -} - type forceLeaderKey struct{} func ForceLeaderContext(ctx context.Context) context.Context { @@ -272,3 +284,9 @@ func forceLeaderFromContext(ctx context.Context) bool { _, ok := value.(struct{}) return ok } + +// --------------- + +func (f FollowerDB) String() string { + return fmt.Sprintf("FollowerDB{Name: %s, Datasource: %s, Added: %t}", f.Name, f.Datasource, f.Added) +} diff --git a/multiple_test.go b/multiple_test.go index ce1f7f7..28b9165 100644 --- a/multiple_test.go +++ b/multiple_test.go @@ -18,13 +18,12 @@ package sqlx import ( "context" - "database/sql/driver" + "fmt" "testing" "time" "github.com/DATA-DOG/go-sqlmock" "github.com/stretchr/testify/assert" - "github.com/zeromicro/go-zero/core/stores/sqlx" ) const mockedDatasource = "sqlmock" @@ -44,28 +43,24 @@ func TestNewMultipleSqlConn(t *testing.T) { Leader: leader, Followers: []string{follower1}, }) - - follower1Mock.ExpectExec("any") - follower1Mock.ExpectQuery("any").WillReturnRows(sqlmock.NewRows([]string{"foo"})) + rows := sqlmock.NewRows([]string{"name"}).AddRow("John Doe") + follower1Mock.ExpectQuery("SELECT name FROM users ").WithoutArgs().WillReturnRows(rows) var val string - assert.NotNil(t, mysql.QueryRow(&val, "any")) - assert.NotNil(t, mysql.QueryRow(&val, "any")) - assert.NotNil(t, mysql.QueryRowPartial(&val, "any")) - assert.NotNil(t, mysql.QueryRows(&val, "any")) - assert.NotNil(t, mysql.QueryRowsPartial(&val, "any")) - _, err = mysql.Prepare("any") - assert.NotNil(t, err) - assert.NotNil(t, mysql.Transact(func(session sqlx.Session) error { - return nil - })) - - leaderMock.ExpectExec("any").WillReturnResult(driver.RowsAffected(1)) - r, err := mysql.Exec("any") + assert.NoError(t, mysql.QueryRow(&val, "SELECT name FROM users ")) + fmt.Println(val) + + leaderMock.ExpectQuery("SELECT addr FROM users").WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"foo"}).AddRow("bar")) + assert.NoError(t, mysql.QueryRowCtx(ForceLeaderContext(context.Background()), &val, "SELECT addr FROM users")) + + leaderMock.ExpectExec("INSERT INTO users"). + WithArgs("john").WillReturnResult(sqlmock.NewResult(1, 1)) + + result, err := mysql.Exec(`INSERT INTO users(name) VALUES (?)`, "john") assert.NoError(t, err) - rowsAffected, err := r.RowsAffected() + insertId, err := result.LastInsertId() assert.NoError(t, err) - assert.Equal(t, int64(1), rowsAffected) + assert.EqualValues(t, 1, insertId) } func TestForceLeaderContext(t *testing.T) { @@ -74,3 +69,37 @@ func TestForceLeaderContext(t *testing.T) { assert.False(t, forceLeaderFromContext(context.Background())) } + +func TestWatch(t *testing.T) { + leader := "leader1" + follower1 := "follower1_1" + _, follower1Mock, err := sqlmock.NewWithDSN(follower1, sqlmock.MonitorPingsOption(true)) + assert.NoError(t, err) + _, leaderMock, err := sqlmock.NewWithDSN(leader, sqlmock.MonitorPingsOption(true)) + assert.NoError(t, err) + + follower1Mock.ExpectPing().WillDelayFor(time.Millisecond) + leaderMock.ExpectPing().WillDelayFor(time.Millisecond) + + dbsChan := make(chan FollowerDB, 1) + defer close(dbsChan) + + mysql := NewMultipleSqlConn(mockedDatasource, DBConf{ + Leader: leader, + Followers: []string{follower1}, + }, WithWatchFollowerDB(dbsChan)) + + var val string + follower1Mock.ExpectQuery("SELECT name FROM users "). + WithoutArgs(). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("John Doe")) + assert.NoError(t, mysql.QueryRow(&val, "SELECT name FROM users ")) + dbsChan <- FollowerDB{ + Name: "0", + Datasource: "", + Added: false, + } + time.Sleep(time.Second) + assert.Error(t, mysql.QueryRow(&val, "SELECT name FROM users ")) + +} diff --git a/opts.go b/opts.go new file mode 100644 index 0000000..bece90c --- /dev/null +++ b/opts.go @@ -0,0 +1,38 @@ +/* + * Copyright 2023 chenquan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sqlx + +type ( + SqlOption func(*sqlOptions) + + sqlOptions struct { + accept func(error) bool + watcher <-chan FollowerDB + } +) + +func WithAccept(accept func(err error) bool) SqlOption { + return func(opts *sqlOptions) { + opts.accept = accept + } +} + +func WithWatchFollowerDB(watcher <-chan FollowerDB) SqlOption { + return func(opts *sqlOptions) { + opts.watcher = watcher + } +} diff --git a/picker.go b/picker.go index 22ceeff..dd89692 100644 --- a/picker.go +++ b/picker.go @@ -53,57 +53,77 @@ type ( pickResult struct { conn sqlx.SqlConn done func(err error) - followerDB int + followerDB string } p2cPicker struct { - conns []*subConn - r *rand.Rand - stamp *syncx.AtomicDuration - lock sync.Mutex - accept func(err error) bool + driverName string + accept func(err error) bool + + r *rand.Rand + stamp *syncx.AtomicDuration + connsMap map[string]*followerConn + + lock sync.Mutex } ) -func newP2cPicker(followers []sqlx.SqlConn, accept func(err error) bool) *p2cPicker { - conns := make([]*subConn, 0, len(followers)) - for i, follower := range followers { - conns = append(conns, &subConn{ - success: initSuccess, - db: i, - conn: follower, - }) +func newP2cPicker(driverName string, accept func(err error) bool) *p2cPicker { + return &p2cPicker{ + r: rand.New(rand.NewSource(time.Now().UnixNano())), + stamp: syncx.NewAtomicDuration(), + accept: accept, + connsMap: map[string]*followerConn{}, + driverName: driverName, } +} - return &p2cPicker{ - conns: conns, - r: rand.New(rand.NewSource(time.Now().UnixNano())), - stamp: syncx.NewAtomicDuration(), - accept: accept, +func (p *p2cPicker) del(name string) { + p.lock.Lock() + defer p.lock.Unlock() + p.connsMap[name] = nil + delete(p.connsMap, name) +} + +func (p *p2cPicker) add(name, dns string) { + p.lock.Lock() + defer p.lock.Unlock() + p.connsMap[name] = newSubConn(p.driverName, name, dns, p.accept) +} + +func (p *p2cPicker) getConns() []*followerConn { + conns := make([]*followerConn, 0, len(p.connsMap)) + for _, conn := range p.connsMap { + if conn != nil { + conns = append(conns, conn) + } } + + return conns } func (p *p2cPicker) pick() (*pickResult, error) { p.lock.Lock() defer p.lock.Unlock() + conns := p.getConns() - var chosen *subConn - switch len(p.conns) { + var chosen *followerConn + switch len(conns) { case 0: return nil, ErrNoFollowerAvailable case 1: - chosen = p.choose(p.conns[0], nil) + chosen = p.choose(conns[0], nil) case 2: - chosen = p.choose(p.conns[0], p.conns[1]) + chosen = p.choose(conns[0], conns[1]) default: - var node1, node2 *subConn + var node1, node2 *followerConn for i := 0; i < pickTimes; i++ { - a := p.r.Intn(len(p.conns)) - b := p.r.Intn(len(p.conns) - 1) + a := p.r.Intn(len(conns)) + b := p.r.Intn(len(conns) - 1) if b >= a { b++ } - node1 = p.conns[a] - node2 = p.conns[b] + node1 = conns[a] + node2 = conns[b] if node1.healthy() && node2.healthy() { break } @@ -118,11 +138,11 @@ func (p *p2cPicker) pick() (*pickResult, error) { return &pickResult{ conn: chosen.conn, done: p.buildDoneFunc(chosen), - followerDB: chosen.db, + followerDB: chosen.name, }, nil } -func (p *p2cPicker) buildDoneFunc(c *subConn) func(err error) { +func (p *p2cPicker) buildDoneFunc(c *followerConn) func(err error) { start := int64(timex.Now()) return func(err error) { // 正在处理的请求数减 1 @@ -168,7 +188,7 @@ func (p *p2cPicker) buildDoneFunc(c *subConn) func(err error) { } } -func (p *p2cPicker) choose(c1, c2 *subConn) *subConn { +func (p *p2cPicker) choose(c1, c2 *followerConn) *followerConn { start := int64(timex.Now()) if c2 == nil { atomic.StoreInt64(&c1.pick, start) @@ -191,31 +211,32 @@ func (p *p2cPicker) choose(c1, c2 *subConn) *subConn { func (p *p2cPicker) logStats() { p.lock.Lock() defer p.lock.Unlock() - stats := make([]string, 0, len(p.conns)) - for _, conn := range p.conns { - stats = append(stats, fmt.Sprintf("db: %d, load: %d, reqs: %d", - conn.db, conn.load(), atomic.SwapInt64(&conn.requests, 0))) + conns := p.getConns() + stats := make([]string, 0, len(conns)) + for _, conn := range conns { + stats = append(stats, fmt.Sprintf("db: %s, load: %d, reqs: %d", + conn.name, conn.load(), atomic.SwapInt64(&conn.requests, 0))) } logx.Statf("follower db - p2c - %s", strings.Join(stats, "; ")) } -type subConn struct { +type followerConn struct { lag uint64 // 用来保存 ewma 值(平均请求耗时) inflight int64 // 用在保存当前节点正在处理的请求总数 success uint64 // 用来标识一段时间内此连接的健康状态 requests int64 // 用来保存请求总数 last int64 // 用来保存上一次请求耗时, 用于计算 ewma 值 pick int64 // 保存上一次被选中的时间点 - db int + name string conn sqlx.SqlConn } -func (c *subConn) healthy() bool { +func (c *followerConn) healthy() bool { return atomic.LoadUint64(&c.success) > throttleSuccess } -func (c *subConn) load() int64 { +func (c *followerConn) load() int64 { // ewma 相当于平均请求耗时,inflight 是当前节点正在处理请求的数量,相乘大致计算出了当前节点的网络负载 // plus one to avoid multiply zero @@ -229,10 +250,18 @@ func (c *subConn) load() int64 { } func (p *p2cPicker) acceptable(err error) bool { - ok := err == nil || err == sql.ErrNoRows || err == sql.ErrTxDone || err == context.Canceled + ok := err == nil || errors.Is(err, sql.ErrNoRows) || errors.Is(err, sql.ErrTxDone) || errors.Is(err, context.Canceled) if p.accept == nil { return ok } return ok || p.accept(err) } + +func newSubConn(driverName, name, datasource string, acceptable func(err error) bool) *followerConn { + return &followerConn{ + success: initSuccess, + name: name, + conn: sqlx.NewSqlConn(driverName, datasource, sqlx.WithAcceptable(acceptable)), + } +}