1515 *
1616 */
1717
18- import { connectivityState as ConnectivityState , status as Status , Metadata , logVerbosity as LogVerbosity , experimental } from '@grpc/grpc-js' ;
18+ import { connectivityState as ConnectivityState , status as Status , Metadata , logVerbosity as LogVerbosity , experimental , StatusObject } from '@grpc/grpc-js' ;
1919import { getSingletonXdsClient , XdsClient , XdsClusterDropStats } from './xds-client' ;
2020import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment' ;
2121import { Locality__Output } from './generated/envoy/api/v2/core/Locality' ;
@@ -34,6 +34,11 @@ import { validateLoadBalancingConfig } from '@grpc/grpc-js/build/src/experimenta
3434import { WeightedTarget , WeightedTargetLoadBalancingConfig } from './load-balancer-weighted-target' ;
3535import { LrsLoadBalancingConfig } from './load-balancer-lrs' ;
3636import { Watcher } from './xds-stream-state/xds-stream-state' ;
37+ import Filter = experimental . Filter ;
38+ import BaseFilter = experimental . BaseFilter ;
39+ import FilterFactory = experimental . FilterFactory ;
40+ import FilterStackFactory = experimental . FilterStackFactory ;
41+ import CallStream = experimental . CallStream ;
3742
3843const TRACER_NAME = 'eds_balancer' ;
3944
@@ -47,15 +52,19 @@ function localityToName(locality: Locality__Output) {
4752 return `{region=${ locality . region } ,zone=${ locality . zone } ,sub_zone=${ locality . sub_zone } }` ;
4853}
4954
55+ const DEFAULT_MAX_CONCURRENT_REQUESTS = 1024 ;
56+
5057export class EdsLoadBalancingConfig implements LoadBalancingConfig {
58+ private maxConcurrentRequests : number ;
5159 getLoadBalancerName ( ) : string {
5260 return TYPE_NAME ;
5361 }
5462 toJsonObject ( ) : object {
5563 const jsonObj : { [ key : string ] : any } = {
5664 cluster : this . cluster ,
5765 locality_picking_policy : this . localityPickingPolicy . map ( policy => policy . toJsonObject ( ) ) ,
58- endpoint_picking_policy : this . endpointPickingPolicy . map ( policy => policy . toJsonObject ( ) )
66+ endpoint_picking_policy : this . endpointPickingPolicy . map ( policy => policy . toJsonObject ( ) ) ,
67+ max_concurrent_requests : this . maxConcurrentRequests
5968 } ;
6069 if ( this . edsServiceName !== undefined ) {
6170 jsonObj . eds_service_name = this . edsServiceName ;
@@ -68,8 +77,8 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
6877 } ;
6978 }
7079
71- constructor ( private cluster : string , private localityPickingPolicy : LoadBalancingConfig [ ] , private endpointPickingPolicy : LoadBalancingConfig [ ] , private edsServiceName ?: string , private lrsLoadReportingServerName ?: string ) {
72-
80+ constructor ( private cluster : string , private localityPickingPolicy : LoadBalancingConfig [ ] , private endpointPickingPolicy : LoadBalancingConfig [ ] , private edsServiceName ?: string , private lrsLoadReportingServerName ?: string , maxConcurrentRequests ?: number ) {
81+ this . maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS ;
7382 }
7483
7584 getCluster ( ) {
@@ -92,6 +101,10 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
92101 return this . lrsLoadReportingServerName ;
93102 }
94103
104+ getMaxConcurrentRequests ( ) {
105+ return this . maxConcurrentRequests ;
106+ }
107+
95108 static createFromJson ( obj : any ) : EdsLoadBalancingConfig {
96109 if ( ! ( 'cluster' in obj && typeof obj . cluster === 'string' ) ) {
97110 throw new Error ( 'eds config must have a string field cluster' ) ;
@@ -108,7 +121,28 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
108121 if ( 'lrs_load_reporting_server_name' in obj && ( ! obj . lrs_load_reporting_server_name === undefined || typeof obj . lrs_load_reporting_server_name === 'string' ) ) {
109122 throw new Error ( 'eds config lrs_load_reporting_server_name must be a string if provided' ) ;
110123 }
111- return new EdsLoadBalancingConfig ( obj . cluster , obj . locality_picking_policy . map ( validateLoadBalancingConfig ) , obj . endpoint_picking_policy . map ( validateLoadBalancingConfig ) , obj . eds_service_name , obj . lrs_load_reporting_server_name ) ;
124+ if ( 'max_concurrent_requests' in obj && ( ! obj . max_concurrent_requests === undefined || typeof obj . max_concurrent_requests === 'number' ) ) {
125+ throw new Error ( 'eds config max_concurrent_requests must be a number if provided' ) ;
126+ }
127+ return new EdsLoadBalancingConfig ( obj . cluster , obj . locality_picking_policy . map ( validateLoadBalancingConfig ) , obj . endpoint_picking_policy . map ( validateLoadBalancingConfig ) , obj . eds_service_name , obj . lrs_load_reporting_server_name , obj . max_concurrent_requests ) ;
128+ }
129+ }
130+
131+ class CallEndTrackingFilter extends BaseFilter implements Filter {
132+ constructor ( private onCallEnd : ( ) => void ) {
133+ super ( ) ;
134+ }
135+ receiveTrailers ( status : StatusObject ) {
136+ this . onCallEnd ( ) ;
137+ return status ;
138+ }
139+ }
140+
141+ class CallTrackingFilterFactory implements FilterFactory < CallEndTrackingFilter > {
142+ constructor ( private onCallEnd : ( ) => void ) { }
143+
144+ createFilter ( callStream : CallStream ) {
145+ return new CallEndTrackingFilter ( this . onCallEnd ) ;
112146 }
113147}
114148
@@ -149,6 +183,8 @@ export class EdsLoadBalancer implements LoadBalancer {
149183
150184 private clusterDropStats : XdsClusterDropStats | null = null ;
151185
186+ private concurrentRequests : number = 0 ;
187+
152188 constructor ( private readonly channelControlHelper : ChannelControlHelper ) {
153189 this . childBalancer = new ChildLoadBalancerHandler ( {
154190 createSubchannel : ( subchannelAddress , subchannelArgs ) =>
@@ -169,19 +205,42 @@ export class EdsLoadBalancer implements LoadBalancer {
169205 * Otherwise, delegate picking the subchannel to the child
170206 * balancer. */
171207 if ( dropCategory === null ) {
172- return originalPicker . pick ( pickArgs ) ;
208+ const originalPick = originalPicker . pick ( pickArgs ) ;
209+ let extraFilterFactory : FilterFactory < Filter > = new CallTrackingFilterFactory ( ( ) => {
210+ this . concurrentRequests -= 1 ;
211+ } ) ;
212+ if ( originalPick . extraFilterFactory ) {
213+ extraFilterFactory = new FilterStackFactory ( [ originalPick . extraFilterFactory , extraFilterFactory ] ) ;
214+ }
215+ return {
216+ pickResultType : originalPick . pickResultType ,
217+ status : originalPick . status ,
218+ subchannel : originalPick . subchannel ,
219+ onCallStarted : ( ) => {
220+ originalPick . onCallStarted ?.( ) ;
221+ this . concurrentRequests += 1 ;
222+ } ,
223+ extraFilterFactory : extraFilterFactory
224+ } ;
173225 } else {
174- this . clusterDropStats ?. addCallDropped ( dropCategory ) ;
226+ let details : string ;
227+ if ( dropCategory === true ) {
228+ details = 'Call dropped by load balancing policy.' ;
229+ this . clusterDropStats ?. addUncategorizedCallDropped ( ) ;
230+ } else {
231+ details = `Call dropped by load balancing policy. Category: ${ dropCategory } ` ;
232+ this . clusterDropStats ?. addCallDropped ( dropCategory ) ;
233+ }
175234 return {
176235 pickResultType : PickResultType . DROP ,
177236 status : {
178237 code : Status . UNAVAILABLE ,
179- details : `Call dropped by load balancing policy. Category: ${ dropCategory } ` ,
238+ details : details ,
180239 metadata : new Metadata ( ) ,
181240 } ,
182241 subchannel : null ,
183242 extraFilterFactory : null ,
184- onCallStarted : null ,
243+ onCallStarted : null
185244 } ;
186245 }
187246 } ,
@@ -218,9 +277,13 @@ export class EdsLoadBalancer implements LoadBalancer {
218277 /**
219278 * Check whether a single call should be dropped according to the current
220279 * policy, based on randomly chosen numbers. Returns the drop category if
221- * the call should be dropped, and null otherwise.
280+ * the call should be dropped, and null otherwise. true is a valid
281+ * output, as a sentinel value indicating a drop with no category.
222282 */
223- private checkForDrop ( ) : string | null {
283+ private checkForDrop ( ) : string | true | null {
284+ if ( this . lastestConfig && this . concurrentRequests >= this . lastestConfig . getMaxConcurrentRequests ( ) ) {
285+ return true ;
286+ }
224287 if ( ! this . latestEdsUpdate ?. policy ) {
225288 return null ;
226289 }
0 commit comments