Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Promise executor functions should not be async #13826

Merged
merged 5 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ export class EventHubReceiver extends LinkEntity {
const receivedEvents: ReceivedEventData[] = [];

const retrieveEvents = (): Promise<ReceivedEventData[]> => {
return new Promise(async (resolve, reject) => {
return new Promise((resolve, reject) => {
// if this consumer was closed,
// resolve the operation's promise with the events collected thus far in case
// the promise hasn't already been resolved.
Expand Down
328 changes: 145 additions & 183 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
Message as RheaMessage
} from "rhea-promise";
import {
Constants,
delay,
ErrorNameConditionMapper,
RetryConfig,
RetryOperationType,
Expand All @@ -29,10 +29,10 @@ import { EventHubProducerOptions } from "./models/private";
import { SendOptions } from "./models/public";

import { getRetryAttemptTimeoutInMs } from "./util/retries";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { AbortSignalLike } from "@azure/abort-controller";
import { EventDataBatch, isEventDataBatch } from "./eventDataBatch";
import { defaultDataTransformer } from "./dataTransformer";

import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils";
/**
* Describes the EventHubSender that will send event data to EventHub.
* @internal
Expand Down Expand Up @@ -217,68 +217,9 @@ export class EventHubSender extends LinkEntity {
abortSignal?: AbortSignalLike;
} = {}
): Promise<number> {
const abortSignal = options.abortSignal;
const retryOptions = options.retryOptions || {};
if (this.isOpen()) {
return this._sender!.maxMessageSize;
}
return new Promise<number>(async (resolve, reject) => {
const rejectOnAbort = (): void => {
const desc: string = `[${this._context.connectionId}] The create batch operation has been cancelled by the user.`;
// Cancellation is user-intented, so treat as info instead of warning.
logger.info(desc);
const error = new AbortError(`The create batch operation has been cancelled by the user.`);
reject(error);
};

const onAbort = (): void => {
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
rejectOnAbort();
};

if (abortSignal) {
// the aborter may have been triggered between request attempts
// so check if it was triggered and reject if needed.
if (abortSignal.aborted) {
return rejectOnAbort();
}
abortSignal.addEventListener("abort", onAbort);
}
try {
logger.verbose(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
const senderOptions = this._createSenderOptions(Constants.defaultOperationTimeoutInMs);
await defaultLock.acquire(this.senderLock, () => {
const config: RetryConfig<void> = {
operation: () => this._init(senderOptions),
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
abortSignal: abortSignal,
retryOptions: retryOptions
};

return retry<void>(config);
});
resolve(this._sender!.maxMessageSize);
} catch (err) {
logger.warning(
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name
);
logErrorStackTrace(err);
reject(err);
} finally {
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
}
});
await this._createLinkIfNotOpen(options);

return this._sender!.maxMessageSize;
}

/**
Expand Down Expand Up @@ -390,141 +331,96 @@ export class EventHubSender extends LinkEntity {
* @param rheaMessage - The message to be sent to EventHub.
* @returns Promise<void>
*/
private _trySendBatch(
private async _trySendBatch(
rheaMessage: RheaMessage | Buffer,
options: SendOptions & EventHubProducerOptions = {}
): Promise<void> {
const abortSignal: AbortSignalLike | undefined = options.abortSignal;
const retryOptions = options.retryOptions || {};
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
retryOptions.timeoutInMs = timeoutInMs;
const sendEventPromise = (): Promise<void> =>
new Promise<void>(async (resolve, reject) => {
const rejectOnAbort = (): void => {
const desc: string =
`[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` +
`address "${this.address}" has been cancelled by the user.`;
// Cancellation is user-intended, so log to info instead of warning.
logger.info(desc);
return reject(new AbortError("The send operation has been cancelled by the user."));
};

if (abortSignal && abortSignal.aborted) {
// operation has been cancelled, so exit quickly
return rejectOnAbort();
}
const initStartTime = Date.now();
await this._createLinkIfNotOpen(options);
const timeTakenByInit = Date.now() - initStartTime;

const removeListeners = (): void => {
clearTimeout(waitTimer); // eslint-disable-line @typescript-eslint/no-use-before-define
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted); // eslint-disable-line @typescript-eslint/no-use-before-define
}
};
const sendEventPromise = async (): Promise<void> => {
logger.verbose(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session.outgoing.available()
);

const onAborted = (): void => {
removeListeners();
return rejectOnAbort();
};
let waitTimeForSendable = 1000;
if (!this._sender!.sendable() && timeoutInMs - timeTakenByInit > waitTimeForSendable) {
logger.verbose(
"%s Sender '%s', waiting for 1 second for sender to become sendable",
this._context.connectionId,
this.name
);

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}
await delay(waitTimeForSendable);

const actionAfterTimeout = (): void => {
removeListeners();
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" with ` +
`address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
logger.warning(desc);
const e: Error = {
name: "OperationTimeoutError",
message: desc
};
return reject(translate(e));
};
logger.verbose(
"%s Sender '%s' after waiting for a second, credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session?.outgoing?.available()
);
} else {
waitTimeForSendable = 0;
}

const waitTimer = setTimeout(actionAfterTimeout, timeoutInMs);
const initStartTime = Date.now();
if (!this.isOpen()) {
logger.verbose(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
if (!this._sender!.sendable()) {
// let us retry to send the message after some time.
const msg =
`[${this._context.connectionId}] Sender "${this.name}", ` +
`cannot send the message right now. Please try later.`;
logger.warning(msg);
const amqpError: AmqpError = {
condition: ErrorNameConditionMapper.SenderBusyError,
description: msg
};
throw translate(amqpError);
}

try {
const senderOptions = this._createSenderOptions(timeoutInMs);
await defaultLock.acquire(this.senderLock, () => {
return this._init(senderOptions);
});
} catch (err) {
removeListeners();
const translatedError = translate(err);
logger.warning(
"[%s] An error occurred while creating the sender %s: %s",
this._context.connectionId,
this.name,
`${translatedError?.name}: ${translatedError?.message}`
);
logErrorStackTrace(translatedError);
return reject(translatedError);
}
}
const timeTakenByInit = Date.now() - initStartTime;
logger.verbose(
"[%s] Sender '%s', sending message with id '%s'.",
this._context.connectionId,
this.name
);
if (timeoutInMs <= timeTakenByInit + waitTimeForSendable) {
const desc: string =
`${this._context.connectionId} Sender "${this.name}" ` +
`with address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
logger.warning(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
throw translate(e);
}

logger.verbose(
"[%s] Sender '%s', credit: %d available: %d",
this._sender!.sendTimeoutInSeconds =
(timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
try {
const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700, {
abortSignal
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well sure, tackle #13504 for event hubs while you're at it 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I HAD to do it otherwise the test failed!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, what other tests can I suggest then...

});
logger.info(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session.outgoing.available()
delivery.id
);
if (this._sender!.sendable()) {
logger.verbose(
"[%s] Sender '%s', sending message with id '%s'.",
this._context.connectionId,
this.name
);
if (timeoutInMs <= timeTakenByInit) {
actionAfterTimeout();
return;
}
try {
this._sender!.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000;
const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700);
logger.info(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.connectionId,
this.name,
delivery.id
);
return resolve();
} catch (err) {
const translatedError = translate(err.innerError || err);
logger.warning(
"[%s] An error occurred while sending the message %s",
this._context.connectionId,
`${translatedError?.name}: ${translatedError?.message}`
);
logErrorStackTrace(translatedError);
return reject(translatedError);
} finally {
removeListeners();
}
} else {
// let us retry to send the message after some time.
const msg =
`[${this._context.connectionId}] Sender "${this.name}", ` +
`cannot send the message right now. Please try later.`;
logger.warning(msg);
const amqpError: AmqpError = {
condition: ErrorNameConditionMapper.SenderBusyError,
description: msg
};
reject(translate(amqpError));
}
});
} catch (err) {
throw err.innerError || err;
}
};

const config: RetryConfig<void> = {
operation: sendEventPromise,
Expand All @@ -533,7 +429,73 @@ export class EventHubSender extends LinkEntity {
abortSignal: abortSignal,
retryOptions: retryOptions
};
return retry<void>(config);

try {
await retry<void>(config);
} catch (err) {
const translatedError = translate(err);
logger.warning(
"[%s] Sender '%s', An error occurred while sending the message %s",
this._context.connectionId,
this.name,
`${translatedError?.name}: ${translatedError?.message}`
);
logErrorStackTrace(translatedError);
throw translatedError;
}
}

private async _createLinkIfNotOpen(
options: {
retryOptions?: RetryOptions;
abortSignal?: AbortSignalLike;
} = {}
): Promise<void> {
if (this.isOpen()) {
return;
}
const retryOptions = options.retryOptions || {};
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
retryOptions.timeoutInMs = timeoutInMs;
const senderOptions = this._createSenderOptions(timeoutInMs);

const createLinkPromise = async (): Promise<void> => {
return waitForTimeoutOrAbortOrResolve({
actionFn: () => {
return defaultLock.acquire(this.senderLock, () => {
return this._init(senderOptions);
});
},
abortSignal: options?.abortSignal,
timeoutMs: timeoutInMs,
timeoutMessage:
`[${this._context.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", cannot be created right now, due ` +
`to operation timeout.`
});
};

const config: RetryConfig<void> = {
operation: createLinkPromise,
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
abortSignal: options.abortSignal,
retryOptions: retryOptions
};

try {
await retry<void>(config);
} catch (err) {
const translatedError = translate(err);
logger.warning(
"[%s] An error occurred while creating the sender %s: %s",
this._context.connectionId,
this.name,
`${translatedError?.name}: ${translatedError?.message}`
);
logErrorStackTrace(translatedError);
throw translatedError;
}
}

/**
Expand Down
Loading