Skip to content

Commit ea02175

Browse files
author
Divjot Arora
authored
GODRIVER-1901 Add details to wait queue timeout errors (#612)
1 parent 801470d commit ea02175

File tree

4 files changed

+188
-15
lines changed

4 files changed

+188
-15
lines changed

x/mongo/driver/topology/connection.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,8 @@ func (c initConnection) SupportsStreaming() bool {
605605
// messages and the driver.Expirable interface to allow expiring.
606606
type Connection struct {
607607
*connection
608-
refCount int
608+
refCount int
609+
cleanupPoolFn func()
609610

610611
mu sync.RWMutex
611612
}
@@ -687,9 +688,7 @@ func (c *Connection) Close() error {
687688
return nil
688689
}
689690

690-
err := c.pool.put(c.connection)
691-
c.connection = nil
692-
return err
691+
return c.cleanupReferences()
693692
}
694693

695694
// Expire closes this connection and will closeConnection the underlying socket.
@@ -701,7 +700,15 @@ func (c *Connection) Expire() error {
701700
}
702701

703702
_ = c.close()
703+
return c.cleanupReferences()
704+
}
705+
706+
func (c *Connection) cleanupReferences() error {
704707
err := c.pool.put(c.connection)
708+
if c.cleanupPoolFn != nil {
709+
c.cleanupPoolFn()
710+
c.cleanupPoolFn = nil
711+
}
705712
c.connection = nil
706713
return err
707714
}
@@ -750,21 +757,27 @@ func (c *Connection) LocalAddress() address.Address {
750757

751758
// PinToCursor updates this connection to reflect that it is pinned to a cursor.
752759
func (c *Connection) PinToCursor() error {
753-
return c.pin("cursor")
760+
return c.pin("cursor", c.pool.pinConnectionToCursor, c.pool.unpinConnectionFromCursor)
754761
}
755762

756763
// PinToTransaction updates this connection to reflect that it is pinned to a transaction.
757764
func (c *Connection) PinToTransaction() error {
758-
return c.pin("transaction")
765+
return c.pin("transaction", c.pool.pinConnectionToTransaction, c.pool.unpinConnectionFromTransaction)
759766
}
760767

761-
func (c *Connection) pin(reason string) error {
768+
func (c *Connection) pin(reason string, updatePoolFn, cleanupPoolFn func()) error {
762769
c.mu.Lock()
763770
defer c.mu.Unlock()
764771
if c.connection == nil {
765772
return fmt.Errorf("attempted to pin a connection for a %s, but the connection has already been returned to the pool", reason)
766773
}
767774

775+
// Only use the provided callbacks for the first reference to avoid double-counting pinned connection statistics
776+
// in the pool.
777+
if c.refCount == 0 {
778+
updatePoolFn()
779+
c.cleanupPoolFn = cleanupPoolFn
780+
}
768781
c.refCount++
769782
return nil
770783
}

x/mongo/driver/topology/connection_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,134 @@ func TestConnection(t *testing.T) {
834834
t.Errorf("LocalAddresses do not match. got %v; want %v", got, want)
835835
}
836836
})
837+
838+
t.Run("pinning", func(t *testing.T) {
839+
makeMultipleConnections := func(t *testing.T, numConns int) (*pool, []*Connection) {
840+
t.Helper()
841+
842+
addr := address.Address("")
843+
pool, err := newPool(poolConfig{Address: addr})
844+
assert.Nil(t, err, "newPool error: %v", err)
845+
846+
err = pool.sem.Acquire(context.Background(), int64(numConns))
847+
assert.Nil(t, err, "error acquiring semaphore: %v", err)
848+
849+
conns := make([]*Connection, 0, numConns)
850+
for i := 0; i < numConns; i++ {
851+
conn, err := newConnection(addr)
852+
assert.Nil(t, err, "newConnection error: %v", err)
853+
conn.pool = pool
854+
conns = append(conns, &Connection{connection: conn})
855+
}
856+
return pool, conns
857+
}
858+
makeOneConnection := func(t *testing.T) (*pool, *Connection) {
859+
t.Helper()
860+
861+
pool, conns := makeMultipleConnections(t, 1)
862+
return pool, conns[0]
863+
}
864+
865+
assertPoolPinnedStats := func(t *testing.T, p *pool, cursorConns, txnConns uint64) {
866+
t.Helper()
867+
868+
assert.Equal(t, cursorConns, p.pinnedCursorConnections, "expected %d connections to be pinned to cursors, got %d",
869+
cursorConns, p.pinnedCursorConnections)
870+
assert.Equal(t, txnConns, p.pinnedTransactionConnections, "expected %d connections to be pinned to transactions, got %d",
871+
txnConns, p.pinnedTransactionConnections)
872+
}
873+
874+
t.Run("cursors", func(t *testing.T) {
875+
pool, conn := makeOneConnection(t)
876+
err := conn.PinToCursor()
877+
assert.Nil(t, err, "PinToCursor error: %v", err)
878+
assertPoolPinnedStats(t, pool, 1, 0)
879+
880+
err = conn.UnpinFromCursor()
881+
assert.Nil(t, err, "UnpinFromCursor error: %v", err)
882+
883+
err = conn.Close()
884+
assert.Nil(t, err, "Close error: %v", err)
885+
assertPoolPinnedStats(t, pool, 0, 0)
886+
})
887+
t.Run("transactions", func(t *testing.T) {
888+
pool, conn := makeOneConnection(t)
889+
err := conn.PinToTransaction()
890+
assert.Nil(t, err, "PinToTransaction error: %v", err)
891+
assertPoolPinnedStats(t, pool, 0, 1)
892+
893+
err = conn.UnpinFromTransaction()
894+
assert.Nil(t, err, "UnpinFromTransaction error: %v", err)
895+
896+
err = conn.Close()
897+
assert.Nil(t, err, "Close error: %v", err)
898+
assertPoolPinnedStats(t, pool, 0, 0)
899+
})
900+
t.Run("pool is only updated for first reference", func(t *testing.T) {
901+
pool, conn := makeOneConnection(t)
902+
err := conn.PinToTransaction()
903+
assert.Nil(t, err, "PinToTransaction error: %v", err)
904+
assertPoolPinnedStats(t, pool, 0, 1)
905+
906+
err = conn.PinToCursor()
907+
assert.Nil(t, err, "PinToCursor error: %v", err)
908+
assertPoolPinnedStats(t, pool, 0, 1)
909+
910+
err = conn.UnpinFromCursor()
911+
assert.Nil(t, err, "UnpinFromCursor error: %v", err)
912+
assertPoolPinnedStats(t, pool, 0, 1)
913+
914+
err = conn.UnpinFromTransaction()
915+
assert.Nil(t, err, "UnpinFromTransaction error: %v", err)
916+
assertPoolPinnedStats(t, pool, 0, 1)
917+
918+
err = conn.Close()
919+
assert.Nil(t, err, "Close error: %v", err)
920+
assertPoolPinnedStats(t, pool, 0, 0)
921+
})
922+
t.Run("multiple connections from a pool", func(t *testing.T) {
923+
pool, conns := makeMultipleConnections(t, 2)
924+
first, second := conns[0], conns[1]
925+
926+
err := first.PinToTransaction()
927+
assert.Nil(t, err, "PinToTransaction error: %v", err)
928+
err = second.PinToCursor()
929+
assert.Nil(t, err, "PinToCursor error: %v", err)
930+
assertPoolPinnedStats(t, pool, 1, 1)
931+
932+
err = first.UnpinFromTransaction()
933+
assert.Nil(t, err, "UnpinFromTransaction error: %v", err)
934+
err = first.Close()
935+
assert.Nil(t, err, "Close error: %v", err)
936+
assertPoolPinnedStats(t, pool, 1, 0)
937+
938+
err = second.UnpinFromCursor()
939+
assert.Nil(t, err, "UnpinFromCursor error: %v", err)
940+
err = second.Close()
941+
assert.Nil(t, err, "Close error: %v", err)
942+
assertPoolPinnedStats(t, pool, 0, 0)
943+
})
944+
t.Run("close is ignored if connection is pinned", func(t *testing.T) {
945+
pool, conn := makeOneConnection(t)
946+
err := conn.PinToCursor()
947+
assert.Nil(t, err, "PinToCursor error: %v", err)
948+
949+
err = conn.Close()
950+
assert.Nil(t, err, "Close error")
951+
assert.NotNil(t, conn.connection, "expected connection to be pinned but it was released to the pool")
952+
assertPoolPinnedStats(t, pool, 1, 0)
953+
})
954+
t.Run("expire forcefully returns connection to pool", func(t *testing.T) {
955+
pool, conn := makeOneConnection(t)
956+
err := conn.PinToCursor()
957+
assert.Nil(t, err, "PinToCursor error: %v", err)
958+
959+
err = conn.Expire()
960+
assert.Nil(t, err, "Expire error")
961+
assert.Nil(t, conn.connection, "expected connection to be released to the pool but was not")
962+
assertPoolPinnedStats(t, pool, 0, 0)
963+
})
964+
})
837965
})
838966
}
839967

x/mongo/driver/topology/errors.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,23 @@ func (e ServerSelectionError) Unwrap() error {
6262

6363
// WaitQueueTimeoutError represents a timeout when requesting a connection from the pool
6464
type WaitQueueTimeoutError struct {
65-
Wrapped error
65+
Wrapped error
66+
PinnedCursorConnections uint64
67+
PinnedTransactionConnections uint64
68+
maxPoolSize uint64
6669
}
6770

6871
// Error implements the error interface.
6972
func (w WaitQueueTimeoutError) Error() string {
7073
errorMsg := "timed out while checking out a connection from connection pool"
7174
if w.Wrapped != nil {
72-
return fmt.Sprintf("%s: %s", errorMsg, w.Wrapped.Error())
75+
errorMsg = fmt.Sprintf("%s: %s", errorMsg, w.Wrapped.Error())
7376
}
74-
return errorMsg
77+
78+
errorMsg = fmt.Sprintf("%s; maxPoolSize: %d, connections in use by cursors: %d, connections in use by transactions: %d",
79+
errorMsg, w.maxPoolSize, w.PinnedCursorConnections, w.PinnedTransactionConnections)
80+
return fmt.Sprintf("%s, connections in use by other operations: %d", errorMsg,
81+
w.maxPoolSize-(w.PinnedCursorConnections+w.PinnedTransactionConnections))
7582
}
7683

7784
// Unwrap returns the underlying error.

x/mongo/driver/topology/pool.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,14 @@ type pool struct {
6464
generation *poolGenerationMap
6565
monitor *event.PoolMonitor
6666

67-
connected int32 // Must be accessed using the sync/atomic package.
68-
nextid uint64
69-
opened map[uint64]*connection // opened holds all of the currently open connections.
70-
sem *semaphore.Weighted
67+
// Must be accessed using the atomic package.
68+
connected int32
69+
pinnedCursorConnections uint64
70+
pinnedTransactionConnections uint64
71+
72+
nextid uint64
73+
opened map[uint64]*connection // opened holds all of the currently open connections.
74+
sem *semaphore.Weighted
7175
sync.Mutex
7276
}
7377

@@ -313,6 +317,24 @@ func (p *pool) makeNewConnection() (*connection, string, error) {
313317

314318
}
315319

320+
func (p *pool) pinConnectionToCursor() {
321+
atomic.AddUint64(&p.pinnedCursorConnections, 1)
322+
}
323+
324+
func (p *pool) unpinConnectionFromCursor() {
325+
// See https://golang.org/pkg/sync/atomic/#AddUint64 for an explanation of the ^uint64(0) syntax.
326+
atomic.AddUint64(&p.pinnedCursorConnections, ^uint64(0))
327+
}
328+
329+
func (p *pool) pinConnectionToTransaction() {
330+
atomic.AddUint64(&p.pinnedTransactionConnections, 1)
331+
}
332+
333+
func (p *pool) unpinConnectionFromTransaction() {
334+
// See https://golang.org/pkg/sync/atomic/#AddUint64 for an explanation of the ^uint64(0) syntax.
335+
atomic.AddUint64(&p.pinnedTransactionConnections, ^uint64(0))
336+
}
337+
316338
// Checkout returns a connection from the pool
317339
func (p *pool) get(ctx context.Context) (*connection, error) {
318340
if ctx == nil {
@@ -340,7 +362,10 @@ func (p *pool) get(ctx context.Context) (*connection, error) {
340362
})
341363
}
342364
errWaitQueueTimeout := WaitQueueTimeoutError{
343-
Wrapped: ctx.Err(),
365+
Wrapped: ctx.Err(),
366+
PinnedCursorConnections: atomic.LoadUint64(&p.pinnedCursorConnections),
367+
PinnedTransactionConnections: atomic.LoadUint64(&p.pinnedTransactionConnections),
368+
maxPoolSize: p.conns.maxSize,
344369
}
345370
return nil, errWaitQueueTimeout
346371
}

0 commit comments

Comments
 (0)