@@ -17,6 +17,7 @@ function PoolCluster(config) {
17
17
this . _canRetry = typeof config . canRetry === 'undefined' ? true : config . canRetry ;
18
18
this . _defaultSelector = config . defaultSelector || 'RR' ;
19
19
this . _removeNodeErrorCount = config . removeNodeErrorCount || 5 ;
20
+ this . _restoreNodeTimeout = config . restoreNodeTimeout || 0 ;
20
21
21
22
this . _closed = false ;
22
23
this . _findCaches = Object . create ( null ) ;
@@ -45,9 +46,10 @@ PoolCluster.prototype.add = function add(id, config) {
45
46
: new PoolConfig ( id ) ;
46
47
47
48
this . _nodes [ nodeId ] = {
48
- id : nodeId ,
49
- errorCount : 0 ,
50
- pool : new Pool ( { config : poolConfig } )
49
+ id : nodeId ,
50
+ errorCount : 0 ,
51
+ pool : new Pool ( { config : poolConfig } ) ,
52
+ _offlineUntil : 0
51
53
} ;
52
54
53
55
this . _clearFindCaches ( ) ;
@@ -114,17 +116,13 @@ PoolCluster.prototype.of = function(pattern, selector) {
114
116
} ;
115
117
116
118
PoolCluster . prototype . remove = function remove ( pattern ) {
117
- var foundNodeIds = this . _findNodeIds ( pattern ) ;
119
+ var foundNodeIds = this . _findNodeIds ( pattern , true ) ;
118
120
119
121
for ( var i = 0 ; i < foundNodeIds . length ; i ++ ) {
120
122
var node = this . _getNode ( foundNodeIds [ i ] ) ;
121
123
122
124
if ( node ) {
123
- delete this . _nodes [ node . id ] ;
124
-
125
- this . _clearFindCaches ( ) ;
126
-
127
- node . pool . end ( _noop ) ;
125
+ this . _removeNode ( node ) ;
128
126
}
129
127
}
130
128
} ;
@@ -150,56 +148,86 @@ PoolCluster.prototype._clearFindCaches = function _clearFindCaches() {
150
148
this . _findCaches = Object . create ( null ) ;
151
149
} ;
152
150
153
- PoolCluster . prototype . _findNodeIds = function ( pattern ) {
154
- if ( this . _findCaches [ pattern ] !== undefined ) {
155
- return this . _findCaches [ pattern ] ;
151
+ PoolCluster . prototype . _decreaseErrorCount = function _decreaseErrorCount ( node ) {
152
+ var errorCount = node . errorCount ;
153
+
154
+ if ( errorCount > this . _removeNodeErrorCount ) {
155
+ errorCount = this . _removeNodeErrorCount ;
156
+ }
157
+
158
+ if ( errorCount < 1 ) {
159
+ errorCount = 1 ;
156
160
}
157
161
158
- var foundNodeIds ;
159
- var nodeIds = Object . keys ( this . _nodes ) ;
162
+ node . errorCount = errorCount - 1 ;
160
163
161
- if ( pattern === '*' ) {
162
- // all
163
- foundNodeIds = nodeIds ;
164
- } else if ( nodeIds . indexOf ( pattern ) != - 1 ) {
165
- // one
166
- foundNodeIds = [ pattern ] ;
167
- } else if ( pattern [ pattern . length - 1 ] === '*' ) {
168
- // wild-card matching
169
- var keyword = pattern . substring ( pattern . length - 1 , 0 ) ;
164
+ if ( node . _offlineUntil ) {
165
+ node . _offlineUntil = 0 ;
166
+ this . emit ( 'online' , node . id ) ;
167
+ }
168
+ } ;
170
169
171
- foundNodeIds = nodeIds . filter ( function ( id ) {
172
- return id . indexOf ( keyword ) === 0 ;
173
- } ) ;
174
- } else {
175
- foundNodeIds = [ ] ;
170
+ PoolCluster . prototype . _findNodeIds = function _findNodeIds ( pattern , includeOffline ) {
171
+ var currentTime = 0 ;
172
+ var foundNodeIds = this . _findCaches [ pattern ] ;
173
+
174
+ if ( foundNodeIds === undefined ) {
175
+ var nodeIds = Object . keys ( this . _nodes ) ;
176
+ var wildcard = pattern . substr ( - 1 ) === '*' ;
177
+ var keyword = wildcard
178
+ ? pattern . substr ( 0 , pattern . length - 1 )
179
+ : pattern ;
180
+
181
+ if ( wildcard ) {
182
+ foundNodeIds = keyword . length !== 0
183
+ ? nodeIds . filter ( function ( id ) { return id . substr ( 0 , keyword . length ) === keyword ; } )
184
+ : nodeIds ;
185
+ } else {
186
+ var index = nodeIds . indexOf ( keyword ) ;
187
+ foundNodeIds = nodeIds . slice ( index , index + 1 ) ;
188
+ }
189
+
190
+ this . _findCaches [ pattern ] = foundNodeIds ;
191
+ }
192
+
193
+ if ( includeOffline ) {
194
+ return foundNodeIds ;
176
195
}
177
196
178
- this . _findCaches [ pattern ] = foundNodeIds ;
197
+ return foundNodeIds . filter ( function ( nodeId ) {
198
+ var node = this . _getNode ( nodeId ) ;
199
+
200
+ if ( ! node . _offlineUntil ) {
201
+ return true ;
202
+ }
203
+
204
+ if ( ! currentTime ) {
205
+ currentTime = getMonotonicMilliseconds ( ) ;
206
+ }
179
207
180
- return foundNodeIds ;
208
+ return node . _offlineUntil <= currentTime ;
209
+ } , this ) ;
181
210
} ;
182
211
183
- PoolCluster . prototype . _getNode = function ( id ) {
212
+ PoolCluster . prototype . _getNode = function _getNode ( id ) {
184
213
return this . _nodes [ id ] || null ;
185
214
} ;
186
215
187
216
PoolCluster . prototype . _increaseErrorCount = function _increaseErrorCount ( node ) {
188
- if ( ++ node . errorCount >= this . _removeNodeErrorCount ) {
189
- delete this . _nodes [ node . id ] ;
190
-
191
- this . _clearFindCaches ( ) ;
217
+ var errorCount = ++ node . errorCount ;
192
218
193
- node . pool . end ( _noop ) ;
194
-
195
- this . emit ( 'remove' , node . id ) ;
219
+ if ( this . _removeNodeErrorCount > errorCount ) {
220
+ return ;
196
221
}
197
- } ;
198
222
199
- PoolCluster . prototype . _decreaseErrorCount = function ( node ) {
200
- if ( node . errorCount > 0 ) {
201
- -- node . errorCount ;
223
+ if ( this . _restoreNodeTimeout > 0 ) {
224
+ node . _offlineUntil = getMonotonicMilliseconds ( ) + this . _restoreNodeTimeout ;
225
+ this . emit ( 'offline' , node . id ) ;
226
+ return ;
202
227
}
228
+
229
+ this . _removeNode ( node ) ;
230
+ this . emit ( 'remove' , node . id ) ;
203
231
} ;
204
232
205
233
PoolCluster . prototype . _getConnection = function ( node , cb ) {
@@ -220,6 +248,27 @@ PoolCluster.prototype._getConnection = function(node, cb) {
220
248
} ) ;
221
249
} ;
222
250
251
+ PoolCluster . prototype . _removeNode = function _removeNode ( node ) {
252
+ delete this . _nodes [ node . id ] ;
253
+
254
+ this . _clearFindCaches ( ) ;
255
+
256
+ node . pool . end ( _noop ) ;
257
+ } ;
258
+
259
+ function getMonotonicMilliseconds ( ) {
260
+ var ms ;
261
+
262
+ if ( typeof process . hrtime === 'function' ) {
263
+ ms = process . hrtime ( ) ;
264
+ ms = ms [ 0 ] * 1e3 + ms [ 1 ] * 1e-6 ;
265
+ } else {
266
+ ms = process . uptime ( ) * 1000 ;
267
+ }
268
+
269
+ return Math . floor ( ms ) ;
270
+ }
271
+
223
272
function _cb ( err ) {
224
273
if ( err ) {
225
274
throw err ;
0 commit comments