Skip to content

Commit 43a3bad

Browse files
committed
Fix circuit breaking functionality
1 parent ec7c819 commit 43a3bad

File tree

3 files changed

+63
-18
lines changed

3 files changed

+63
-18
lines changed

packages/grpc-js-xds/interop/xds-interop-client.ts

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,18 @@ const currentConfig: ClientConfiguration = {
196196
let anyCallSucceeded = false;
197197

198198
const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
199+
num_rpcs_started_by_method: {
200+
EMPTY_CALL: 0,
201+
UNARY_CALL: 0
202+
},
203+
num_rpcs_succeeded_by_method: {
204+
EMPTY_CALL: 0,
205+
UNARY_CALL: 0
206+
},
207+
num_rpcs_failed_by_method: {
208+
EMPTY_CALL: 0,
209+
UNARY_CALL: 0
210+
},
199211
stats_per_method: {
200212
EMPTY_CALL: {
201213
rpcs_started: 0,
@@ -208,14 +220,28 @@ const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
208220
}
209221
};
210222

223+
function addAccumulatedCallStarted(callName: string) {
224+
accumulatedStats.stats_per_method![callName].rpcs_started! += 1;
225+
accumulatedStats.num_rpcs_started_by_method![callName] += 1;
226+
}
227+
228+
function addAccumulatedCallEnded(callName: string, result: grpc.status) {
229+
accumulatedStats.stats_per_method![callName].result![result] = (accumulatedStats.stats_per_method![callName].result![result] ?? 0) + 1;
230+
if (result === grpc.status.OK) {
231+
accumulatedStats.num_rpcs_succeeded_by_method![callName] += 1;
232+
} else {
233+
accumulatedStats.num_rpcs_failed_by_method![callName] += 1;
234+
}
235+
}
236+
211237
const callTimeHistogram: {[callType: string]: {[status: number]: number[]}} = {
212238
UnaryCall: {},
213239
EmptyCall: {}
214240
}
215241

216242
function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
217-
const callTypeStats = accumulatedStats.stats_per_method![callTypeEnumMapReverse[type]];
218-
callTypeStats.rpcs_started! += 1;
243+
const callEnumName = callTypeEnumMapReverse[type];
244+
addAccumulatedCallStarted(callEnumName);
219245
const notifier = callStatsTracker.startCall();
220246
let gotMetadata: boolean = false;
221247
let hostname: string | null = null;
@@ -235,7 +261,7 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
235261
} else {
236262
callTimeHistogram[type][statusCode][duration[0]] = 1;
237263
}
238-
callTypeStats.result![statusCode] = (callTypeStats.result![statusCode] ?? 0) + 1;
264+
addAccumulatedCallEnded(callEnumName, statusCode);
239265
if (error) {
240266
if (failOnFailedRpcs && anyCallSucceeded) {
241267
console.error('A call failed after a call succeeded');

packages/grpc-js-xds/src/load-balancer-cds.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,17 @@ export class CdsLoadBalancer implements LoadBalancer {
8080
this.watcher = {
8181
onValidUpdate: (update) => {
8282
this.latestCdsUpdate = update;
83+
let maxConcurrentRequests: number | undefined = undefined;
84+
for (const threshold of update.circuit_breakers?.thresholds ?? []) {
85+
if (threshold.priority === 'DEFAULT') {
86+
maxConcurrentRequests = threshold.max_requests?.value;
87+
}
88+
}
8389
/* the lrs_server.self field indicates that the same server should be
8490
* used for load reporting as for other xDS operations. Setting
8591
* lrsLoadReportingServerName to the empty string sets that behavior.
8692
* Otherwise, if the field is omitted, load reporting is disabled. */
87-
const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(update.name, [], [], update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name, update.lrs_server?.self ? '' : undefined);
93+
const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(update.name, [], [], update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name, update.lrs_server?.self ? '' : undefined, maxConcurrentRequests);
8894
trace('Child update EDS config: ' + JSON.stringify(edsConfig));
8995
this.childBalancer.updateAddressList(
9096
[],

packages/grpc-js-xds/src/load-balancer-eds.ts

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { Watcher } from './xds-stream-state/xds-stream-state';
3737
import Filter = experimental.Filter;
3838
import BaseFilter = experimental.BaseFilter;
3939
import FilterFactory = experimental.FilterFactory;
40+
import FilterStackFactory = experimental.FilterStackFactory;
4041
import CallStream = experimental.CallStream;
4142

4243
const TRACER_NAME = 'eds_balancer';
@@ -204,27 +205,42 @@ export class EdsLoadBalancer implements LoadBalancer {
204205
* Otherwise, delegate picking the subchannel to the child
205206
* balancer. */
206207
if (dropCategory === null) {
207-
return originalPicker.pick(pickArgs);
208+
const originalPick = originalPicker.pick(pickArgs);
209+
let extraFilterFactory: FilterFactory<Filter> = new CallTrackingFilterFactory(() => {
210+
this.concurrentRequests -= 1;
211+
});
212+
if (originalPick.extraFilterFactory) {
213+
extraFilterFactory = new FilterStackFactory([originalPick.extraFilterFactory, extraFilterFactory]);
214+
}
215+
return {
216+
pickResultType: originalPick.pickResultType,
217+
status: originalPick.status,
218+
subchannel: originalPick.subchannel,
219+
onCallStarted: () => {
220+
originalPick.onCallStarted?.();
221+
this.concurrentRequests += 1;
222+
},
223+
extraFilterFactory: extraFilterFactory
224+
};
208225
} else {
226+
let details: string;
209227
if (dropCategory === true) {
228+
details = 'Call dropped by load balancing policy.';
210229
this.clusterDropStats?.addUncategorizedCallDropped();
211230
} else {
231+
details = `Call dropped by load balancing policy. Category: ${dropCategory}`;
212232
this.clusterDropStats?.addCallDropped(dropCategory);
213233
}
214234
return {
215235
pickResultType: PickResultType.DROP,
216236
status: {
217237
code: Status.UNAVAILABLE,
218-
details: `Call dropped by load balancing policy. Category: ${dropCategory}`,
238+
details: details,
219239
metadata: new Metadata(),
220240
},
221241
subchannel: null,
222-
extraFilterFactory: new CallTrackingFilterFactory(() => {
223-
this.concurrentRequests -= 1;
224-
}),
225-
onCallStarted: () => {
226-
this.concurrentRequests += 1;
227-
},
242+
extraFilterFactory: null,
243+
onCallStarted: null
228244
};
229245
}
230246
},
@@ -265,15 +281,12 @@ export class EdsLoadBalancer implements LoadBalancer {
265281
* output, as a sentinel value indicating a drop with no category.
266282
*/
267283
private checkForDrop(): string | true | null {
268-
if (!this.latestEdsUpdate?.policy) {
269-
return null;
284+
if (this.lastestConfig && this.concurrentRequests >= this.lastestConfig.getMaxConcurrentRequests()) {
285+
return true;
270286
}
271-
if (!this.lastestConfig) {
287+
if (!this.latestEdsUpdate?.policy) {
272288
return null;
273289
}
274-
if (this.concurrentRequests >= this.lastestConfig.getMaxConcurrentRequests()) {
275-
return true;
276-
}
277290
/* The drop_overloads policy is a list of pairs of category names and
278291
* probabilities. For each one, if the random number is within that
279292
* probability range, we drop the call citing that category. Otherwise, the

0 commit comments

Comments
 (0)