From bb9896d200a0e9b77210fdc7881a62bfe973a163 Mon Sep 17 00:00:00 2001 From: Harsha Nalluru Date: Thu, 24 Jun 2021 18:49:06 -0700 Subject: [PATCH] [core-http] Throttling retry policy fix in core-http (#15832) Fixes https://github.com/Azure/azure-sdk-for-js/issues/15796 ## Problem The throttlingRetryPolicy in core-http has the potential to retry for an extended period if the service continues returning "retry after" headers on subsequent calls. Here's the snippet of code that handles the "retry after" retries: ```typescript public async sendRequest(httpRequest: WebResource): Promise { return this._nextPolicy.sendRequest(httpRequest.clone()).catch((err) => { // other code elided.... return delay(delayInMs).then((_: any) => this.sendRequest(httpRequest.clone())); ``` ## Solution Update delay such that it respects abort signal. Similar to what I had to do for app-config at https://github.com/Azure/azure-sdk-for-js/pull/15721 --- .../src/policies/throttlingRetryPolicy.ts | 58 ++--------------- sdk/core/core-http/CHANGELOG.md | 1 + sdk/core/core-http/review/core-http.api.md | 5 +- sdk/core/core-http/src/coreHttp.ts | 3 +- .../bearerTokenAuthenticationPolicy.ts | 4 +- .../src/policies/exponentialRetryPolicy.ts | 4 +- .../src/policies/rpRegistrationPolicy.ts | 35 +++++------ .../src/policies/systemErrorRetryPolicy.ts | 4 +- .../src/policies/throttlingRetryPolicy.ts | 14 ++++- sdk/core/core-http/src/util/delay.ts | 62 +++++++++++++++++++ sdk/core/core-http/src/util/typeguards.ts | 11 ++++ sdk/core/core-http/src/util/utils.ts | 10 --- .../policies/throttlingRetryPolicyTests.ts | 38 +++++++++++- 13 files changed, 156 insertions(+), 93 deletions(-) create mode 100644 sdk/core/core-http/src/util/delay.ts create mode 100644 sdk/core/core-http/src/util/typeguards.ts diff --git a/sdk/appconfiguration/app-configuration/src/policies/throttlingRetryPolicy.ts b/sdk/appconfiguration/app-configuration/src/policies/throttlingRetryPolicy.ts index c53f9b599c4f..020daedd7cfb 100644 --- a/sdk/appconfiguration/app-configuration/src/policies/throttlingRetryPolicy.ts +++ b/sdk/appconfiguration/app-configuration/src/policies/throttlingRetryPolicy.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { AbortError } from "@azure/abort-controller"; import { BaseRequestPolicy, RequestPolicy, @@ -12,7 +12,7 @@ import { Constants, RestError } from "@azure/core-http"; -import { isDefined } from "../internal/typeguards"; +import { delay } from "@azure/core-http"; /** * @internal @@ -27,55 +27,6 @@ export function throttlingRetryPolicy(): RequestPolicyFactory { const StandardAbortMessage = "The operation was aborted."; -/** - * A wrapper for setTimeout that resolves a promise after t milliseconds. - * @param delayInMs - The number of milliseconds to be delayed. - * @param abortSignal - The abortSignal associated with containing operation. - * @param abortErrorMsg - The abort error message associated with containing operation. - * @returns - Resolved promise - */ -export function delay( - delayInMs: number, - abortSignal?: AbortSignalLike, - abortErrorMsg?: string -): Promise { - return new Promise((resolve, reject) => { - let timer: ReturnType | undefined = undefined; - let onAborted: (() => void) | undefined = undefined; - - const rejectOnAbort = (): void => { - return reject(new AbortError(abortErrorMsg ? abortErrorMsg : StandardAbortMessage)); - }; - - const removeListeners = (): void => { - if (abortSignal && onAborted) { - abortSignal.removeEventListener("abort", onAborted); - } - }; - - onAborted = (): void => { - if (isDefined(timer)) { - clearTimeout(timer); - } - removeListeners(); - return rejectOnAbort(); - }; - - if (abortSignal && abortSignal.aborted) { - return rejectOnAbort(); - } - - timer = setTimeout(() => { - removeListeners(); - resolve(); - }, delayInMs); - - if (abortSignal) { - abortSignal.addEventListener("abort", onAborted); - } - }); -} - /** * This policy is a close copy of the ThrottlingRetryPolicy class from * core-http with modifications to work with how AppConfig is currently @@ -97,7 +48,10 @@ export class ThrottlingRetryPolicy extends BaseRequestPolicy { throw err; } - await delay(delayInMs, httpRequest.abortSignal, StandardAbortMessage); + await delay(delayInMs, undefined, { + abortSignal: httpRequest.abortSignal, + abortErrorMsg: StandardAbortMessage + }); if (httpRequest.abortSignal?.aborted) { throw new AbortError(StandardAbortMessage); } diff --git a/sdk/core/core-http/CHANGELOG.md b/sdk/core/core-http/CHANGELOG.md index 28455bc8f792..75dee5caa518 100644 --- a/sdk/core/core-http/CHANGELOG.md +++ b/sdk/core/core-http/CHANGELOG.md @@ -15,6 +15,7 @@ ### Fixed - Fixed an issue where `proxySettings` does not work when there is username but no password [Issue 15720](https://github.com/Azure/azure-sdk-for-js/issues/15720) +- Throttling retry policy respects abort signal [#15796](https://github.com/Azure/azure-sdk-for-js/issues/15796) ## 1.2.6 (2021-06-14) diff --git a/sdk/core/core-http/review/core-http.api.md b/sdk/core/core-http/review/core-http.api.md index 830b40000f54..37110cddac5c 100644 --- a/sdk/core/core-http/review/core-http.api.md +++ b/sdk/core/core-http/review/core-http.api.md @@ -181,7 +181,10 @@ export class DefaultHttpClient extends FetchHttpClient { } // @public -export function delay(t: number, value?: T): Promise; +export function delay(delayInMs: number, value?: T, options?: { + abortSignal?: AbortSignalLike; + abortErrorMsg?: string; +}): Promise; // @public export interface DeserializationContentTypes { diff --git a/sdk/core/core-http/src/coreHttp.ts b/sdk/core/core-http/src/coreHttp.ts index dd3b91076fe9..6e726f0cffb6 100644 --- a/sdk/core/core-http/src/coreHttp.ts +++ b/sdk/core/core-http/src/coreHttp.ts @@ -99,7 +99,6 @@ export { export { stripRequest, stripResponse, - delay, executePromisesSequentially, generateUuid, encodeUri, @@ -113,7 +112,7 @@ export { } from "./util/utils"; export { URLBuilder, URLQuery } from "./url"; export { AbortSignalLike } from "@azure/abort-controller"; - +export { delay } from "./util/delay"; // legacy exports. Use core-tracing instead (and remove on next major version update of core-http). export { createSpanFunction, SpanConfig } from "./createSpanLegacy"; diff --git a/sdk/core/core-http/src/policies/bearerTokenAuthenticationPolicy.ts b/sdk/core/core-http/src/policies/bearerTokenAuthenticationPolicy.ts index 78e5d9957f85..5cb9bb3ab86f 100644 --- a/sdk/core/core-http/src/policies/bearerTokenAuthenticationPolicy.ts +++ b/sdk/core/core-http/src/policies/bearerTokenAuthenticationPolicy.ts @@ -11,7 +11,7 @@ import { import { Constants } from "../util/constants"; import { HttpOperationResponse } from "../httpOperationResponse"; import { WebResourceLike } from "../webResource"; -import { delay } from "../util/utils"; +import { delay } from "../util/delay"; // #region Access Token Cycler @@ -71,7 +71,7 @@ async function beginRefresh( ): Promise { // This wrapper handles exceptions gracefully as long as we haven't exceeded // the timeout. - async function tryGetAccessToken() { + async function tryGetAccessToken(): Promise { if (Date.now() < timeoutInMs) { try { return await getAccessToken(); diff --git a/sdk/core/core-http/src/policies/exponentialRetryPolicy.ts b/sdk/core/core-http/src/policies/exponentialRetryPolicy.ts index 89fbfb86c836..0623e5a00586 100644 --- a/sdk/core/core-http/src/policies/exponentialRetryPolicy.ts +++ b/sdk/core/core-http/src/policies/exponentialRetryPolicy.ts @@ -2,7 +2,6 @@ // Licensed under the MIT license. import { HttpOperationResponse } from "../httpOperationResponse"; -import * as utils from "../util/utils"; import { WebResourceLike } from "../webResource"; import { BaseRequestPolicy, @@ -22,6 +21,7 @@ import { } from "../util/exponentialBackoffStrategy"; import { RestError } from "../restError"; import { logger } from "../log"; +import { delay } from "../util/delay"; export function exponentialRetryPolicy( retryCount?: number, @@ -164,7 +164,7 @@ async function retry( if (!isAborted && shouldRetry(policy.retryCount, shouldPolicyRetry, retryData, response)) { logger.info(`Retrying request in ${retryData.retryInterval}`); try { - await utils.delay(retryData.retryInterval); + await delay(retryData.retryInterval); const res = await policy._nextPolicy.sendRequest(request.clone()); return retry(policy, request, res, retryData); } catch (err) { diff --git a/sdk/core/core-http/src/policies/rpRegistrationPolicy.ts b/sdk/core/core-http/src/policies/rpRegistrationPolicy.ts index e23a50c0ee3b..76988d11e4d0 100644 --- a/sdk/core/core-http/src/policies/rpRegistrationPolicy.ts +++ b/sdk/core/core-http/src/policies/rpRegistrationPolicy.ts @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import { delay } from "../util/delay"; import { HttpOperationResponse } from "../httpOperationResponse"; import * as utils from "../util/utils"; import { WebResourceLike } from "../webResource"; @@ -145,9 +146,8 @@ function extractSubscriptionUrl(url: string): string { * @param provider - The provider name to be registered. * @param originalRequest - The original request sent by the user that returned a 409 response * with a message that the provider is not registered. - * @param callback - The callback that handles the RP registration */ -function registerRP( +async function registerRP( policy: RPRegistrationPolicy, urlPrefix: string, provider: string, @@ -159,12 +159,11 @@ function registerRP( reqOptions.method = "POST"; reqOptions.url = postUrl; - return policy._nextPolicy.sendRequest(reqOptions).then((response) => { - if (response.status !== 200) { - throw new Error(`Autoregistration of ${provider} failed. Please try registering manually.`); - } - return getRegistrationStatus(policy, getUrl, originalRequest); - }); + const response = await policy._nextPolicy.sendRequest(reqOptions); + if (response.status !== 200) { + throw new Error(`Autoregistration of ${provider} failed. Please try registering manually.`); + } + return getRegistrationStatus(policy, getUrl, originalRequest); } /** @@ -176,7 +175,7 @@ function registerRP( * with a message that the provider is not registered. * @returns True if RP Registration is successful. */ -function getRegistrationStatus( +async function getRegistrationStatus( policy: RPRegistrationPolicy, url: string, originalRequest: WebResourceLike @@ -185,14 +184,12 @@ function getRegistrationStatus( reqOptions.url = url; reqOptions.method = "GET"; - return policy._nextPolicy.sendRequest(reqOptions).then((res) => { - const obj = res.parsedBody as any; - if (res.parsedBody && obj.registrationState && obj.registrationState === "Registered") { - return true; - } else { - return utils - .delay(policy._retryTimeout * 1000) - .then(() => getRegistrationStatus(policy, url, originalRequest)); - } - }); + const res = await policy._nextPolicy.sendRequest(reqOptions); + const obj = res.parsedBody; + if (res.parsedBody && obj.registrationState && obj.registrationState === "Registered") { + return true; + } else { + await delay(policy._retryTimeout * 1000); + return getRegistrationStatus(policy, url, originalRequest); + } } diff --git a/sdk/core/core-http/src/policies/systemErrorRetryPolicy.ts b/sdk/core/core-http/src/policies/systemErrorRetryPolicy.ts index 7586b140c9d5..6c7778505212 100644 --- a/sdk/core/core-http/src/policies/systemErrorRetryPolicy.ts +++ b/sdk/core/core-http/src/policies/systemErrorRetryPolicy.ts @@ -2,7 +2,6 @@ // Licensed under the MIT license. import { HttpOperationResponse } from "../httpOperationResponse"; -import * as utils from "../util/utils"; import { WebResourceLike } from "../webResource"; import { BaseRequestPolicy, @@ -21,6 +20,7 @@ import { DEFAULT_CLIENT_MIN_RETRY_INTERVAL, isNumber } from "../util/exponentialBackoffStrategy"; +import { delay } from "../util/delay"; export function systemErrorRetryPolicy( retryCount?: number, @@ -107,7 +107,7 @@ async function retry( if (shouldRetry(policy.retryCount, shouldPolicyRetry, retryData, operationResponse, err)) { // If previous operation ended with an error and the policy allows a retry, do that try { - await utils.delay(retryData.retryInterval); + await delay(retryData.retryInterval); return policy._nextPolicy.sendRequest(request.clone()); } catch (nestedErr) { return retry(policy, request, operationResponse, nestedErr, retryData); diff --git a/sdk/core/core-http/src/policies/throttlingRetryPolicy.ts b/sdk/core/core-http/src/policies/throttlingRetryPolicy.ts index 6c02e59220f1..5d928a49a9b7 100644 --- a/sdk/core/core-http/src/policies/throttlingRetryPolicy.ts +++ b/sdk/core/core-http/src/policies/throttlingRetryPolicy.ts @@ -10,7 +10,8 @@ import { import { WebResourceLike } from "../webResource"; import { HttpOperationResponse } from "../httpOperationResponse"; import { Constants } from "../util/constants"; -import { delay } from "../util/utils"; +import { delay } from "../util/delay"; +import { AbortError } from "@azure/abort-controller"; type ResponseHandler = ( httpRequest: WebResourceLike, @@ -26,6 +27,8 @@ export function throttlingRetryPolicy(): RequestPolicyFactory { }; } +const StandardAbortMessage = "The operation was aborted."; + /** * To learn more, please refer to * https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-request-limits, @@ -67,7 +70,14 @@ export class ThrottlingRetryPolicy extends BaseRequestPolicy { retryAfterHeader ); if (delayInMs) { - return delay(delayInMs).then((_: any) => this._nextPolicy.sendRequest(httpRequest)); + await delay(delayInMs, undefined, { + abortSignal: httpRequest.abortSignal, + abortErrorMsg: StandardAbortMessage + }); + if (httpRequest.abortSignal?.aborted) { + throw new AbortError(StandardAbortMessage); + } + return this._nextPolicy.sendRequest(httpRequest); } } diff --git a/sdk/core/core-http/src/util/delay.ts b/sdk/core/core-http/src/util/delay.ts new file mode 100644 index 000000000000..211083a28f98 --- /dev/null +++ b/sdk/core/core-http/src/util/delay.ts @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { isDefined } from "./typeguards"; +import { AbortError, AbortSignalLike } from "@azure/abort-controller"; +const StandardAbortMessage = "The operation was aborted."; + +/** + * A wrapper for setTimeout that resolves a promise after delayInMs milliseconds. + * @param delayInMs - The number of milliseconds to be delayed. + * @param value - The value to be resolved with after a timeout of t milliseconds. + * @param options - The options for delay - currently abort options + * @param abortSignal - The abortSignal associated with containing operation. + * @param abortErrorMsg - The abort error message associated with containing operation. + * @returns - Resolved promise + */ +export function delay( + delayInMs: number, + value?: T, + options?: { + abortSignal?: AbortSignalLike; + abortErrorMsg?: string; + } +): Promise { + return new Promise((resolve, reject) => { + let timer: ReturnType | undefined = undefined; + let onAborted: (() => void) | undefined = undefined; + + const rejectOnAbort = (): void => { + return reject( + new AbortError(options?.abortErrorMsg ? options?.abortErrorMsg : StandardAbortMessage) + ); + }; + + const removeListeners = (): void => { + if (options?.abortSignal && onAborted) { + options.abortSignal.removeEventListener("abort", onAborted); + } + }; + + onAborted = (): void => { + if (isDefined(timer)) { + clearTimeout(timer); + } + removeListeners(); + return rejectOnAbort(); + }; + + if (options?.abortSignal && options.abortSignal.aborted) { + return rejectOnAbort(); + } + + timer = setTimeout(() => { + removeListeners(); + resolve(value); + }, delayInMs); + + if (options?.abortSignal) { + options.abortSignal.addEventListener("abort", onAborted); + } + }); +} diff --git a/sdk/core/core-http/src/util/typeguards.ts b/sdk/core/core-http/src/util/typeguards.ts new file mode 100644 index 000000000000..43a1b192c580 --- /dev/null +++ b/sdk/core/core-http/src/util/typeguards.ts @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/** + * Helper TypeGuard that checks if the value is not null or undefined. + * @param thing - Anything + * @internal + */ +export function isDefined(thing: T | undefined | null): thing is T { + return typeof thing !== "undefined" && thing !== null; +} diff --git a/sdk/core/core-http/src/util/utils.ts b/sdk/core/core-http/src/util/utils.ts index 0cd289169223..2c2dd756f8a6 100644 --- a/sdk/core/core-http/src/util/utils.ts +++ b/sdk/core/core-http/src/util/utils.ts @@ -113,16 +113,6 @@ export function executePromisesSequentially( return result; } -/** - * A wrapper for setTimeout that resolves a promise after t milliseconds. - * @param t - The number of milliseconds to be delayed. - * @param value - The value to be resolved with after a timeout of t milliseconds. - * @returns Resolved promise - */ -export function delay(t: number, value?: T): Promise { - return new Promise((resolve) => setTimeout(() => resolve(value), t)); -} - /** * Service callback that is returned for REST requests initiated by the service client. */ diff --git a/sdk/core/core-http/test/policies/throttlingRetryPolicyTests.ts b/sdk/core/core-http/test/policies/throttlingRetryPolicyTests.ts index 5d663dfdd1d9..c14a873c6196 100644 --- a/sdk/core/core-http/test/policies/throttlingRetryPolicyTests.ts +++ b/sdk/core/core-http/test/policies/throttlingRetryPolicyTests.ts @@ -6,7 +6,8 @@ import sinon from "sinon"; import { ThrottlingRetryPolicy } from "../../src/policies/throttlingRetryPolicy"; import { WebResource } from "../../src/webResource"; import { HttpOperationResponse } from "../../src/httpOperationResponse"; -import { HttpHeaders, RequestPolicyOptions } from "../../src/coreHttp"; +import { Constants, HttpHeaders, RequestPolicyOptions } from "../../src/coreHttp"; +import { AbortController } from "@azure/abort-controller"; describe("ThrottlingRetryPolicy", () => { class PassThroughPolicy { @@ -112,6 +113,41 @@ describe("ThrottlingRetryPolicy", () => { delete (response.request as any).requestId; assert.deepEqual(response, mockResponse); }); + + it("should honor the abort signal passed", async () => { + const request = new WebResource( + "https://fakeservice.io", + "GET", + undefined, + undefined, + undefined, + undefined, + undefined, + AbortController.timeout(100) + ); + const mockResponse = { + headers: new HttpHeaders({ + "Retry-After": "10000" + }), + status: Constants.HttpConstants.StatusCodes.TooManyRequests, + body: { + type: "https://fakeservice.io/errors/too-many-requests", + title: "Resource utilization has surpassed the assigned quota", + policy: "Total Requests", + status: Constants.HttpConstants.StatusCodes.TooManyRequests + }, + request: request + }; + const policy = createDefaultThrottlingRetryPolicy(mockResponse); + let errorWasThrown = false; + try { + await policy.sendRequest(request); + } catch (error) { + errorWasThrown = true; + assert.equal((error as any).name, "AbortError", "Unexpected error thrown"); + } + assert.equal(errorWasThrown, true, "Error was not thrown"); + }); }); describe("parseRetryAfterHeader", () => {