Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions packages/grpc-js-xds/interop/xds-interop-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ const currentConfig: ClientConfiguration = {
let anyCallSucceeded = false;

const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
num_rpcs_started_by_method: {
EMPTY_CALL: 0,
UNARY_CALL: 0
},
num_rpcs_succeeded_by_method: {
EMPTY_CALL: 0,
UNARY_CALL: 0
},
num_rpcs_failed_by_method: {
EMPTY_CALL: 0,
UNARY_CALL: 0
},
stats_per_method: {
EMPTY_CALL: {
rpcs_started: 0,
Expand All @@ -208,14 +220,28 @@ const accumulatedStats: LoadBalancerAccumulatedStatsResponse = {
}
};

function addAccumulatedCallStarted(callName: string) {
accumulatedStats.stats_per_method![callName].rpcs_started! += 1;
accumulatedStats.num_rpcs_started_by_method![callName] += 1;
}

function addAccumulatedCallEnded(callName: string, result: grpc.status) {
accumulatedStats.stats_per_method![callName].result![result] = (accumulatedStats.stats_per_method![callName].result![result] ?? 0) + 1;
if (result === grpc.status.OK) {
accumulatedStats.num_rpcs_succeeded_by_method![callName] += 1;
} else {
accumulatedStats.num_rpcs_failed_by_method![callName] += 1;
}
}

const callTimeHistogram: {[callType: string]: {[status: number]: number[]}} = {
UnaryCall: {},
EmptyCall: {}
}

function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFailedRpcs: boolean, callStatsTracker: CallStatsTracker) {
const callTypeStats = accumulatedStats.stats_per_method![callTypeEnumMapReverse[type]];
callTypeStats.rpcs_started! += 1;
const callEnumName = callTypeEnumMapReverse[type];
addAccumulatedCallStarted(callEnumName);
const notifier = callStatsTracker.startCall();
let gotMetadata: boolean = false;
let hostname: string | null = null;
Expand All @@ -235,7 +261,7 @@ function makeSingleRequest(client: TestServiceClient, type: CallType, failOnFail
} else {
callTimeHistogram[type][statusCode][duration[0]] = 1;
}
callTypeStats.result![statusCode] = (callTypeStats.result![statusCode] ?? 0) + 1;
addAccumulatedCallEnded(callEnumName, statusCode);
if (error) {
if (failOnFailedRpcs && anyCallSucceeded) {
console.error('A call failed after a call succeeded');
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js-xds/scripts/xds.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ GRPC_NODE_TRACE=xds_client,xds_resolver,cds_balancer,eds_balancer,priority,weigh
GRPC_NODE_VERBOSITY=DEBUG \
NODE_XDS_INTEROP_VERBOSITY=1 \
python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case="all,timeout" \
--test_case="all,timeout,circuit_breaking" \
--project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server-4 \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \
Expand Down
8 changes: 7 additions & 1 deletion packages/grpc-js-xds/src/load-balancer-cds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,17 @@ export class CdsLoadBalancer implements LoadBalancer {
this.watcher = {
onValidUpdate: (update) => {
this.latestCdsUpdate = update;
let maxConcurrentRequests: number | undefined = undefined;
for (const threshold of update.circuit_breakers?.thresholds ?? []) {
if (threshold.priority === 'DEFAULT') {
maxConcurrentRequests = threshold.max_requests?.value;
}
}
/* the lrs_server.self field indicates that the same server should be
* used for load reporting as for other xDS operations. Setting
* lrsLoadReportingServerName to the empty string sets that behavior.
* Otherwise, if the field is omitted, load reporting is disabled. */
const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(update.name, [], [], update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name, update.lrs_server?.self ? '' : undefined);
const edsConfig: EdsLoadBalancingConfig = new EdsLoadBalancingConfig(update.name, [], [], update.eds_cluster_config!.service_name === '' ? undefined : update.eds_cluster_config!.service_name, update.lrs_server?.self ? '' : undefined, maxConcurrentRequests);
trace('Child update EDS config: ' + JSON.stringify(edsConfig));
this.childBalancer.updateAddressList(
[],
Expand Down
85 changes: 74 additions & 11 deletions packages/grpc-js-xds/src/load-balancer-eds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
*/

import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental } from '@grpc/grpc-js';
import { connectivityState as ConnectivityState, status as Status, Metadata, logVerbosity as LogVerbosity, experimental, StatusObject } from '@grpc/grpc-js';
import { getSingletonXdsClient, XdsClient, XdsClusterDropStats } from './xds-client';
import { ClusterLoadAssignment__Output } from './generated/envoy/config/endpoint/v3/ClusterLoadAssignment';
import { Locality__Output } from './generated/envoy/api/v2/core/Locality';
Expand All @@ -34,6 +34,11 @@ import { validateLoadBalancingConfig } from '@grpc/grpc-js/build/src/experimenta
import { WeightedTarget, WeightedTargetLoadBalancingConfig } from './load-balancer-weighted-target';
import { LrsLoadBalancingConfig } from './load-balancer-lrs';
import { Watcher } from './xds-stream-state/xds-stream-state';
import Filter = experimental.Filter;
import BaseFilter = experimental.BaseFilter;
import FilterFactory = experimental.FilterFactory;
import FilterStackFactory = experimental.FilterStackFactory;
import CallStream = experimental.CallStream;

const TRACER_NAME = 'eds_balancer';

Expand All @@ -47,15 +52,19 @@ function localityToName(locality: Locality__Output) {
return `{region=${locality.region},zone=${locality.zone},sub_zone=${locality.sub_zone}}`;
}

const DEFAULT_MAX_CONCURRENT_REQUESTS = 1024;

export class EdsLoadBalancingConfig implements LoadBalancingConfig {
private maxConcurrentRequests: number;
getLoadBalancerName(): string {
return TYPE_NAME;
}
toJsonObject(): object {
const jsonObj: {[key: string]: any} = {
cluster: this.cluster,
locality_picking_policy: this.localityPickingPolicy.map(policy => policy.toJsonObject()),
endpoint_picking_policy: this.endpointPickingPolicy.map(policy => policy.toJsonObject())
endpoint_picking_policy: this.endpointPickingPolicy.map(policy => policy.toJsonObject()),
max_concurrent_requests: this.maxConcurrentRequests
};
if (this.edsServiceName !== undefined) {
jsonObj.eds_service_name = this.edsServiceName;
Expand All @@ -68,8 +77,8 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
};
}

constructor(private cluster: string, private localityPickingPolicy: LoadBalancingConfig[], private endpointPickingPolicy: LoadBalancingConfig[], private edsServiceName?: string, private lrsLoadReportingServerName?: string) {

constructor(private cluster: string, private localityPickingPolicy: LoadBalancingConfig[], private endpointPickingPolicy: LoadBalancingConfig[], private edsServiceName?: string, private lrsLoadReportingServerName?: string, maxConcurrentRequests?: number) {
this.maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS;
}

getCluster() {
Expand All @@ -92,6 +101,10 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
return this.lrsLoadReportingServerName;
}

getMaxConcurrentRequests() {
return this.maxConcurrentRequests;
}

static createFromJson(obj: any): EdsLoadBalancingConfig {
if (!('cluster' in obj && typeof obj.cluster === 'string')) {
throw new Error('eds config must have a string field cluster');
Expand All @@ -108,7 +121,28 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig {
if ('lrs_load_reporting_server_name' in obj && (!obj.lrs_load_reporting_server_name === undefined || typeof obj.lrs_load_reporting_server_name === 'string')) {
throw new Error('eds config lrs_load_reporting_server_name must be a string if provided');
}
return new EdsLoadBalancingConfig(obj.cluster, obj.locality_picking_policy.map(validateLoadBalancingConfig), obj.endpoint_picking_policy.map(validateLoadBalancingConfig), obj.eds_service_name, obj.lrs_load_reporting_server_name);
if ('max_concurrent_requests' in obj && (!obj.max_concurrent_requests === undefined || typeof obj.max_concurrent_requests === 'number')) {
throw new Error('eds config max_concurrent_requests must be a number if provided');
}
return new EdsLoadBalancingConfig(obj.cluster, obj.locality_picking_policy.map(validateLoadBalancingConfig), obj.endpoint_picking_policy.map(validateLoadBalancingConfig), obj.eds_service_name, obj.lrs_load_reporting_server_name, obj.max_concurrent_requests);
}
}

class CallEndTrackingFilter extends BaseFilter implements Filter {
constructor(private onCallEnd: () => void) {
super();
}
receiveTrailers(status: StatusObject) {
this.onCallEnd();
return status;
}
}

class CallTrackingFilterFactory implements FilterFactory<CallEndTrackingFilter> {
constructor(private onCallEnd: () => void) {}

createFilter(callStream: CallStream) {
return new CallEndTrackingFilter(this.onCallEnd);
}
}

Expand Down Expand Up @@ -149,6 +183,8 @@ export class EdsLoadBalancer implements LoadBalancer {

private clusterDropStats: XdsClusterDropStats | null = null;

private concurrentRequests: number = 0;

constructor(private readonly channelControlHelper: ChannelControlHelper) {
this.childBalancer = new ChildLoadBalancerHandler({
createSubchannel: (subchannelAddress, subchannelArgs) =>
Expand All @@ -169,19 +205,42 @@ export class EdsLoadBalancer implements LoadBalancer {
* Otherwise, delegate picking the subchannel to the child
* balancer. */
if (dropCategory === null) {
return originalPicker.pick(pickArgs);
const originalPick = originalPicker.pick(pickArgs);
let extraFilterFactory: FilterFactory<Filter> = new CallTrackingFilterFactory(() => {
this.concurrentRequests -= 1;
});
if (originalPick.extraFilterFactory) {
extraFilterFactory = new FilterStackFactory([originalPick.extraFilterFactory, extraFilterFactory]);
}
return {
pickResultType: originalPick.pickResultType,
status: originalPick.status,
subchannel: originalPick.subchannel,
onCallStarted: () => {
originalPick.onCallStarted?.();
this.concurrentRequests += 1;
},
extraFilterFactory: extraFilterFactory
};
} else {
this.clusterDropStats?.addCallDropped(dropCategory);
let details: string;
if (dropCategory === true) {
details = 'Call dropped by load balancing policy.';
this.clusterDropStats?.addUncategorizedCallDropped();
} else {
details = `Call dropped by load balancing policy. Category: ${dropCategory}`;
this.clusterDropStats?.addCallDropped(dropCategory);
}
return {
pickResultType: PickResultType.DROP,
status: {
code: Status.UNAVAILABLE,
details: `Call dropped by load balancing policy. Category: ${dropCategory}`,
details: details,
metadata: new Metadata(),
},
subchannel: null,
extraFilterFactory: null,
onCallStarted: null,
onCallStarted: null
};
}
},
Expand Down Expand Up @@ -218,9 +277,13 @@ export class EdsLoadBalancer implements LoadBalancer {
/**
* Check whether a single call should be dropped according to the current
* policy, based on randomly chosen numbers. Returns the drop category if
* the call should be dropped, and null otherwise.
* the call should be dropped, and null otherwise. true is a valid
* output, as a sentinel value indicating a drop with no category.
*/
private checkForDrop(): string | null {
private checkForDrop(): string | true | null {
if (this.lastestConfig && this.concurrentRequests >= this.lastestConfig.getMaxConcurrentRequests()) {
return true;
}
if (!this.latestEdsUpdate?.policy) {
return null;
}
Expand Down
9 changes: 9 additions & 0 deletions packages/grpc-js-xds/src/xds-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ function localityEqual(
}

export interface XdsClusterDropStats {
addUncategorizedCallDropped(): void;
addCallDropped(category: string): void;
}

Expand Down Expand Up @@ -158,6 +159,7 @@ interface ClusterLocalityStats {

interface ClusterLoadReport {
callsDropped: Map<string, number>;
uncategorizedCallsDropped: number;
localityStats: ClusterLocalityStats[];
intervalStart: [number, number];
}
Expand Down Expand Up @@ -195,6 +197,7 @@ class ClusterLoadReportMap {
}
const newStats: ClusterLoadReport = {
callsDropped: new Map<string, number>(),
uncategorizedCallsDropped: 0,
localityStats: [],
intervalStart: process.hrtime(),
};
Expand Down Expand Up @@ -871,8 +874,10 @@ export class XdsClient {
totalDroppedRequests += count;
}
}
totalDroppedRequests += stats.uncategorizedCallsDropped;
// Clear out dropped call stats after sending them
stats.callsDropped.clear();
stats.uncategorizedCallsDropped = 0;
const interval = process.hrtime(stats.intervalStart);
stats.intervalStart = process.hrtime();
// Skip clusters with 0 requests
Expand Down Expand Up @@ -957,6 +962,7 @@ export class XdsClient {
trace('addClusterDropStats(lrsServer=' + lrsServer + ', clusterName=' + clusterName + ', edsServiceName=' + edsServiceName + ')');
if (lrsServer !== '') {
return {
addUncategorizedCallDropped: () => {},
addCallDropped: (category) => {},
};
}
Expand All @@ -965,6 +971,9 @@ export class XdsClient {
edsServiceName
);
return {
addUncategorizedCallDropped: () => {
clusterStats.uncategorizedCallsDropped += 1;
},
addCallDropped: (category) => {
const prevCount = clusterStats.callsDropped.get(category) ?? 0;
clusterStats.callsDropped.set(category, prevCount + 1);
Expand Down