@@ -168,18 +168,19 @@ public InternalConnection get(final OperationContext operationContext) {
168
168
169
169
@ Override
170
170
public InternalConnection get (final OperationContext operationContext , final long timeoutValue , final TimeUnit timeUnit ) {
171
- connectionPoolListener .connectionCheckOutStarted (new ConnectionCheckOutStartedEvent (serverId ));
171
+ connectionPoolListener .connectionCheckOutStarted (new ConnectionCheckOutStartedEvent (serverId , operationContext . getId () ));
172
172
Timeout timeout = Timeout .startNow (timeoutValue , timeUnit );
173
173
try {
174
174
stateAndGeneration .throwIfClosedOrPaused ();
175
175
PooledConnection connection = getPooledConnection (timeout );
176
176
if (!connection .opened ()) {
177
177
connection = openConcurrencyLimiter .openOrGetAvailable (connection , timeout );
178
178
}
179
- connectionPoolListener .connectionCheckedOut (new ConnectionCheckedOutEvent (getId (connection )));
179
+ connection .checkedOutForOperation (operationContext );
180
+ connectionPoolListener .connectionCheckedOut (new ConnectionCheckedOutEvent (getId (connection ), operationContext .getId ()));
180
181
return connection ;
181
182
} catch (Exception e ) {
182
- throw (RuntimeException ) checkOutFailed (e );
183
+ throw (RuntimeException ) checkOutFailed (e , operationContext );
183
184
}
184
185
}
185
186
@@ -188,15 +189,16 @@ public void getAsync(final OperationContext operationContext, final SingleResult
188
189
if (LOGGER .isTraceEnabled ()) {
189
190
LOGGER .trace (format ("Asynchronously getting a connection from the pool for server %s" , serverId ));
190
191
}
191
- connectionPoolListener .connectionCheckOutStarted (new ConnectionCheckOutStartedEvent (serverId ));
192
+ connectionPoolListener .connectionCheckOutStarted (new ConnectionCheckOutStartedEvent (serverId , operationContext . getId () ));
192
193
Timeout timeout = Timeout .startNow (settings .getMaxWaitTime (NANOSECONDS ));
193
- SingleResultCallback <InternalConnection > eventSendingCallback = (result , failure ) -> {
194
+ SingleResultCallback <PooledConnection > eventSendingCallback = (connection , failure ) -> {
194
195
SingleResultCallback <InternalConnection > errHandlingCallback = errorHandlingCallback (callback , LOGGER );
195
196
if (failure == null ) {
196
- connectionPoolListener .connectionCheckedOut (new ConnectionCheckedOutEvent (getId (result )));
197
- errHandlingCallback .onResult (result , null );
197
+ connection .checkedOutForOperation (operationContext );
198
+ connectionPoolListener .connectionCheckedOut (new ConnectionCheckedOutEvent (getId (connection ), operationContext .getId ()));
199
+ errHandlingCallback .onResult (connection , null );
198
200
} else {
199
- errHandlingCallback .onResult (null , checkOutFailed (failure ));
201
+ errHandlingCallback .onResult (null , checkOutFailed (failure , operationContext ));
200
202
}
201
203
};
202
204
try {
@@ -238,20 +240,22 @@ public void getAsync(final OperationContext operationContext, final SingleResult
238
240
* and returns {@code t} if it is not {@link MongoOpenConnectionInternalException},
239
241
* or returns {@code t.}{@linkplain MongoOpenConnectionInternalException#getCause() getCause()} otherwise.
240
242
*/
241
- private Throwable checkOutFailed (final Throwable t ) {
243
+ private Throwable checkOutFailed (final Throwable t , final OperationContext operationContext ) {
242
244
Throwable result = t ;
245
+ Reason reason ;
243
246
if (t instanceof MongoTimeoutException ) {
244
- connectionPoolListener . connectionCheckOutFailed ( new ConnectionCheckOutFailedEvent ( serverId , Reason .TIMEOUT )) ;
247
+ reason = Reason .TIMEOUT ;
245
248
} else if (t instanceof MongoOpenConnectionInternalException ) {
246
- connectionPoolListener . connectionCheckOutFailed ( new ConnectionCheckOutFailedEvent ( serverId , Reason .CONNECTION_ERROR )) ;
249
+ reason = Reason .CONNECTION_ERROR ;
247
250
result = t .getCause ();
248
251
} else if (t instanceof MongoConnectionPoolClearedException ) {
249
- connectionPoolListener . connectionCheckOutFailed ( new ConnectionCheckOutFailedEvent ( serverId , Reason .CONNECTION_ERROR )) ;
252
+ reason = Reason .CONNECTION_ERROR ;
250
253
} else if (ConcurrentPool .isPoolClosedException (t )) {
251
- connectionPoolListener . connectionCheckOutFailed ( new ConnectionCheckOutFailedEvent ( serverId , Reason .POOL_CLOSED )) ;
254
+ reason = Reason .POOL_CLOSED ;
252
255
} else {
253
- connectionPoolListener . connectionCheckOutFailed ( new ConnectionCheckOutFailedEvent ( serverId , Reason .UNKNOWN )) ;
256
+ reason = Reason .UNKNOWN ;
254
257
}
258
+ connectionPoolListener .connectionCheckOutFailed (new ConnectionCheckOutFailedEvent (serverId , operationContext .getId (), reason ));
255
259
return result ;
256
260
}
257
261
@@ -516,6 +520,7 @@ private class PooledConnection implements InternalConnection {
516
520
private final UsageTrackingInternalConnection wrapped ;
517
521
private final AtomicBoolean isClosed = new AtomicBoolean ();
518
522
private Connection .PinningMode pinningMode ;
523
+ private OperationContext operationContext ;
519
524
520
525
PooledConnection (final UsageTrackingInternalConnection wrapped ) {
521
526
this .wrapped = notNull ("wrapped" , wrapped );
@@ -526,6 +531,13 @@ public int getGeneration() {
526
531
return wrapped .getGeneration ();
527
532
}
528
533
534
+ /**
535
+ * Associates this with the operation context and establishes the checked out start time
536
+ */
537
+ public void checkedOutForOperation (final OperationContext operationContext ) {
538
+ this .operationContext = operationContext ;
539
+ }
540
+
529
541
@ Override
530
542
public void open () {
531
543
assertFalse (isClosed .get ());
@@ -559,7 +571,7 @@ public void close() {
559
571
// All but the first call is a no-op
560
572
if (!isClosed .getAndSet (true )) {
561
573
unmarkAsPinned ();
562
- connectionPoolListener .connectionCheckedIn (new ConnectionCheckedInEvent (getId (wrapped )));
574
+ connectionPoolListener .connectionCheckedIn (new ConnectionCheckedInEvent (getId (wrapped ), operationContext . getId () ));
563
575
if (LOGGER .isTraceEnabled ()) {
564
576
LOGGER .trace (format ("Checked in connection [%s] to server %s" , getId (wrapped ), serverId .getAddress ()));
565
577
}
@@ -731,7 +743,7 @@ public ServerDescription getInitialServerDescription() {
731
743
/**
732
744
* This internal exception is used to express an exceptional situation encountered when opening a connection.
733
745
* It exists because it allows consolidating the code that sends events for exceptional situations in a
734
- * {@linkplain #checkOutFailed(Throwable) single place}, it must not be observable by an external code.
746
+ * {@linkplain #checkOutFailed(Throwable, OperationContext ) single place}, it must not be observable by an external code.
735
747
*/
736
748
private static final class MongoOpenConnectionInternalException extends RuntimeException {
737
749
private static final long serialVersionUID = 1 ;
@@ -919,7 +931,7 @@ private PooledConnection openWithConcurrencyLimit(final PooledConnection connect
919
931
* </ul>
920
932
*/
921
933
void openAsyncWithConcurrencyLimit (
922
- final PooledConnection connection , final Timeout timeout , final SingleResultCallback <InternalConnection > callback ) {
934
+ final PooledConnection connection , final Timeout timeout , final SingleResultCallback <PooledConnection > callback ) {
923
935
PooledConnection availableConnection ;
924
936
try {//phase one
925
937
availableConnection = acquirePermitOrGetAvailableOpenedConnection (true , timeout );
0 commit comments