Skip to content

Commit

Permalink
[core-http] Throttling retry policy fix in core-http (#15832)
Browse files Browse the repository at this point in the history
Fixes #15796
## Problem

The throttlingRetryPolicy in core-http has the potential to retry for an extended period if the service continues returning "retry after" headers on subsequent calls.

Here's the snippet of code that handles the "retry after" retries:

```typescript
  public async sendRequest(httpRequest: WebResource): Promise<HttpOperationResponse> {
    return this._nextPolicy.sendRequest(httpRequest.clone()).catch((err) => {
        // other code elided....
        return delay(delayInMs).then((_: any) => this.sendRequest(httpRequest.clone()));
```

## Solution
Update delay such that it respects abort signal.
Similar to what I had to do for app-config at #15721
  • Loading branch information
HarshaNalluru authored Jun 25, 2021
1 parent 35739ab commit bb9896d
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { AbortError } from "@azure/abort-controller";
import {
BaseRequestPolicy,
RequestPolicy,
Expand All @@ -12,7 +12,7 @@ import {
Constants,
RestError
} from "@azure/core-http";
import { isDefined } from "../internal/typeguards";
import { delay } from "@azure/core-http";

/**
* @internal
Expand All @@ -27,55 +27,6 @@ export function throttlingRetryPolicy(): RequestPolicyFactory {

const StandardAbortMessage = "The operation was aborted.";

/**
* A wrapper for setTimeout that resolves a promise after t milliseconds.
* @param delayInMs - The number of milliseconds to be delayed.
* @param abortSignal - The abortSignal associated with containing operation.
* @param abortErrorMsg - The abort error message associated with containing operation.
* @returns - Resolved promise
*/
export function delay(
delayInMs: number,
abortSignal?: AbortSignalLike,
abortErrorMsg?: string
): Promise<void> {
return new Promise((resolve, reject) => {
let timer: ReturnType<typeof setTimeout> | undefined = undefined;
let onAborted: (() => void) | undefined = undefined;

const rejectOnAbort = (): void => {
return reject(new AbortError(abortErrorMsg ? abortErrorMsg : StandardAbortMessage));
};

const removeListeners = (): void => {
if (abortSignal && onAborted) {
abortSignal.removeEventListener("abort", onAborted);
}
};

onAborted = (): void => {
if (isDefined(timer)) {
clearTimeout(timer);
}
removeListeners();
return rejectOnAbort();
};

if (abortSignal && abortSignal.aborted) {
return rejectOnAbort();
}

timer = setTimeout(() => {
removeListeners();
resolve();
}, delayInMs);

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}
});
}

/**
* This policy is a close copy of the ThrottlingRetryPolicy class from
* core-http with modifications to work with how AppConfig is currently
Expand All @@ -97,7 +48,10 @@ export class ThrottlingRetryPolicy extends BaseRequestPolicy {
throw err;
}

await delay(delayInMs, httpRequest.abortSignal, StandardAbortMessage);
await delay(delayInMs, undefined, {
abortSignal: httpRequest.abortSignal,
abortErrorMsg: StandardAbortMessage
});
if (httpRequest.abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
}
Expand Down
1 change: 1 addition & 0 deletions sdk/core/core-http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
### Fixed

- Fixed an issue where `proxySettings` does not work when there is username but no password [Issue 15720](https://github.com/Azure/azure-sdk-for-js/issues/15720)
- Throttling retry policy respects abort signal [#15796](https://github.com/Azure/azure-sdk-for-js/issues/15796)

## 1.2.6 (2021-06-14)

Expand Down
5 changes: 4 additions & 1 deletion sdk/core/core-http/review/core-http.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ export class DefaultHttpClient extends FetchHttpClient {
}

// @public
export function delay<T>(t: number, value?: T): Promise<T | void>;
export function delay<T>(delayInMs: number, value?: T, options?: {
abortSignal?: AbortSignalLike;
abortErrorMsg?: string;
}): Promise<T | void>;

// @public
export interface DeserializationContentTypes {
Expand Down
3 changes: 1 addition & 2 deletions sdk/core/core-http/src/coreHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ export {
export {
stripRequest,
stripResponse,
delay,
executePromisesSequentially,
generateUuid,
encodeUri,
Expand All @@ -113,7 +112,7 @@ export {
} from "./util/utils";
export { URLBuilder, URLQuery } from "./url";
export { AbortSignalLike } from "@azure/abort-controller";

export { delay } from "./util/delay";
// legacy exports. Use core-tracing instead (and remove on next major version update of core-http).
export { createSpanFunction, SpanConfig } from "./createSpanLegacy";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
import { Constants } from "../util/constants";
import { HttpOperationResponse } from "../httpOperationResponse";
import { WebResourceLike } from "../webResource";
import { delay } from "../util/utils";
import { delay } from "../util/delay";

// #region Access Token Cycler

Expand Down Expand Up @@ -71,7 +71,7 @@ async function beginRefresh(
): Promise<AccessToken> {
// This wrapper handles exceptions gracefully as long as we haven't exceeded
// the timeout.
async function tryGetAccessToken() {
async function tryGetAccessToken(): Promise<AccessToken | null> {
if (Date.now() < timeoutInMs) {
try {
return await getAccessToken();
Expand Down
4 changes: 2 additions & 2 deletions sdk/core/core-http/src/policies/exponentialRetryPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

import { HttpOperationResponse } from "../httpOperationResponse";
import * as utils from "../util/utils";
import { WebResourceLike } from "../webResource";
import {
BaseRequestPolicy,
Expand All @@ -22,6 +21,7 @@ import {
} from "../util/exponentialBackoffStrategy";
import { RestError } from "../restError";
import { logger } from "../log";
import { delay } from "../util/delay";

export function exponentialRetryPolicy(
retryCount?: number,
Expand Down Expand Up @@ -164,7 +164,7 @@ async function retry(
if (!isAborted && shouldRetry(policy.retryCount, shouldPolicyRetry, retryData, response)) {
logger.info(`Retrying request in ${retryData.retryInterval}`);
try {
await utils.delay(retryData.retryInterval);
await delay(retryData.retryInterval);
const res = await policy._nextPolicy.sendRequest(request.clone());
return retry(policy, request, res, retryData);
} catch (err) {
Expand Down
35 changes: 16 additions & 19 deletions sdk/core/core-http/src/policies/rpRegistrationPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { delay } from "../util/delay";
import { HttpOperationResponse } from "../httpOperationResponse";
import * as utils from "../util/utils";
import { WebResourceLike } from "../webResource";
Expand Down Expand Up @@ -145,9 +146,8 @@ function extractSubscriptionUrl(url: string): string {
* @param provider - The provider name to be registered.
* @param originalRequest - The original request sent by the user that returned a 409 response
* with a message that the provider is not registered.
* @param callback - The callback that handles the RP registration
*/
function registerRP(
async function registerRP(
policy: RPRegistrationPolicy,
urlPrefix: string,
provider: string,
Expand All @@ -159,12 +159,11 @@ function registerRP(
reqOptions.method = "POST";
reqOptions.url = postUrl;

return policy._nextPolicy.sendRequest(reqOptions).then((response) => {
if (response.status !== 200) {
throw new Error(`Autoregistration of ${provider} failed. Please try registering manually.`);
}
return getRegistrationStatus(policy, getUrl, originalRequest);
});
const response = await policy._nextPolicy.sendRequest(reqOptions);
if (response.status !== 200) {
throw new Error(`Autoregistration of ${provider} failed. Please try registering manually.`);
}
return getRegistrationStatus(policy, getUrl, originalRequest);
}

/**
Expand All @@ -176,7 +175,7 @@ function registerRP(
* with a message that the provider is not registered.
* @returns True if RP Registration is successful.
*/
function getRegistrationStatus(
async function getRegistrationStatus(
policy: RPRegistrationPolicy,
url: string,
originalRequest: WebResourceLike
Expand All @@ -185,14 +184,12 @@ function getRegistrationStatus(
reqOptions.url = url;
reqOptions.method = "GET";

return policy._nextPolicy.sendRequest(reqOptions).then((res) => {
const obj = res.parsedBody as any;
if (res.parsedBody && obj.registrationState && obj.registrationState === "Registered") {
return true;
} else {
return utils
.delay(policy._retryTimeout * 1000)
.then(() => getRegistrationStatus(policy, url, originalRequest));
}
});
const res = await policy._nextPolicy.sendRequest(reqOptions);
const obj = res.parsedBody;
if (res.parsedBody && obj.registrationState && obj.registrationState === "Registered") {
return true;
} else {
await delay(policy._retryTimeout * 1000);
return getRegistrationStatus(policy, url, originalRequest);
}
}
4 changes: 2 additions & 2 deletions sdk/core/core-http/src/policies/systemErrorRetryPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license.

import { HttpOperationResponse } from "../httpOperationResponse";
import * as utils from "../util/utils";
import { WebResourceLike } from "../webResource";
import {
BaseRequestPolicy,
Expand All @@ -21,6 +20,7 @@ import {
DEFAULT_CLIENT_MIN_RETRY_INTERVAL,
isNumber
} from "../util/exponentialBackoffStrategy";
import { delay } from "../util/delay";

export function systemErrorRetryPolicy(
retryCount?: number,
Expand Down Expand Up @@ -107,7 +107,7 @@ async function retry(
if (shouldRetry(policy.retryCount, shouldPolicyRetry, retryData, operationResponse, err)) {
// If previous operation ended with an error and the policy allows a retry, do that
try {
await utils.delay(retryData.retryInterval);
await delay(retryData.retryInterval);
return policy._nextPolicy.sendRequest(request.clone());
} catch (nestedErr) {
return retry(policy, request, operationResponse, nestedErr, retryData);
Expand Down
14 changes: 12 additions & 2 deletions sdk/core/core-http/src/policies/throttlingRetryPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import {
import { WebResourceLike } from "../webResource";
import { HttpOperationResponse } from "../httpOperationResponse";
import { Constants } from "../util/constants";
import { delay } from "../util/utils";
import { delay } from "../util/delay";
import { AbortError } from "@azure/abort-controller";

type ResponseHandler = (
httpRequest: WebResourceLike,
Expand All @@ -26,6 +27,8 @@ export function throttlingRetryPolicy(): RequestPolicyFactory {
};
}

const StandardAbortMessage = "The operation was aborted.";

/**
* To learn more, please refer to
* https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-request-limits,
Expand Down Expand Up @@ -67,7 +70,14 @@ export class ThrottlingRetryPolicy extends BaseRequestPolicy {
retryAfterHeader
);
if (delayInMs) {
return delay(delayInMs).then((_: any) => this._nextPolicy.sendRequest(httpRequest));
await delay(delayInMs, undefined, {
abortSignal: httpRequest.abortSignal,
abortErrorMsg: StandardAbortMessage
});
if (httpRequest.abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
}
return this._nextPolicy.sendRequest(httpRequest);
}
}

Expand Down
62 changes: 62 additions & 0 deletions sdk/core/core-http/src/util/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { isDefined } from "./typeguards";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
const StandardAbortMessage = "The operation was aborted.";

/**
* A wrapper for setTimeout that resolves a promise after delayInMs milliseconds.
* @param delayInMs - The number of milliseconds to be delayed.
* @param value - The value to be resolved with after a timeout of t milliseconds.
* @param options - The options for delay - currently abort options
* @param abortSignal - The abortSignal associated with containing operation.
* @param abortErrorMsg - The abort error message associated with containing operation.
* @returns - Resolved promise
*/
export function delay<T>(
delayInMs: number,
value?: T,
options?: {
abortSignal?: AbortSignalLike;
abortErrorMsg?: string;
}
): Promise<T | void> {
return new Promise((resolve, reject) => {
let timer: ReturnType<typeof setTimeout> | undefined = undefined;
let onAborted: (() => void) | undefined = undefined;

const rejectOnAbort = (): void => {
return reject(
new AbortError(options?.abortErrorMsg ? options?.abortErrorMsg : StandardAbortMessage)
);
};

const removeListeners = (): void => {
if (options?.abortSignal && onAborted) {
options.abortSignal.removeEventListener("abort", onAborted);
}
};

onAborted = (): void => {
if (isDefined(timer)) {
clearTimeout(timer);
}
removeListeners();
return rejectOnAbort();
};

if (options?.abortSignal && options.abortSignal.aborted) {
return rejectOnAbort();
}

timer = setTimeout(() => {
removeListeners();
resolve(value);
}, delayInMs);

if (options?.abortSignal) {
options.abortSignal.addEventListener("abort", onAborted);
}
});
}
11 changes: 11 additions & 0 deletions sdk/core/core-http/src/util/typeguards.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

/**
* Helper TypeGuard that checks if the value is not null or undefined.
* @param thing - Anything
* @internal
*/
export function isDefined<T>(thing: T | undefined | null): thing is T {
return typeof thing !== "undefined" && thing !== null;
}
10 changes: 0 additions & 10 deletions sdk/core/core-http/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,6 @@ export function executePromisesSequentially(
return result;
}

/**
* A wrapper for setTimeout that resolves a promise after t milliseconds.
* @param t - The number of milliseconds to be delayed.
* @param value - The value to be resolved with after a timeout of t milliseconds.
* @returns Resolved promise
*/
export function delay<T>(t: number, value?: T): Promise<T | void> {
return new Promise((resolve) => setTimeout(() => resolve(value), t));
}

/**
* Service callback that is returned for REST requests initiated by the service client.
*/
Expand Down
Loading

0 comments on commit bb9896d

Please sign in to comment.