Skip to content

Commit

Permalink
GODRIVER-2910 Add durations to connection pool events. (#1590)
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewdale authored Apr 9, 2024
1 parent b693b75 commit 94dfdff
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 26 deletions.
1 change: 1 addition & 0 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type PoolEvent struct {
Address string `json:"address"`
ConnectionID uint64 `json:"connectionId"`
PoolOptions *MonitorPoolOptions `json:"options"`
Duration time.Duration `json:"duration"`
Reason string `json:"reason"`
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
Expand All @@ -162,6 +169,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
Expand Down Expand Up @@ -222,6 +236,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
},
Expand Down Expand Up @@ -444,7 +465,7 @@
{
"level": "debug",
"component": "connection",
"unordered": true,
"unordered": true,
"data": {
"message": "Connection closed",
"driverConnectionId": {
Expand All @@ -471,7 +492,7 @@
{
"level": "debug",
"component": "connection",
"unordered": true,
"unordered": true,
"data": {
"message": "Connection checkout failed",
"serverHost": {
Expand All @@ -486,6 +507,13 @@
"reason": "An error occurred while trying to establish a new connection",
"error": {
"$$exists": true
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tests:
driverConnectionId: { $$type: [int, long] }
serverHost: { $$type: string }
serverPort: { $$type: [int, long] }
durationMS: { $$type: [double, int, long] }

- level: debug
component: connection
Expand All @@ -74,6 +75,7 @@ tests:
driverConnectionId: { $$type: [int, long] }
serverHost: { $$type: string }
serverPort: { $$type: [int, long] }
durationMS: { $$type: [double, int, long] }

- level: debug
component: connection
Expand All @@ -98,6 +100,7 @@ tests:
driverConnectionId: { $$type: [int, long] }
serverHost: { $$type: string }
serverPort: { $$type: [int, long] }
durationMS: { $$type: [double, int, long] }

- level: debug
component: connection
Expand Down Expand Up @@ -219,4 +222,5 @@ tests:
serverPort: { $$type: [int, long] }
reason: "An error occurred while trying to establish a new connection"
error: { $$exists: true }
durationMS: { $$type: [double, int, long] }
unordered: true
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"description": "connection-pool-logging",
"description": "connection-pool-options",
"schemaVersion": "1.13",
"runOnRequirements": [
{
Expand Down Expand Up @@ -128,6 +128,13 @@
"int",
"long"
]
},
"durationMS": {
"$$type": [
"double",
"int",
"long"
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
description: "connection-pool-logging"
description: "connection-pool-options"

schemaVersion: "1.13"

Expand Down Expand Up @@ -71,6 +71,7 @@ tests:
driverConnectionId: { $$type: [int, long] }
serverHost: { $$type: string }
serverPort: { $$type: [int, long] }
durationMS: { $$type: [double, int, long] }

# Drivers who have not done DRIVERS-1943 will need to skip this test.
- description: "maxConnecting should be included in connection pool created message when specified"
Expand Down
70 changes: 48 additions & 22 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
})
}

start := time.Now()
// Check the pool state while holding a stateMu read lock. If the pool state is not "ready",
// return an error. Do all of this while holding the stateMu read lock to prevent a state change between
// checking the state and entering the wait queue. Not holding the stateMu read lock here may
Expand All @@ -477,8 +478,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
case poolClosed:
p.stateMu.RUnlock()

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedPoolClosed,
}

Expand All @@ -487,18 +490,21 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonPoolClosed,
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonPoolClosed,
})
}
return nil, ErrPoolClosed
case poolPaused:
err := poolClearedError{err: p.lastClearErr, address: p.address}
p.stateMu.RUnlock()

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedError,
}

Expand All @@ -507,10 +513,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonConnectionErrored,
Error: err,
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonConnectionErrored,
Error: err,
})
}
return nil, err
Expand Down Expand Up @@ -539,9 +546,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
// or an error, so unlock the stateMu lock here.
p.stateMu.RUnlock()

duration := time.Since(start)
if w.err != nil {
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedError,
}

Expand All @@ -550,18 +559,21 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonConnectionErrored,
Error: w.err,
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonConnectionErrored,
Error: w.err,
})
}
return nil, w.err
}

duration = time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, w.conn.driverConnectionID,
logger.KeyDurationMS, duration.Milliseconds(),
}

logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...)
Expand All @@ -572,6 +584,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
Type: event.GetSucceeded,
Address: p.address.String(),
ConnectionID: w.conn.driverConnectionID,
Duration: duration,
})
}

Expand All @@ -584,12 +597,14 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
p.stateMu.RUnlock()

// Wait for either the wantConn to be ready or for the Context to time out.
start := time.Now()
waitQueueStart := time.Now()
select {
case <-w.ready:
if w.err != nil {
duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedError,
logger.KeyError, w.err.Error(),
}
Expand All @@ -599,19 +614,22 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonConnectionErrored,
Error: w.err,
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonConnectionErrored,
Error: w.err,
})
}

return nil, w.err
}

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, w.conn.driverConnectionID,
logger.KeyDurationMS, duration.Milliseconds(),
}

logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...)
Expand All @@ -622,14 +640,17 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
Type: event.GetSucceeded,
Address: p.address.String(),
ConnectionID: w.conn.driverConnectionID,
Duration: duration,
})
}
return w.conn, nil
case <-ctx.Done():
duration := time.Since(start)
waitQueueDuration := time.Since(waitQueueStart)

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDurationMS, duration.Milliseconds(),
logger.KeyReason, logger.ReasonConnCheckoutFailedTimout,
}

Expand All @@ -638,10 +659,11 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {

if p.monitor != nil {
p.monitor.Event(&event.PoolEvent{
Type: event.GetFailed,
Address: p.address.String(),
Reason: event.ReasonTimedOut,
Error: ctx.Err(),
Type: event.GetFailed,
Address: p.address.String(),
Duration: duration,
Reason: event.ReasonTimedOut,
Error: ctx.Err(),
})
}

Expand All @@ -650,7 +672,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
maxPoolSize: p.maxSize,
totalConnections: p.totalConnectionCount(),
availableConnections: p.availableConnectionCount(),
waitDuration: duration,
waitDuration: waitQueueDuration,
}
if p.loadBalanced {
err.pinnedConnections = &pinnedConnections{
Expand Down Expand Up @@ -1085,6 +1107,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
})
}

start := time.Now()
// Pass the createConnections context to connect to allow pool close to cancel connection
// establishment so shutdown doesn't block indefinitely if connectTimeout=0.
err := conn.connect(ctx)
Expand All @@ -1111,9 +1134,11 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
continue
}

duration := time.Since(start)
if mustLogPoolMessage(p) {
keysAndValues := logger.KeyValues{
logger.KeyDriverConnectionID, conn.driverConnectionID,
logger.KeyDurationMS, duration.Milliseconds(),
}

logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
Expand All @@ -1124,6 +1149,7 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
Type: event.ConnectionReady,
Address: p.address.String(),
ConnectionID: conn.driverConnectionID,
Duration: duration,
})
}

Expand Down
Loading

0 comments on commit 94dfdff

Please sign in to comment.