1
1
'use strict' ;
2
+ const Denque = require ( 'denque' ) ;
2
3
const EventEmitter = require ( 'events' ) ;
3
4
const ServerDescription = require ( './server_description' ) . ServerDescription ;
4
5
const ServerType = require ( './common' ) . ServerType ;
@@ -18,6 +19,7 @@ const isNodeShuttingDownError = require('../error').isNodeShuttingDownError;
18
19
const maxWireVersion = require ( '../utils' ) . maxWireVersion ;
19
20
const ClientSession = require ( '../sessions' ) . ClientSession ;
20
21
const MongoError = require ( '../error' ) . MongoError ;
22
+ const MongoServerSelectionError = require ( '../error' ) . MongoServerSelectionError ;
21
23
const resolveClusterTime = require ( '../topologies/shared' ) . resolveClusterTime ;
22
24
const SrvPoller = require ( './srv_polling' ) . SrvPoller ;
23
25
const getMMAPError = require ( '../topologies/shared' ) . getMMAPError ;
@@ -35,7 +37,7 @@ const clearAndRemoveTimerFrom = common.clearAndRemoveTimerFrom;
35
37
const serverSelection = require ( './server_selection' ) ;
36
38
const readPreferenceServerSelector = serverSelection . readPreferenceServerSelector ;
37
39
const writableServerSelector = serverSelection . writableServerSelector ;
38
- const selectServers = serverSelection . selectServers ;
40
+ // const selectServers = serverSelection.selectServers;
39
41
40
42
// Global state
41
43
let globalTopologyCounter = 0 ;
@@ -74,6 +76,9 @@ const DEPRECATED_OPTIONS = new Set([
74
76
'bufferMaxEntries'
75
77
] ) ;
76
78
79
+ const kCancelled = Symbol ( 'cancelled' ) ;
80
+ const kWaitQueue = Symbol ( 'waitQueue' ) ;
81
+
77
82
/**
78
83
* A container of server instances representing a connection to a MongoDB topology.
79
84
*
@@ -140,6 +145,7 @@ class Topology extends EventEmitter {
140
145
return result ;
141
146
} , new Map ( ) ) ;
142
147
148
+ this [ kWaitQueue ] = new Denque ( ) ;
143
149
this . s = {
144
150
// the id of this topology
145
151
id : topologyId ,
@@ -195,7 +201,6 @@ class Topology extends EventEmitter {
195
201
clusterTime : null ,
196
202
197
203
// timer management
198
- iterationTimers : new Set ( ) ,
199
204
connectionTimers : new Set ( )
200
205
} ;
201
206
@@ -335,8 +340,7 @@ class Topology extends EventEmitter {
335
340
return ;
336
341
}
337
342
338
- // clear all existing monitor timers
339
- drainTimerQueue ( this . s . iterationTimers ) ;
343
+ drainWaitQueue ( this [ kWaitQueue ] , new MongoError ( 'Topology closed' ) ) ;
340
344
drainTimerQueue ( this . s . connectionTimers ) ;
341
345
342
346
if ( this . s . srvPoller ) {
@@ -416,26 +420,43 @@ class Topology extends EventEmitter {
416
420
const transaction = session && session . transaction ;
417
421
418
422
if ( isSharded && transaction && transaction . server ) {
419
- callback ( null , transaction . server ) ;
423
+ callback ( undefined , transaction . server ) ;
420
424
return ;
421
425
}
422
426
423
- selectServers (
424
- this ,
425
- selector ,
426
- options . serverSelectionTimeoutMS ,
427
- process . hrtime ( ) ,
428
- ( err , servers ) => {
429
- if ( err ) return callback ( err ) ;
430
-
431
- const selectedServer = randomSelection ( servers ) ;
432
- if ( isSharded && transaction && transaction . isActive ) {
433
- transaction . pinServer ( selectedServer ) ;
434
- }
427
+ // support server selection by options with readPreference
428
+ let serverSelector = selector ;
429
+ if ( typeof selector === 'object' ) {
430
+ const readPreference = selector . readPreference
431
+ ? selector . readPreference
432
+ : ReadPreference . primary ;
435
433
436
- callback ( null , selectedServer ) ;
437
- }
438
- ) ;
434
+ serverSelector = readPreferenceServerSelector ( readPreference ) ;
435
+ }
436
+
437
+ const waitQueueMember = {
438
+ serverSelector,
439
+ transaction,
440
+ callback
441
+ } ;
442
+
443
+ const serverSelectionTimeoutMS = options . serverSelectionTimeoutMS ;
444
+ if ( serverSelectionTimeoutMS ) {
445
+ waitQueueMember . timer = setTimeout ( ( ) => {
446
+ waitQueueMember [ kCancelled ] = true ;
447
+ waitQueueMember . timer = undefined ;
448
+ const timeoutError = new MongoServerSelectionError (
449
+ `Server selection timed out after ${ serverSelectionTimeoutMS } ms` ,
450
+ this . description
451
+ ) ;
452
+
453
+ waitQueueMember . callback ( timeoutError ) ;
454
+ } , serverSelectionTimeoutMS ) ;
455
+ }
456
+
457
+ // place the member at the front of the wait queue
458
+ this [ kWaitQueue ] . unshift ( waitQueueMember ) ;
459
+ processWaitQueue ( this ) ;
439
460
}
440
461
441
462
// Sessions related methods
@@ -545,6 +566,11 @@ class Topology extends EventEmitter {
545
566
// update server list from updated descriptions
546
567
updateServers ( this , serverDescription ) ;
547
568
569
+ // attempt to resolve any outstanding server selection attempts
570
+ if ( this [ kWaitQueue ] . length > 0 ) {
571
+ processWaitQueue ( this ) ;
572
+ }
573
+
548
574
this . emit (
549
575
'topologyDescriptionChanged' ,
550
576
new events . TopologyDescriptionChangedEvent (
@@ -1012,6 +1038,64 @@ function srvPollingHandler(topology) {
1012
1038
} ;
1013
1039
}
1014
1040
1041
+ function drainWaitQueue ( queue , err ) {
1042
+ while ( queue . length ) {
1043
+ const waitQueueMember = queue . pop ( ) ;
1044
+ clearTimeout ( waitQueueMember . timer ) ;
1045
+ if ( ! waitQueueMember [ kCancelled ] ) {
1046
+ waitQueueMember . callback ( err ) ;
1047
+ }
1048
+ }
1049
+ }
1050
+
1051
+ function processWaitQueue ( topology ) {
1052
+ if ( topology . s . state === STATE_CLOSED ) {
1053
+ drainWaitQueue ( topology [ kWaitQueue ] , new MongoError ( 'Topology is closed, please connect' ) ) ;
1054
+ return ;
1055
+ }
1056
+
1057
+ const isSharded = topology . description . type === TopologyType . Sharded ;
1058
+ const serverDescriptions = Array . from ( topology . description . servers . values ( ) ) ;
1059
+ for ( let i = 0 ; i < topology [ kWaitQueue ] . length ; ++ i ) {
1060
+ const waitQueueMember = topology [ kWaitQueue ] . shift ( ) ;
1061
+ if ( waitQueueMember [ kCancelled ] ) {
1062
+ continue ;
1063
+ }
1064
+
1065
+ let selectedDescriptions ;
1066
+ try {
1067
+ const serverSelector = waitQueueMember . serverSelector ;
1068
+ selectedDescriptions = serverSelector
1069
+ ? serverSelector ( topology . description , serverDescriptions )
1070
+ : serverDescriptions ;
1071
+ } catch ( e ) {
1072
+ clearTimeout ( waitQueueMember . timer ) ;
1073
+ waitQueueMember . callback ( e ) ;
1074
+ break ;
1075
+ }
1076
+
1077
+ if ( selectedDescriptions . length === 0 ) {
1078
+ topology [ kWaitQueue ] . push ( waitQueueMember ) ;
1079
+ break ;
1080
+ }
1081
+
1082
+ const selectedServerDescription = randomSelection ( selectedDescriptions ) ;
1083
+ const selectedServer = topology . s . servers . get ( selectedServerDescription . address ) ;
1084
+ const transaction = waitQueueMember . transaction ;
1085
+ if ( isSharded && transaction && transaction . isActive ) {
1086
+ transaction . pinServer ( selectedServer ) ;
1087
+ }
1088
+
1089
+ clearTimeout ( waitQueueMember . timer ) ;
1090
+ waitQueueMember . callback ( undefined , selectedServer ) ;
1091
+ }
1092
+
1093
+ if ( topology [ kWaitQueue ] . length > 0 ) {
1094
+ // ensure all server monitors attempt monitoring soon
1095
+ topology . s . servers . forEach ( server => process . nextTick ( ( ) => server . requestCheck ( ) ) ) ;
1096
+ }
1097
+ }
1098
+
1015
1099
/**
1016
1100
* A server opening SDAM monitoring event
1017
1101
*
0 commit comments