diff --git a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts index 531080ff5d6e..bc5475fbdece 100644 --- a/sdk/eventhub/event-hubs/src/eventHubReceiver.ts +++ b/sdk/eventhub/event-hubs/src/eventHubReceiver.ts @@ -673,7 +673,7 @@ export class EventHubReceiver extends LinkEntity { const receivedEvents: ReceivedEventData[] = []; const retrieveEvents = (): Promise => { - return new Promise(async (resolve, reject) => { + return new Promise((resolve, reject) => { // if this consumer was closed, // resolve the operation's promise with the events collected thus far in case // the promise hasn't already been resolved. diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 60c4827ff4fc..f6aac4161abb 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -13,7 +13,7 @@ import { Message as RheaMessage } from "rhea-promise"; import { - Constants, + delay, ErrorNameConditionMapper, RetryConfig, RetryOperationType, @@ -29,10 +29,10 @@ import { EventHubProducerOptions } from "./models/private"; import { SendOptions } from "./models/public"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; -import { AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { AbortSignalLike } from "@azure/abort-controller"; import { EventDataBatch, isEventDataBatch } from "./eventDataBatch"; import { defaultDataTransformer } from "./dataTransformer"; - +import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils"; /** * Describes the EventHubSender that will send event data to EventHub. * @internal @@ -217,68 +217,54 @@ export class EventHubSender extends LinkEntity { abortSignal?: AbortSignalLike; } = {} ): Promise { - const abortSignal = options.abortSignal; - const retryOptions = options.retryOptions || {}; if (this.isOpen()) { return this._sender!.maxMessageSize; } - return new Promise(async (resolve, reject) => { - const rejectOnAbort = (): void => { - const desc: string = `[${this._context.connectionId}] The create batch operation has been cancelled by the user.`; - // Cancellation is user-intented, so treat as info instead of warning. - logger.info(desc); - const error = new AbortError(`The create batch operation has been cancelled by the user.`); - reject(error); - }; - - const onAbort = (): void => { - if (abortSignal) { - abortSignal.removeEventListener("abort", onAbort); - } - rejectOnAbort(); - }; - - if (abortSignal) { - // the aborter may have been triggered between request attempts - // so check if it was triggered and reject if needed. - if (abortSignal.aborted) { - return rejectOnAbort(); - } - abortSignal.addEventListener("abort", onAbort); - } + + const retryOptions = options.retryOptions || {}; + const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions); + retryOptions.timeoutInMs = timeoutInMs; + const senderOptions = this._createSenderOptions(timeoutInMs); + + const createLinkPromise = async (): Promise => { try { - logger.verbose( - "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", - this.senderLock - ); - const senderOptions = this._createSenderOptions(Constants.defaultOperationTimeoutInMs); - await defaultLock.acquire(this.senderLock, () => { - const config: RetryConfig = { - operation: () => this._init(senderOptions), - connectionId: this._context.connectionId, - operationType: RetryOperationType.senderLink, - abortSignal: abortSignal, - retryOptions: retryOptions - }; - - return retry(config); + await waitForTimeoutOrAbortOrResolve({ + actionFn: () => { + return defaultLock.acquire(this.senderLock, () => { + return this._init(senderOptions); + }); + }, + abortSignal: options?.abortSignal, + timeoutMs: timeoutInMs, + timeoutMessage: + `[${this._context.connectionId}] Sender "${this.name}" ` + + `with address "${this.address}", cannot be created right now, due ` + + `to operation timeout.` }); - resolve(this._sender!.maxMessageSize); } catch (err) { + const translatedError = translate(err); logger.warning( - "[%s] An error occurred while creating the sender %s", + "[%s] An error occurred while creating the sender %s: %s", this._context.connectionId, - this.name + this.name, + `${translatedError?.name}: ${translatedError?.message}` ); - logErrorStackTrace(err); - reject(err); - } finally { - if (abortSignal) { - abortSignal.removeEventListener("abort", onAbort); - } + logErrorStackTrace(translatedError); + throw translatedError; } - }); + }; + + const config: RetryConfig = { + operation: createLinkPromise, + connectionId: this._context.connectionId, + operationType: RetryOperationType.senderLink, + abortSignal: options?.abortSignal, + retryOptions: retryOptions + }; + + await retry(config); + + return this._sender!.maxMessageSize; } /** @@ -398,133 +384,115 @@ export class EventHubSender extends LinkEntity { const retryOptions = options.retryOptions || {}; const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions); retryOptions.timeoutInMs = timeoutInMs; - const sendEventPromise = (): Promise => - new Promise(async (resolve, reject) => { - const rejectOnAbort = (): void => { - const desc: string = - `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + - `address "${this.address}" has been cancelled by the user.`; - // Cancellation is user-intended, so log to info instead of warning. - logger.info(desc); - return reject(new AbortError("The send operation has been cancelled by the user.")); - }; - - if (abortSignal && abortSignal.aborted) { - // operation has been cancelled, so exit quickly - return rejectOnAbort(); - } - - const removeListeners = (): void => { - clearTimeout(waitTimer); // eslint-disable-line @typescript-eslint/no-use-before-define - if (abortSignal) { - abortSignal.removeEventListener("abort", onAborted); // eslint-disable-line @typescript-eslint/no-use-before-define - } - }; - - const onAborted = (): void => { - removeListeners(); - return rejectOnAbort(); - }; - - if (abortSignal) { - abortSignal.addEventListener("abort", onAborted); + const sendEventPromise = async (): Promise => { + const initStartTime = Date.now(); + if (!this.isOpen()) { + const senderOptions = this._createSenderOptions(timeoutInMs); + try { + await waitForTimeoutOrAbortOrResolve({ + actionFn: () => { + return defaultLock.acquire(this.senderLock, () => { + return this._init(senderOptions); + }); + }, + abortSignal: options?.abortSignal, + timeoutMs: timeoutInMs, + timeoutMessage: + `[${this._context.connectionId}] Sender "${this.name}" ` + + `with address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.` + }); + } catch (err) { + const translatedError = translate(err); + logger.warning( + "[%s] An error occurred while creating the sender %s: %s", + this._context.connectionId, + this.name, + `${translatedError?.name}: ${translatedError?.message}` + ); + logErrorStackTrace(translatedError); + throw translatedError; } + } + const timeTakenByInit = Date.now() - initStartTime; - const actionAfterTimeout = (): void => { - removeListeners(); - const desc: string = - `[${this._context.connectionId}] Sender "${this.name}" with ` + - `address "${this.address}", was not able to send the message right now, due ` + - `to operation timeout.`; - logger.warning(desc); - const e: Error = { - name: "OperationTimeoutError", - message: desc - }; - return reject(translate(e)); - }; + logger.verbose( + "[%s] Sender '%s', credit: %d available: %d", + this._context.connectionId, + this.name, + this._sender!.credit, + this._sender!.session.outgoing.available() + ); - const waitTimer = setTimeout(actionAfterTimeout, timeoutInMs); - const initStartTime = Date.now(); - if (!this.isOpen()) { - logger.verbose( - "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", - this.senderLock - ); + if (!this._sender!.sendable()) { + logger.verbose( + "%s Sender '%s', waiting for 1 second for sender to become sendable", + this._context.connectionId, + this.name + ); - try { - const senderOptions = this._createSenderOptions(timeoutInMs); - await defaultLock.acquire(this.senderLock, () => { - return this._init(senderOptions); - }); - } catch (err) { - removeListeners(); - const translatedError = translate(err); - logger.warning( - "[%s] An error occurred while creating the sender %s: %s", - this._context.connectionId, - this.name, - `${translatedError?.name}: ${translatedError?.message}` - ); - logErrorStackTrace(translatedError); - return reject(translatedError); - } - } - const timeTakenByInit = Date.now() - initStartTime; + await delay(1000); logger.verbose( - "[%s] Sender '%s', credit: %d available: %d", + "%s Sender '%s' after waiting for a second, credit: %d available: %d", this._context.connectionId, this.name, this._sender!.credit, - this._sender!.session.outgoing.available() + this._sender!.session?.outgoing?.available() ); - if (this._sender!.sendable()) { - logger.verbose( - "[%s] Sender '%s', sending message with id '%s'.", - this._context.connectionId, - this.name - ); - if (timeoutInMs <= timeTakenByInit) { - actionAfterTimeout(); - return; - } - try { - this._sender!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000; - const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700); - logger.info( - "[%s] Sender '%s', sent message with delivery id: %d", - this._context.connectionId, - this.name, - delivery.id - ); - return resolve(); - } catch (err) { - const translatedError = translate(err.innerError || err); - logger.warning( - "[%s] An error occurred while sending the message %s", - this._context.connectionId, - `${translatedError?.name}: ${translatedError?.message}` - ); - logErrorStackTrace(translatedError); - return reject(translatedError); - } finally { - removeListeners(); - } - } else { - // let us retry to send the message after some time. - const msg = - `[${this._context.connectionId}] Sender "${this.name}", ` + - `cannot send the message right now. Please try later.`; - logger.warning(msg); - const amqpError: AmqpError = { - condition: ErrorNameConditionMapper.SenderBusyError, - description: msg - }; - reject(translate(amqpError)); - } - }); + } + + if (!this._sender!.sendable()) { + // let us retry to send the message after some time. + const msg = + `[${this._context.connectionId}] Sender "${this.name}", ` + + `cannot send the message right now. Please try later.`; + logger.warning(msg); + const amqpError: AmqpError = { + condition: ErrorNameConditionMapper.SenderBusyError, + description: msg + }; + throw translate(amqpError); + } + + logger.verbose( + "[%s] Sender '%s', sending message with id '%s'.", + this._context.connectionId, + this.name + ); + if (timeoutInMs <= timeTakenByInit) { + const desc: string = + `${this._context.connectionId} Sender "${this.name}" ` + + `with address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.`; + logger.warning(desc); + const e: AmqpError = { + condition: ErrorNameConditionMapper.ServiceUnavailableError, + description: desc + }; + throw translate(e); + } + try { + this._sender!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000; + const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700); + logger.info( + "[%s] Sender '%s', sent message with delivery id: %d", + this._context.connectionId, + this.name, + delivery.id + ); + } catch (error) { + const translatedError = translate(error); + logger.warning( + "%s] Sender '%s', An error occurred when sending the message: %s", + this._context.connectionId, + this.name, + `${translatedError?.name}: ${translatedError?.message}` + ); + logErrorStackTrace(translatedError); + throw translatedError; + } + }; const config: RetryConfig = { operation: sendEventPromise, diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index d16b7b8ec115..4a9511b64a2f 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -27,13 +27,14 @@ import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import { logErrorStackTrace, logger } from "./log"; import { getRetryAttemptTimeoutInMs } from "./util/retries"; -import { AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { AbortSignalLike } from "@azure/abort-controller"; import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; import { OperationNames } from "./models/private"; import { Span, SpanContext, SpanKind, CanonicalCode } from "@opentelemetry/api"; import { getParentSpan, OperationOptions } from "./util/operationOptions"; import { getTracer } from "@azure/core-tracing"; import { SharedKeyCredential } from "../src/eventhubSharedKeyCredential"; +import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils"; /** * Describes the runtime information of an Event Hub. @@ -389,97 +390,77 @@ export class ManagementClient extends LinkEntity { try { const abortSignal: AbortSignalLike | undefined = options && options.abortSignal; - const sendOperationPromise = (): Promise => - new Promise(async (resolve, reject) => { - let count = 0; - - const retryTimeoutInMs = getRetryAttemptTimeoutInMs(options.retryOptions); - let timeTakenByInit = 0; - - const rejectOnAbort = (): void => { - const requestName = options.requestName; - const desc: string = - `[${this._context.connectionId}] The request "${requestName}" ` + - `to has been cancelled by the user.`; - // Cancellation is user-intended behavior, so log to info instead of warning. - logger.info(desc); - const error = new AbortError( - `The ${requestName ? requestName + " " : ""}operation has been cancelled by the user.` - ); - - reject(error); - }; - - if (abortSignal) { - if (abortSignal.aborted) { - return rejectOnAbort(); - } - } + const sendOperationPromise = async (): Promise => { + let count = 0; - if (!this._isMgmtRequestResponseLinkOpen()) { - logger.verbose( - "[%s] Acquiring lock to get the management req res link.", - this._context.connectionId - ); + const retryTimeoutInMs = getRetryAttemptTimeoutInMs(options.retryOptions); + let timeTakenByInit = 0; - const initOperationStartTime = Date.now(); - - const actionAfterTimeout = (): void => { - const desc: string = `The request with message_id "${request.message_id}" timed out. Please try again later.`; - const e: Error = { - name: "OperationTimeoutError", - message: desc - }; - - return reject(translate(e)); - }; - - const waitTimer = setTimeout(actionAfterTimeout, retryTimeoutInMs); - - try { - await defaultLock.acquire(this.managementLock, () => { - return this._init(); - }); - } catch (err) { - return reject(translate(err)); - } finally { - clearTimeout(waitTimer); - } - timeTakenByInit = Date.now() - initOperationStartTime; - } + if (!this._isMgmtRequestResponseLinkOpen()) { + logger.verbose( + "[%s] Acquiring lock to get the management req res link.", + this._context.connectionId + ); - const remainingOperationTimeoutInMs = retryTimeoutInMs - timeTakenByInit; - - const sendRequestOptions: SendRequestOptions = { - abortSignal: options.abortSignal, - requestName: options.requestName, - timeoutInMs: remainingOperationTimeoutInMs - }; - - count++; - if (count !== 1) { - // Generate a new message_id every time after the first attempt - request.message_id = generate_uuid(); - } else if (!request.message_id) { - // Set the message_id in the first attempt only if it is not set - request.message_id = generate_uuid(); - } + const initOperationStartTime = Date.now(); try { - const result = await this._mgmtReqResLink!.sendRequest(request, sendRequestOptions); - resolve(result); + await waitForTimeoutOrAbortOrResolve({ + actionFn: () => { + return defaultLock.acquire(this.managementLock, () => { + return this._init(); + }); + }, + abortSignal: options?.abortSignal, + timeoutMs: retryTimeoutInMs, + timeoutMessage: `The request with message_id "${request.message_id}" timed out. Please try again later.` + }); } catch (err) { const translatedError = translate(err); logger.warning( - "[%s] An error occurred during send on management request-response link with address '%s': %s", + "[%s] An error occurred while creating the management link %s: %s", this._context.connectionId, - this.address, + this.name, `${translatedError?.name}: ${translatedError?.message}` ); logErrorStackTrace(translatedError); - reject(translatedError); + throw translatedError; } - }); + timeTakenByInit = Date.now() - initOperationStartTime; + } + + const remainingOperationTimeoutInMs = retryTimeoutInMs - timeTakenByInit; + + const sendRequestOptions: SendRequestOptions = { + abortSignal: options.abortSignal, + requestName: options.requestName, + timeoutInMs: remainingOperationTimeoutInMs + }; + + count++; + if (count !== 1) { + // Generate a new message_id every time after the first attempt + request.message_id = generate_uuid(); + } else if (!request.message_id) { + // Set the message_id in the first attempt only if it is not set + request.message_id = generate_uuid(); + } + + try { + const result = await this._mgmtReqResLink!.sendRequest(request, sendRequestOptions); + return result; + } catch (err) { + const translatedError = translate(err); + logger.warning( + "[%s] An error occurred during send on management request-response link with address '%s': %s", + this._context.connectionId, + this.address, + `${translatedError?.name}: ${translatedError?.message}` + ); + logErrorStackTrace(translatedError); + throw translatedError; + } + }; const config: RetryConfig = { operation: sendOperationPromise, diff --git a/sdk/eventhub/event-hubs/src/util/timeoutAbortSignalUtils.ts b/sdk/eventhub/event-hubs/src/util/timeoutAbortSignalUtils.ts new file mode 100644 index 000000000000..b3cb455c0b8b --- /dev/null +++ b/sdk/eventhub/event-hubs/src/util/timeoutAbortSignalUtils.ts @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { OperationTimeoutError } from "rhea-promise"; + +export const StandardAbortMessage = "The operation was aborted."; + +type setTimeoutArgs = (callback: (...args: any[]) => void, ms: number, ...args: any[]) => any; + +/** + * An executor for a function that returns a Promise that obeys both a timeout and an + * optional AbortSignal. + * @param timeoutMs - The number of milliseconds to allow before throwing an OperationTimeoutError. + * @param timeoutMessage - The message to place in the .description field for the thrown exception for Timeout. + * @param abortSignal - The abortSignal associated with containing operation. + * @param abortErrorMsg - The abort error message associated with containing operation. + * @param value - The value to be resolved with after a timeout of t milliseconds. + * + * @internal + */ +export async function waitForTimeoutOrAbortOrResolve(args: { + actionFn: () => Promise; + timeoutMs: number; + timeoutMessage: string; + abortSignal?: AbortSignalLike; + // these are optional and only here for testing. + timeoutFunctions?: { + setTimeoutFn: setTimeoutArgs; + clearTimeoutFn: (timeoutId: any) => void; + }; +}): Promise { + if (args.abortSignal && args.abortSignal.aborted) { + throw new AbortError(StandardAbortMessage); + } + + let timer: any | undefined = undefined; + let clearAbortSignal: (() => void) | undefined = undefined; + + const clearAbortSignalAndTimer = (): void => { + (args.timeoutFunctions?.clearTimeoutFn ?? clearTimeout)(timer); + + if (clearAbortSignal) { + clearAbortSignal(); + } + }; + + // eslint-disable-next-line promise/param-names + const abortOrTimeoutPromise = new Promise((_resolve, reject) => { + clearAbortSignal = checkAndRegisterWithAbortSignal(reject, args.abortSignal); + + timer = (args.timeoutFunctions?.setTimeoutFn ?? setTimeout)(() => { + reject(new OperationTimeoutError(args.timeoutMessage)); + }, args.timeoutMs); + }); + + try { + return await Promise.race([abortOrTimeoutPromise, args.actionFn()]); + } finally { + clearAbortSignalAndTimer(); + } +} + +export function checkAndRegisterWithAbortSignal( + onAbortFn: (abortError: AbortError) => void, + abortSignal?: AbortSignalLike +): () => void { + if (abortSignal == null) { + return () => { + /** Nothing to do here, no abort signal */ + }; + } + + if (abortSignal.aborted) { + throw new AbortError(StandardAbortMessage); + } + + const onAbort = (): void => { + abortSignal.removeEventListener("abort", onAbort); + onAbortFn(new AbortError(StandardAbortMessage)); + }; + + abortSignal.addEventListener("abort", onAbort); + + return () => abortSignal.removeEventListener("abort", onAbort); +} diff --git a/sdk/eventhub/event-hubs/test/internal/utils.spec.ts b/sdk/eventhub/event-hubs/test/internal/utils.spec.ts new file mode 100644 index 000000000000..28379e0e3f89 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/internal/utils.spec.ts @@ -0,0 +1,349 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { + checkAndRegisterWithAbortSignal, + waitForTimeoutOrAbortOrResolve, + StandardAbortMessage +} from "../../src/util/timeoutAbortSignalUtils"; +import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { delay } from "rhea-promise"; +import chai from "chai"; +import chaiAsPromised from "chai-as-promised"; +chai.use(chaiAsPromised); +const assert = chai.assert; + +describe("utils", () => { + describe("waitForTimeoutAbortOrResolve", () => { + let abortController: AbortController; + let abortSignal: ReturnType; + let ourTimerId: NodeJS.Timer | undefined; + let timerWasCleared: boolean; + + let timeoutFunctions: { + setTimeoutFn: (callback: (...args: any[]) => void, ms: number, ...args: any[]) => any; + clearTimeoutFn: (timeoutId: NodeJS.Timer) => void; + }; + + const neverFireMs = 10 * 1000; + + beforeEach(() => { + abortController = new AbortController(); + abortSignal = getAbortSignalWithTracking(abortController); + ourTimerId = undefined; + timerWasCleared = false; + + const setTimeoutFn = ( + callback: (...args: any[]) => void, + ms: number, + ...args: any[] + ): any => { + const id = setTimeout(callback, ms, ...args); + + assert.notExists( + ourTimerId, + "Definitely shouldn't schedule our timeout callback more than once" + ); + + ourTimerId = id; + return id; + }; + + const clearTimeoutFn = (timerIdToClear: NodeJS.Timer): any => { + assert.exists(timerIdToClear); + assert.isFalse(timerWasCleared, "Timer should not be cleared multiple times"); + timerWasCleared = true; + + return clearTimeout(timerIdToClear); + }; + + timeoutFunctions = { + setTimeoutFn, + clearTimeoutFn + }; + }); + + it("abortSignal cancelled in mid-flight", async () => { + const prm = waitForTimeoutOrAbortOrResolve({ + actionFn: async () => { + await delay(neverFireMs); + }, + timeoutMessage: "the message for the timeout", + timeoutMs: neverFireMs, + abortSignal, + timeoutFunctions + }); + + await delay(500); + abortController.abort(); + + try { + await prm; + assert.fail("Should have thrown an AbortError"); + } catch (err) { + assert.equal(err.message, StandardAbortMessage); + assert.equal(err.name, "AbortError"); + } + + assert.isTrue( + abortSignal.ourListenersWereRemoved(), + "All paths should properly clean up any event listeners on the signal" + ); + assert.isTrue(timerWasCleared); + }); + + it("abortSignal already aborted", async () => { + abortController.abort(); + + try { + await waitForTimeoutOrAbortOrResolve({ + actionFn: async () => { + await delay(neverFireMs); + }, + timeoutMessage: "the message for the timeout", + timeoutMs: neverFireMs, + abortSignal, + timeoutFunctions + }); + + assert.fail("Should have thrown an AbortError"); + } catch (err) { + assert.equal(err.message, StandardAbortMessage); + assert.equal(err.name, "AbortError"); + } + + assert.isTrue( + abortSignal.ourListenersWereRemoved(), + "All paths should properly clean up any event listeners on the signal" + ); + // the abort signal is checked early, so the timeout never gets set up here. + assert.notExists(ourTimerId); + }); + + it("abortSignal is optional", async () => { + try { + await waitForTimeoutOrAbortOrResolve({ + actionFn: async () => { + await delay(neverFireMs); + }, + timeoutMs: 500, + timeoutMessage: "the message for the timeout", + timeoutFunctions + }); + + assert.fail("Should have thrown an TimeoutError"); + } catch (err) { + assert.equal(err.message, "the message for the timeout"); + assert.equal(err.name, "OperationTimeoutError"); + } + + assert.isTrue(timerWasCleared); + }); + + it("timeout expires", async () => { + try { + await waitForTimeoutOrAbortOrResolve({ + actionFn: async () => { + await delay(neverFireMs); + }, + timeoutMessage: "the message for the timeout", + timeoutMs: 500, + abortSignal, + timeoutFunctions + }); + + assert.fail("Should have thrown an TimeoutError"); + } catch (err) { + assert.equal(err.message, "the message for the timeout"); + assert.equal(err.name, "OperationTimeoutError"); + } + + assert.isTrue( + abortSignal.ourListenersWereRemoved(), + "All paths should properly clean up any event listeners on the signal" + ); + assert.isTrue(timerWasCleared); + }); + + it("nothing expires", async () => { + const result = await waitForTimeoutOrAbortOrResolve({ + actionFn: async () => { + await delay(500); + return 100; + }, + timeoutMessage: "the message for the timeout", + timeoutMs: neverFireMs, + abortSignal, + timeoutFunctions + }); + + assert.equal(result, 100); + assert.isTrue( + abortSignal.ourListenersWereRemoved(), + "All paths should properly clean up any event listeners on the signal" + ); + assert.isTrue(timerWasCleared); + }); + + it("actionFn throws an error", async () => { + try { + await waitForTimeoutOrAbortOrResolve({ + actionFn: async () => { + throw new Error("Error thrown from action"); + }, + timeoutMessage: "the message for the timeout", + timeoutMs: neverFireMs, + abortSignal, + timeoutFunctions + }); + + assert.fail("Should have thrown"); + } catch (err) { + assert.equal(err.message, "Error thrown from action"); + } + + assert.isTrue( + abortSignal.ourListenersWereRemoved(), + "All paths should properly clean up any event listeners on the signal" + ); + assert.isTrue(timerWasCleared); + }); + + it("sanity check - the real timeout methods do get used if we don't provide fake ones", async () => { + try { + await waitForTimeoutOrAbortOrResolve({ + actionFn: async () => { + await delay(5000); + }, + timeoutMessage: "the message for the timeout", + timeoutMs: 1, + abortSignal + }); + } catch (err) { + assert.equal(err.message, "the message for the timeout"); + } + + try { + abortController.abort(); + + await waitForTimeoutOrAbortOrResolve({ + actionFn: async () => { + await delay(5000); + }, + timeoutMessage: "the message for the timeout", + timeoutMs: neverFireMs, + abortSignal + }); + } catch (err) { + assert.equal(err.message, StandardAbortMessage); + } + }); + }); + + describe("checkAndRegisterWithAbortSignal", () => { + let abortController: AbortController; + let abortSignal: ReturnType; + + beforeEach(() => { + abortController = new AbortController(); + abortSignal = getAbortSignalWithTracking(abortController); + }); + + it("abortSignal is undefined", () => { + const cleanupFn = checkAndRegisterWithAbortSignal(() => { + throw new Error("Will never be called"); + }, undefined); + + // we just return a no-op function in this case. + assert.exists(cleanupFn); + cleanupFn(); + }); + + it("abortSignal is already aborted", () => { + abortController.abort(); + + try { + checkAndRegisterWithAbortSignal(() => { + throw new Error("Will never be called"); + }, abortSignal); + assert.fail("Should have thrown an AbortError"); + } catch (err) { + assert.equal(err.name, "AbortError"); + assert.equal(err.message, StandardAbortMessage); + } + + assert.isTrue(abortSignal.ourListenersWereRemoved()); + }); + + it("abortSignal abort calls handlers", async () => { + let callbackWasCalled = false; + const cleanupFn = checkAndRegisterWithAbortSignal((abortError: AbortError) => { + callbackWasCalled = true; + assert.equal(abortError.message, StandardAbortMessage); + }, abortSignal); + assert.exists(cleanupFn); + + assert.isFalse(abortSignal.ourListenersWereRemoved()); + assert.isFalse(callbackWasCalled); + + abortController.abort(); + await delay(0); + + assert.isTrue(abortSignal.ourListenersWereRemoved()); + assert.isTrue(callbackWasCalled); + + // and cleanupFn is harmless to call here. + cleanupFn!(); + }); + + it("calling cleanup removes handlers from abortSignal", async () => { + let callbackWasCalled = false; + const cleanupFn = checkAndRegisterWithAbortSignal(() => { + callbackWasCalled = true; + }, abortSignal); + assert.exists(cleanupFn); + + assert.isFalse(abortSignal.ourListenersWereRemoved()); + assert.isFalse(callbackWasCalled); + + if (cleanupFn == null) { + throw new Error("No cleanup function!"); + } + + cleanupFn(); + + assert.isTrue(abortSignal.ourListenersWereRemoved()); + // sanity check - let's make sure we're not accidentally triggering their abort handler! + assert.isFalse(callbackWasCalled); + }); + }); +}); + +function getAbortSignalWithTracking( + abortController: AbortController +): AbortSignalLike & { ourListenersWereRemoved(): boolean } { + const signal = (abortController.signal as any) as ReturnType; + + const allFunctions = new Set(); + + const origAddEventListener = signal.addEventListener; + const origRemoveEventListener = signal.removeEventListener; + + signal.addEventListener = (name, handler) => { + assert.isFalse(allFunctions.has(handler), "Handler should not have already been added"); + allFunctions.add(handler); + origAddEventListener.call(signal, name, handler); + }; + + signal.removeEventListener = (name, handler) => { + // being less stringent about potentially removing it more than once since it simplifies + // our error handling code. + allFunctions.delete(handler); + origRemoveEventListener.call(signal, name, handler); + }; + + signal.ourListenersWereRemoved = () => { + return allFunctions.size === 0; + }; + return signal; +} diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index f46588c551cf..89eda880e220 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -67,7 +67,11 @@ export class MessageSender extends LinkEntity { private _onSessionClose: OnAmqpEvent; private _retryOptions: RetryOptions; - constructor(connectionContext: ConnectionContext, entityPath: string, retryOptions: RetryOptions) { + constructor( + connectionContext: ConnectionContext, + entityPath: string, + retryOptions: RetryOptions + ) { super(entityPath, entityPath, connectionContext, "sender", logger, { address: entityPath, audience: `${connectionContext.config.endpoint}${entityPath}` @@ -170,112 +174,104 @@ export class MessageSender extends LinkEntity { ? Constants.defaultOperationTimeoutInMs : this._retryOptions.timeoutInMs; - const sendEventPromise = () => - new Promise(async (resolve, reject) => { - const initStartTime = Date.now(); - if (!this.isOpen()) { - try { - await waitForTimeoutOrAbortOrResolve({ - actionFn: () => this.open(undefined, options?.abortSignal), - abortSignal: options?.abortSignal, - timeoutMs: timeoutInMs, - timeoutMessage: - `[${this._context.connectionId}] Sender "${this.name}" ` + - `with address "${this.address}", was not able to send the message right now, due ` + - `to operation timeout.` - }); - } catch (err) { - err = translateServiceBusError(err); - logger.logError( - err, - "%s An error occurred while creating the sender", - this.logPrefix, - this.name - ); - return reject(err); - } - } - + const sendEventPromise = async (): Promise => { + const initStartTime = Date.now(); + if (!this.isOpen()) { try { - const timeTakenByInit = Date.now() - initStartTime; - - logger.verbose( - "%s Sender '%s', credit: %d available: %d", + await waitForTimeoutOrAbortOrResolve({ + actionFn: () => this.open(undefined, options?.abortSignal), + abortSignal: options?.abortSignal, + timeoutMs: timeoutInMs, + timeoutMessage: + `[${this._context.connectionId}] Sender "${this.name}" ` + + `with address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.` + }); + } catch (err) { + err = translateServiceBusError(err); + logger.logError( + err, + "%s An error occurred while creating the sender", this.logPrefix, - this.name, - this.link?.credit, - this.link?.session?.outgoing?.available() + this.name ); - - if (!this.link?.sendable()) { - logger.verbose( - "%s Sender '%s', waiting for 1 second for sender to become sendable", - this.logPrefix, - this.name - ); - - await delay(1000); - - logger.verbose( - "%s Sender '%s' after waiting for a second, credit: %d available: %d", - this.logPrefix, - this.name, - this.link?.credit, - this.link?.session?.outgoing?.available() - ); - } - if (this.link?.sendable()) { - if (timeoutInMs <= timeTakenByInit) { - const desc: string = - `${this.logPrefix} Sender "${this.name}" ` + - `with address "${this.address}", was not able to send the message right now, due ` + - `to operation timeout.`; - logger.warning(desc); - const e: AmqpError = { - condition: ErrorNameConditionMapper.ServiceUnavailableError, - description: desc - }; - return reject(translateServiceBusError(e)); - } - try { - this.link.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000; - const delivery = await this.link!.send( - encodedMessage, - undefined, - sendBatch ? 0x80013700 : 0 - ); - logger.verbose( - "%s Sender '%s', sent message with delivery id: %d", - this.logPrefix, - this.name, - delivery.id - ); - return resolve(); - } catch (error) { - error = translateServiceBusError(error.innerError || error); - logger.logError( - error, - `${this.logPrefix} An error occurred while sending the message` - ); - return reject(error); - } - } else { - // let us retry to send the message after some time. - const msg = - `[${this.logPrefix}] Sender "${this.name}", ` + - `cannot send the message right now. Please try later.`; - logger.warning(msg); - const amqpError: AmqpError = { - condition: ErrorNameConditionMapper.SenderBusyError, - description: msg - }; - reject(translateServiceBusError(amqpError)); - } - } catch (err) { - reject(err); + throw err; } - }); + } + + const timeTakenByInit = Date.now() - initStartTime; + + logger.verbose( + "%s Sender '%s', credit: %d available: %d", + this.logPrefix, + this.name, + this.link?.credit, + this.link?.session?.outgoing?.available() + ); + + if (!this.link?.sendable()) { + logger.verbose( + "%s Sender '%s', waiting for 1 second for sender to become sendable", + this.logPrefix, + this.name + ); + + await delay(1000); + + logger.verbose( + "%s Sender '%s' after waiting for a second, credit: %d available: %d", + this.logPrefix, + this.name, + this.link?.credit, + this.link?.session?.outgoing?.available() + ); + } + + if (!this.link?.sendable()) { + // let us retry to send the message after some time. + const msg = + `[${this.logPrefix}] Sender "${this.name}", ` + + `cannot send the message right now. Please try later.`; + logger.warning(msg); + const amqpError: AmqpError = { + condition: ErrorNameConditionMapper.SenderBusyError, + description: msg + }; + throw translateServiceBusError(amqpError); + } + + if (timeoutInMs <= timeTakenByInit) { + const desc: string = + `${this.logPrefix} Sender "${this.name}" ` + + `with address "${this.address}", was not able to send the message right now, due ` + + `to operation timeout.`; + logger.warning(desc); + const e: AmqpError = { + condition: ErrorNameConditionMapper.ServiceUnavailableError, + description: desc + }; + throw translateServiceBusError(e); + } + try { + this.link.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000; + const delivery = await this.link!.send( + encodedMessage, + undefined, + sendBatch ? 0x80013700 : 0 + ); + logger.verbose( + "%s Sender '%s', sent message with delivery id: %d", + this.logPrefix, + this.name, + delivery.id + ); + } catch (error) { + error = translateServiceBusError(error.innerError || error); + logger.logError(error, `${this.logPrefix} An error occurred while sending the message`); + throw error; + } + }; const config: RetryConfig = { operation: sendEventPromise, connectionId: this._context.connectionId!, @@ -481,23 +477,18 @@ export class MessageSender extends LinkEntity { if (this.isOpen()) { return this.link!.maxMessageSize; } - return new Promise(async (resolve, reject) => { - try { - const config: RetryConfig = { - operation: () => this.open(undefined, options?.abortSignal), - connectionId: this._context.connectionId, - operationType: RetryOperationType.senderLink, - retryOptions: retryOptions, - abortSignal: options?.abortSignal - }; - await retry(config); + const config: RetryConfig = { + operation: () => this.open(undefined, options?.abortSignal), + connectionId: this._context.connectionId, + operationType: RetryOperationType.senderLink, + retryOptions: retryOptions, + abortSignal: options?.abortSignal + }; - return resolve(this.link!.maxMessageSize); - } catch (err) { - reject(err); - } - }); + await retry(config); + + return this.link!.maxMessageSize; } async createBatch(options?: CreateMessageBatchOptions): Promise { diff --git a/sdk/servicebus/service-bus/src/util/utils.ts b/sdk/servicebus/service-bus/src/util/utils.ts index 20597a2d90af..9c1968ce1ffe 100644 --- a/sdk/servicebus/service-bus/src/util/utils.ts +++ b/sdk/servicebus/service-bus/src/util/utils.ts @@ -535,6 +535,8 @@ export type EntityAvailabilityStatus = */ export const StandardAbortMessage = "The operation was aborted."; +type setTimeoutArgs = (callback: (...args: any[]) => void, ms: number, ...args: any[]) => any; + /** * An executor for a function that returns a Promise that obeys both a timeout and an * optional AbortSignal. @@ -553,7 +555,7 @@ export async function waitForTimeoutOrAbortOrResolve(args: { abortSignal?: AbortSignalLike; // these are optional and only here for testing. timeoutFunctions?: { - setTimeoutFn: (callback: (...args: any[]) => void, ms: number, ...args: any[]) => any; + setTimeoutFn: setTimeoutArgs; clearTimeoutFn: (timeoutId: any) => void; }; }): Promise { @@ -604,7 +606,9 @@ export function checkAndRegisterWithAbortSignal( abortSignal?: AbortSignalLike ): () => void { if (abortSignal == null) { - return () => {}; + return () => { + /** Nothing to do here, no abort signal */ + }; } if (abortSignal.aborted) {