1
+ /*
2
+ * Copyright 2020 gRPC authors.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ *
16
+ */
17
+
18
+ import { LoadBalancer , ChannelControlHelper , getFirstUsableConfig , registerLoadBalancerType } from "./load-balancer" ;
19
+ import { SubchannelAddress } from "./subchannel" ;
20
+ import { LoadBalancingConfig , WeightedTarget , isWeightedTargetLoadBalancingConfig } from "./load-balancing-config" ;
21
+ import { Picker , PickResult , PickArgs , QueuePicker , UnavailablePicker } from "./picker" ;
22
+ import { ConnectivityState } from "./channel" ;
23
+ import { ChildLoadBalancerHandler } from "./load-balancer-child-handler" ;
24
+ import { Status } from "./constants" ;
25
+ import { Metadata } from "./metadata" ;
26
+ import { isLocalitySubchannelAddress , LocalitySubchannelAddress } from "./load-balancer-priority" ;
27
+
28
+ const TYPE_NAME = 'weighted_target' ;
29
+
30
+ const DEFAULT_RETENTION_INTERVAL_MS = 15 * 60 * 1000 ;
31
+
32
+ /**
33
+ * Represents a picker and a subinterval of a larger interval used for randomly
34
+ * selecting an element of a list of these objects.
35
+ */
36
+ interface WeightedPicker {
37
+ picker : Picker ;
38
+ /**
39
+ * The exclusive end of the interval associated with this element. The start
40
+ * of the interval is implicitly the rangeEnd of the previous element in the
41
+ * list, or 0 for the first element in the list.
42
+ */
43
+ rangeEnd : number ;
44
+ }
45
+
46
+ class WeightedTargetPicker implements Picker {
47
+ private rangeTotal : number ;
48
+ constructor ( private readonly pickerList : WeightedPicker [ ] ) {
49
+ this . rangeTotal = pickerList [ pickerList . length - 1 ] . rangeEnd ;
50
+ }
51
+ pick ( pickArgs : PickArgs ) : PickResult {
52
+ // num | 0 is equivalent to floor(num)
53
+ const selection = ( Math . random ( ) * this . rangeTotal ) | 0 ;
54
+
55
+ /* Binary search for the element of the list such that
56
+ * pickerList[index - 1].rangeEnd <= selection < pickerList[index].rangeEnd
57
+ */
58
+ let mid = 0 ;
59
+ let startIndex = 0 ;
60
+ let endIndex = this . pickerList . length - 1 ;
61
+ let index = 0 ;
62
+ while ( endIndex > startIndex ) {
63
+ mid = ( ( startIndex + endIndex ) / 2 ) | 0 ;
64
+ if ( this . pickerList [ mid ] . rangeEnd > selection ) {
65
+ endIndex = mid ;
66
+ } else if ( this . pickerList [ mid ] . rangeEnd < selection ) {
67
+ startIndex = mid + 1 ;
68
+ } else {
69
+ // + 1 here because the range is exclusive at the top end
70
+ index = mid + 1 ;
71
+ break ;
72
+ }
73
+ }
74
+ if ( index === 0 ) {
75
+ index = startIndex ;
76
+ }
77
+
78
+ return this . pickerList [ index ] . picker . pick ( pickArgs ) ;
79
+ }
80
+ }
81
+
82
+ interface WeightedChild {
83
+ updateAddressList ( addressList : SubchannelAddress [ ] , lbConfig : WeightedTarget , attributes : { [ key : string ] : unknown ; } ) : void ;
84
+ exitIdle ( ) : void ;
85
+ resetBackoff ( ) : void ;
86
+ destroy ( ) : void ;
87
+ deactivate ( ) : void ;
88
+ maybeReactivate ( ) : void ;
89
+ getConnectivityState ( ) : ConnectivityState ;
90
+ getPicker ( ) : Picker ;
91
+ getWeight ( ) : number ;
92
+ }
93
+
94
+ export class WeightedTargetLoadBalancer implements LoadBalancer {
95
+ private WeightedChildImpl = class implements WeightedChild {
96
+ private connectivityState : ConnectivityState = ConnectivityState . IDLE ;
97
+ private picker : Picker ;
98
+ private childBalancer : ChildLoadBalancerHandler ;
99
+ private deactivationTimer : NodeJS . Timer | null = null ;
100
+ private weight : number = 0 ;
101
+
102
+ constructor ( private parent : WeightedTargetLoadBalancer , private name : string ) {
103
+ this . childBalancer = new ChildLoadBalancerHandler ( {
104
+ createSubchannel : ( subchannelAddress , subchannelOptions ) => {
105
+ return this . parent . channelControlHelper . createSubchannel ( subchannelAddress , subchannelOptions ) ;
106
+ } ,
107
+ updateState : ( connectivityState , picker ) => {
108
+ this . updateState ( connectivityState , picker ) ;
109
+ } ,
110
+ requestReresolution : ( ) => {
111
+ this . parent . channelControlHelper . requestReresolution ( ) ;
112
+ }
113
+ } ) ;
114
+
115
+ this . picker = new QueuePicker ( this . childBalancer ) ;
116
+ }
117
+
118
+ private updateState ( connectivityState : ConnectivityState , picker : Picker ) {
119
+ this . connectivityState = connectivityState ;
120
+ this . picker = picker ;
121
+ this . parent . updateState ( ) ;
122
+ }
123
+
124
+ updateAddressList ( addressList : SubchannelAddress [ ] , lbConfig : WeightedTarget , attributes : { [ key : string ] : unknown ; } ) : void {
125
+ this . weight = lbConfig . weight ;
126
+ const childConfig = getFirstUsableConfig ( lbConfig . child_policy ) ;
127
+ if ( childConfig !== null ) {
128
+ this . childBalancer . updateAddressList ( addressList , childConfig , attributes ) ;
129
+ }
130
+ }
131
+ exitIdle ( ) : void {
132
+ this . childBalancer . exitIdle ( ) ;
133
+ }
134
+ resetBackoff ( ) : void {
135
+ this . childBalancer . resetBackoff ( ) ;
136
+ }
137
+ destroy ( ) : void {
138
+ this . childBalancer . destroy ( ) ;
139
+ if ( this . deactivationTimer !== null ) {
140
+ clearTimeout ( this . deactivationTimer ) ;
141
+ }
142
+ }
143
+ deactivate ( ) : void {
144
+ if ( this . deactivationTimer === null ) {
145
+ this . deactivationTimer = setTimeout ( ( ) => {
146
+ this . parent . targets . delete ( this . name ) ;
147
+ this . deactivationTimer = null ;
148
+ } , DEFAULT_RETENTION_INTERVAL_MS ) ;
149
+ }
150
+ }
151
+ maybeReactivate ( ) : void {
152
+ if ( this . deactivationTimer !== null ) {
153
+ clearTimeout ( this . deactivationTimer ) ;
154
+ this . deactivationTimer = null ;
155
+ }
156
+ }
157
+ getConnectivityState ( ) : ConnectivityState {
158
+ return this . connectivityState ;
159
+ }
160
+ getPicker ( ) : Picker {
161
+ return this . picker ;
162
+ }
163
+ getWeight ( ) : number {
164
+ return this . weight ;
165
+ }
166
+ }
167
+ // end of WeightedChildImpl
168
+
169
+ /**
170
+ * Map of target names to target children. Includes current targets and
171
+ * previous targets with deactivation timers that have not yet triggered.
172
+ */
173
+ private targets : Map < string , WeightedChild > = new Map < string , WeightedChild > ( ) ;
174
+ /**
175
+ * List of current target names.
176
+ */
177
+ private targetList : string [ ] = [ ] ;
178
+
179
+ constructor ( private channelControlHelper : ChannelControlHelper ) { }
180
+
181
+ private updateState ( ) {
182
+ const pickerList : WeightedPicker [ ] = [ ] ;
183
+ let end = 0 ;
184
+
185
+ let connectingCount = 0 ;
186
+ let idleCount = 0 ;
187
+ let transientFailureCount = 0 ;
188
+ for ( const targetName of this . targetList ) {
189
+ const target = this . targets . get ( targetName ) ;
190
+ if ( target === undefined ) {
191
+ continue ;
192
+ }
193
+ switch ( target . getConnectivityState ( ) ) {
194
+ case ConnectivityState . READY :
195
+ end += target . getWeight ( ) ;
196
+ pickerList . push ( {
197
+ picker : target . getPicker ( ) ,
198
+ rangeEnd : end
199
+ } ) ;
200
+ break ;
201
+ case ConnectivityState . CONNECTING :
202
+ connectingCount += 1 ;
203
+ break ;
204
+ case ConnectivityState . IDLE :
205
+ idleCount += 1 ;
206
+ break ;
207
+ case ConnectivityState . TRANSIENT_FAILURE :
208
+ transientFailureCount += 1 ;
209
+ break ;
210
+ default :
211
+ // Ignore the other possiblity, SHUTDOWN
212
+ }
213
+ }
214
+
215
+ let connectivityState : ConnectivityState ;
216
+ if ( pickerList . length > 0 ) {
217
+ connectivityState = ConnectivityState . READY ;
218
+ } else if ( connectingCount > 0 ) {
219
+ connectivityState = ConnectivityState . CONNECTING ;
220
+ } else if ( idleCount > 0 ) {
221
+ connectivityState = ConnectivityState . IDLE ;
222
+ } else {
223
+ connectivityState = ConnectivityState . TRANSIENT_FAILURE ;
224
+ }
225
+
226
+ let picker : Picker ;
227
+ switch ( connectivityState ) {
228
+ case ConnectivityState . READY :
229
+ picker = new WeightedTargetPicker ( pickerList ) ;
230
+ break ;
231
+ case ConnectivityState . CONNECTING :
232
+ case ConnectivityState . READY :
233
+ picker = new QueuePicker ( this ) ;
234
+ break ;
235
+ default :
236
+ picker = new UnavailablePicker ( {
237
+ code : Status . UNAVAILABLE ,
238
+ details : 'weighted_target: all children report state TRANSIENT_FAILURE' ,
239
+ metadata : new Metadata ( )
240
+ } ) ;
241
+ }
242
+ this . channelControlHelper . updateState ( connectivityState , picker ) ;
243
+ }
244
+
245
+ updateAddressList ( addressList : SubchannelAddress [ ] , lbConfig : LoadBalancingConfig , attributes : { [ key : string ] : unknown ; } ) : void {
246
+ if ( ! isWeightedTargetLoadBalancingConfig ( lbConfig ) ) {
247
+ // Reject a config of the wrong type
248
+ return ;
249
+ }
250
+
251
+ /* For each address, the first element of its localityPath array determines
252
+ * which child it belongs to. So we bucket those addresses by that first
253
+ * element, and pass along the rest of the localityPath for that child
254
+ * to use. */
255
+ const childAddressMap = new Map < string , SubchannelAddress [ ] > ( ) ;
256
+ for ( const address of addressList ) {
257
+ if ( ! isLocalitySubchannelAddress ( address ) ) {
258
+ // Reject address that cannot be associated with targets
259
+ return ;
260
+ }
261
+ if ( address . localityPath . length < 1 ) {
262
+ // Reject address that cannot be associated with targets
263
+ return ;
264
+ }
265
+ const childName = address . localityPath [ 0 ] ;
266
+ const childAddress : LocalitySubchannelAddress = {
267
+ ...address ,
268
+ localityPath : address . localityPath . slice ( 1 ) ,
269
+ } ;
270
+ let childAddressList = childAddressMap . get ( childName ) ;
271
+ if ( childAddressList === undefined ) {
272
+ childAddressList = [ ] ;
273
+ childAddressMap . set ( childName , childAddressList ) ;
274
+ }
275
+ childAddressList . push ( childAddress ) ;
276
+ }
277
+
278
+ this . targetList = Array . from ( lbConfig . weighted_target . targets . keys ( ) ) ;
279
+ for ( const [ targetName , targetConfig ] of lbConfig . weighted_target . targets ) {
280
+ let target = this . targets . get ( targetName ) ;
281
+ if ( target === undefined ) {
282
+ target = new this . WeightedChildImpl ( this , targetName ) ;
283
+ this . targets . set ( targetName , target ) ;
284
+ } else {
285
+ target . maybeReactivate ( ) ;
286
+ }
287
+ target . updateAddressList ( childAddressMap . get ( targetName ) ?? [ ] , targetConfig , attributes ) ;
288
+ }
289
+
290
+ // Deactivate targets that are not in the new config
291
+ for ( const [ targetName , target ] of this . targets ) {
292
+ if ( this . targetList . indexOf ( targetName ) < 0 ) {
293
+ target . deactivate ( ) ;
294
+ }
295
+ }
296
+
297
+ this . updateState ( ) ;
298
+ }
299
+ exitIdle ( ) : void {
300
+ for ( const targetName of this . targetList ) {
301
+ this . targets . get ( targetName ) ?. exitIdle ( ) ;
302
+ }
303
+ }
304
+ resetBackoff ( ) : void {
305
+ for ( const targetName of this . targetList ) {
306
+ this . targets . get ( targetName ) ?. resetBackoff ( ) ;
307
+ }
308
+ }
309
+ destroy ( ) : void {
310
+ for ( const target of this . targets . values ( ) ) {
311
+ target . destroy ( ) ;
312
+ }
313
+ this . targets . clear ( ) ;
314
+ }
315
+ getTypeName ( ) : string {
316
+ return TYPE_NAME ;
317
+ }
318
+ }
319
+
320
+ export function setup ( ) {
321
+ registerLoadBalancerType ( TYPE_NAME , WeightedTargetLoadBalancer ) ;
322
+ }
0 commit comments