Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[event-hubs] fixes sendBatch race condition causing TypeError to be thrown #15021

Merged
merged 5 commits into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 5.5.1 (Unreleased)
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved

- Fixes issue [#14606](https://github.com/Azure/azure-sdk-for-js/issues/14606) where the `EventHubConsumerClient` could call subscribe's `processError` callback with a "Too much pending tasks" error. This could occur if the consumer was unable to connect to the service for an extended period of time.

- Fixes issue [#15002](https://github.com/Azure/azure-sdk-for-js/issues/15002) where in rare cases an unexpected `TypeError` could be thrown from `EventHubProducerClient.sendBatch` when the connection was disconnected while sending events was in progress.

## 5.5.0 (2021-04-06)

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ export class EventHubReceiver extends LinkEntity {
// store the underlying link in a cache
this._context.receivers[this.name] = this;

await this._ensureTokenRenewal();
this._ensureTokenRenewal();
} else {
logger.verbose(
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +
Expand Down
68 changes: 32 additions & 36 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export class EventHubSender extends LinkEntity {
* @returns boolean
*/
isOpen(): boolean {
const result: boolean = this._sender! && this._sender!.isOpen();
const result = Boolean(this._sender && this._sender.isOpen());
logger.verbose(
"[%s] Sender '%s' with address '%s' is open? -> %s",
this._context.connectionId,
Expand All @@ -216,9 +216,9 @@ export class EventHubSender extends LinkEntity {
abortSignal?: AbortSignalLike;
} = {}
): Promise<number> {
await this._createLinkIfNotOpen(options);
const sender = await this._getLink(options);

return this._sender!.maxMessageSize;
return sender.maxMessageSize;
}

/**
Expand Down Expand Up @@ -339,21 +339,20 @@ export class EventHubSender extends LinkEntity {
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
retryOptions.timeoutInMs = timeoutInMs;

const initStartTime = Date.now();
await this._createLinkIfNotOpen(options);
const timeTakenByInit = Date.now() - initStartTime;

const sendEventPromise = async (): Promise<void> => {
const initStartTime = Date.now();
const sender = await this._getLink(options);
const timeTakenByInit = Date.now() - initStartTime;
logger.verbose(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session.outgoing.available()
sender.credit,
sender.session.outgoing.available()
);

let waitTimeForSendable = 1000;
if (!this._sender!.sendable() && timeoutInMs - timeTakenByInit > waitTimeForSendable) {
if (!sender.sendable() && timeoutInMs - timeTakenByInit > waitTimeForSendable) {
logger.verbose(
"%s Sender '%s', waiting for 1 second for sender to become sendable",
this._context.connectionId,
Expand All @@ -366,14 +365,14 @@ export class EventHubSender extends LinkEntity {
"%s Sender '%s' after waiting for a second, credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session?.outgoing?.available()
sender.credit,
sender.session?.outgoing?.available()
);
} else {
waitTimeForSendable = 0;
}

if (!this._sender!.sendable()) {
if (!sender.sendable()) {
// let us retry to send the message after some time.
const msg =
`[${this._context.connectionId}] Sender "${this.name}", ` +
Expand Down Expand Up @@ -404,10 +403,9 @@ export class EventHubSender extends LinkEntity {
throw translate(e);
}

this._sender!.sendTimeoutInSeconds =
(timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
sender.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
try {
const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700, {
const delivery = await sender.send(rheaMessage, undefined, 0x80013700, {
abortSignal
});
logger.info(
Expand Down Expand Up @@ -444,22 +442,22 @@ export class EventHubSender extends LinkEntity {
}
}

private async _createLinkIfNotOpen(
private async _getLink(
options: {
retryOptions?: RetryOptions;
abortSignal?: AbortSignalLike;
} = {}
): Promise<void> {
if (this.isOpen()) {
return;
): Promise<AwaitableSender> {
if (this.isOpen() && this._sender) {
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
return this._sender;
}
const retryOptions = options.retryOptions || {};
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
retryOptions.timeoutInMs = timeoutInMs;
const senderOptions = this._createSenderOptions(timeoutInMs);

const startTime = Date.now();
const createLinkPromise = async (): Promise<void> => {
const createLinkPromise = async (): Promise<AwaitableSender> => {
return defaultCancellableLock.acquire(
this.senderLock,
() => {
Expand All @@ -475,7 +473,7 @@ export class EventHubSender extends LinkEntity {
);
};

const config: RetryConfig<void> = {
const config: RetryConfig<AwaitableSender> = {
operation: createLinkPromise,
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
Expand All @@ -484,7 +482,7 @@ export class EventHubSender extends LinkEntity {
};

try {
await retry<void>(config);
return await retry<AwaitableSender>(config);
} catch (err) {
const translatedError = translate(err);
logger.warning(
Expand All @@ -500,18 +498,17 @@ export class EventHubSender extends LinkEntity {

/**
* Initializes the sender session on the connection.
* Should only be called from _createLinkIfNotOpen
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
* @hidden
*/
private async _init(
options: AwaitableSenderOptions & {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}
): Promise<void> {
): Promise<AwaitableSender> {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;

if (!this.isOpen() || !this._sender) {
// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim({
Expand All @@ -526,33 +523,32 @@ export class EventHubSender extends LinkEntity {
this.name
);

this._sender = await this._context.connection.createAwaitableSender(options);
this.isConnecting = false;
const sender = await this._context.connection.createAwaitableSender(options);
this._sender = sender;
logger.verbose(
"[%s] Sender '%s' created with sender options: %O",
this._context.connectionId,
this.name,
options
);
this._sender.setMaxListeners(1000);
sender.setMaxListeners(1000);

// It is possible for someone to close the sender and then start it again.
// Thus make sure that the sender is present in the client cache.
if (!this._context.senders[this.name]) this._context.senders[this.name] = this;
await this._ensureTokenRenewal();
this._ensureTokenRenewal();
return sender;
} else {
logger.verbose(
"[%s] The sender '%s' with address '%s' is open -> %s and is connecting " +
"-> %s. Hence not reconnecting.",
"[%s] The sender '%s' with address '%s' is open -> %s. Hence not reconnecting.",
this._context.connectionId,
this.name,
this.address,
this.isOpen(),
this.isConnecting
this.isOpen()
);
return this._sender;
}
} catch (err) {
this.isConnecting = false;
const translatedError = translate(err);
logger.warning(
"[%s] An error occurred while creating the sender %s: %s",
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ export class LinkEntity {
* Ensures that the token is renewed within the predefined renewal margin.
* @hidden
*/
protected async _ensureTokenRenewal(): Promise<void> {
protected _ensureTokenRenewal(): void {
if (!this._tokenTimeoutInMs) {
return;
}
Expand Down
65 changes: 65 additions & 0 deletions sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import chai from "chai";
const should = chai.should();
import chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import { EnvVarKeys, getEnvVars } from "../../public/utils/testUtils";
import { EventHubSender } from "../../../src/eventHubSender";
import { createConnectionContext } from "../../../src/connectionContext";
import { stub } from "sinon";
import { MessagingError } from "@azure/core-amqp";
const env = getEnvVars();

describe("disconnected", function() {
const service = {
connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
path: env[EnvVarKeys.EVENTHUB_NAME]
};
before("validate environment", function(): void {
should.exist(
env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
"define EVENTHUB_CONNECTION_STRING in your environment before running integration tests."
);
should.exist(
env[EnvVarKeys.EVENTHUB_NAME],
"define EVENTHUB_NAME in your environment before running integration tests."
);
});

describe("EventHubSender", function() {
/**
* Test added for issue https://github.com/Azure/azure-sdk-for-js/issues/15002
* Prior to fixing this issue, a TypeError would be thrown when this test was ran.
*/
it("send works after disconnect", async () => {
const context = createConnectionContext(service.connectionString, service.path);
const sender = EventHubSender.create(context);

// Create the sender link via getMaxMessageSize() so we can check when 'send' is about to be called on it.
await sender.getMaxMessageSize();
should.equal(sender.isOpen(), true, "Expected sender to be open.");

// Here we stub out the 'send' call on the AwaitableSender.
// We do 2 things:
// 1. Call `idle()` on the underlying rhea connection so that a disconnect is triggered.
// 2. Reject with a MessagingError.
// The MessagingError is thrown so that the send operation will be retried.
// The disconnect that's triggered will cause the existing AwaitableSender to be closed.

// If everything works as expected, then a new AwaitableSender should be created on the next
// retry attempt and the event should be successfully sent.
const senderLink = sender["_sender"]!;
const sendStub = stub(senderLink, "send");
sendStub.callsFake(async () => {
context.connection["_connection"].idle();
throw new MessagingError("Fake rejection!");
});

await sender.send([{ body: "foo" }]);

await context.close();
});
});
});