@@ -48,6 +48,11 @@ export enum ConnectivityState {
48
48
SHUTDOWN ,
49
49
}
50
50
51
+ /**
52
+ * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
53
+ */
54
+ const MAX_TIMEOUT_TIME = 2147483647 ;
55
+
51
56
let nextCallNumber = 0 ;
52
57
53
58
function getNewCallNumber ( ) : number {
@@ -137,6 +142,14 @@ export class ChannelImplementation implements Channel {
137
142
private defaultAuthority : string ;
138
143
private filterStackFactory : FilterStackFactory ;
139
144
private target : GrpcUri ;
145
+ /**
146
+ * This timer does not do anything on its own. Its purpose is to hold the
147
+ * event loop open while there are any pending calls for the channel that
148
+ * have not yet been assigned to specific subchannels. In other words,
149
+ * the invariant is that callRefTimer is reffed if and only if pickQueue
150
+ * is non-empty.
151
+ */
152
+ private callRefTimer : NodeJS . Timer ;
140
153
constructor (
141
154
target : string ,
142
155
private readonly credentials : ChannelCredentials ,
@@ -177,6 +190,10 @@ export class ChannelImplementation implements Channel {
177
190
`Could not find a default scheme for target name "${ target } "`
178
191
) ;
179
192
}
193
+
194
+ this . callRefTimer = setInterval ( ( ) => { } , MAX_TIMEOUT_TIME ) ;
195
+ this . callRefTimer . unref ?.( ) ;
196
+
180
197
if ( this . options [ 'grpc.default_authority' ] ) {
181
198
this . defaultAuthority = this . options [ 'grpc.default_authority' ] as string ;
182
199
} else {
@@ -206,6 +223,7 @@ export class ChannelImplementation implements Channel {
206
223
updateState : ( connectivityState : ConnectivityState , picker : Picker ) => {
207
224
this . currentPicker = picker ;
208
225
const queueCopy = this . pickQueue . slice ( ) ;
226
+ this . callRefTimer . unref ?.( ) ;
209
227
this . pickQueue = [ ] ;
210
228
for ( const { callStream, callMetadata } of queueCopy ) {
211
229
this . tryPick ( callStream , callMetadata ) ;
@@ -232,6 +250,11 @@ export class ChannelImplementation implements Channel {
232
250
] ) ;
233
251
}
234
252
253
+ private pushPick ( callStream : Http2CallStream , callMetadata : Metadata ) {
254
+ this . callRefTimer . ref ?.( ) ;
255
+ this . pickQueue . push ( { callStream, callMetadata } ) ;
256
+ }
257
+
235
258
/**
236
259
* Check the picker output for the given call and corresponding metadata,
237
260
* and take any relevant actions. Should not be called while iterating
@@ -276,7 +299,7 @@ export class ChannelImplementation implements Channel {
276
299
' has state ' +
277
300
ConnectivityState [ pickResult . subchannel ! . getConnectivityState ( ) ]
278
301
) ;
279
- this . pickQueue . push ( { callStream, callMetadata } ) ;
302
+ this . pushPick ( callStream , callMetadata ) ;
280
303
break ;
281
304
}
282
305
/* We need to clone the callMetadata here because the transparent
@@ -367,11 +390,11 @@ export class ChannelImplementation implements Channel {
367
390
}
368
391
break ;
369
392
case PickResultType . QUEUE :
370
- this . pickQueue . push ( { callStream, callMetadata } ) ;
393
+ this . pushPick ( callStream , callMetadata ) ;
371
394
break ;
372
395
case PickResultType . TRANSIENT_FAILURE :
373
396
if ( callMetadata . getOptions ( ) . waitForReady ) {
374
- this . pickQueue . push ( { callStream, callMetadata } ) ;
397
+ this . pushPick ( callStream , callMetadata ) ;
375
398
} else {
376
399
callStream . cancelWithStatus (
377
400
pickResult . status ! . code ,
@@ -433,6 +456,7 @@ export class ChannelImplementation implements Channel {
433
456
close ( ) {
434
457
this . resolvingLoadBalancer . destroy ( ) ;
435
458
this . updateState ( ConnectivityState . SHUTDOWN ) ;
459
+ clearInterval ( this . callRefTimer ) ;
436
460
437
461
this . subchannelPool . unrefUnusedSubchannels ( ) ;
438
462
}
0 commit comments