Skip to content

Commit

Permalink
[service-bus] Fixing issue where links were not removed from our inte…
Browse files Browse the repository at this point in the history
…rnal cache (#15929)

Today we cache any opened links in the connectionContext. These links should be removed when the link itself is closed but, due to a mismatch in the values, we weren't. 

I've fixed this by just making an abstract method in LinkEntity (the base for all the link types) and just having each link properly remove itself from the cache.

Fixes #15890
  • Loading branch information
richardpark-msft authored Jun 24, 2021
1 parent 46078e3 commit 58b1993
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 16 deletions.
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

### Fixed

- Fixing an issue where the internal link cache would not properly remove closed links.
[PR#15929](https://github.com/Azure/azure-sdk-for-js/pull/15929)

## 7.2.0 (2021-06-10)

Expand Down
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ export class BatchingReceiver extends MessageReceiver {
context.messageReceivers[bReceiver.name] = bReceiver;
return bReceiver;
}

protected removeLinkFromContext(): void {
delete this._context.messageReceivers[this.name];
}
}

/**
Expand Down
22 changes: 6 additions & 16 deletions sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,22 +299,7 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ

this._logger.verbose(`${this.logPrefix} permanently closing this link.`);

// Remove the underlying AMQP link from the cache
switch (this._linkType) {
case "s": {
delete this._context.senders[this.name];
break;
}
case "br":
case "sr": {
delete this._context.messageReceivers[this.name];
break;
}
case "ms": {
delete this._context.messageSessions[this.name];
break;
}
}
this.removeLinkFromContext();

await this.closeLink();
this._logger.verbose(`${this.logPrefix} permanently closed this link.`);
Expand All @@ -327,6 +312,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
*/
protected abstract createRheaLink(_options: LinkOptionsT<LinkT>): Promise<LinkT>;

/**
* Clears this link from context's link cache.
*/
protected abstract removeLinkFromContext(): void;

/**
* Closes the internally held rhea link, stops the token renewal timer and sets
* the this._link field to undefined.
Expand Down
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,10 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
throw error;
}
}

protected removeLinkFromContext(): void {
delete this._context.managementClients[this.name];
}
}

/**
Expand Down
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,4 +467,8 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
context.senders[sbSender.name] = sbSender;
return sbSender;
}

protected removeLinkFromContext(): void {
delete this._context.senders[this.name];
}
}
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -658,4 +658,8 @@ export class StreamingReceiver extends MessageReceiver {
this._isDetaching = false;
}
}

protected removeLinkFromContext(): void {
delete this._context.messageReceivers[this.name];
}
}
4 changes: 4 additions & 0 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -925,4 +925,8 @@ export class MessageSession extends LinkEntity<Receiver> {
await messageSession._init(options?.abortSignal);
return messageSession;
}

protected removeLinkFromContext(): void {
delete this._context.messageSessions[this.name];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,25 @@ import chaiAsPromised from "chai-as-promised";
import { Receiver, ReceiverOptions } from "rhea-promise";
import sinon from "sinon";
import { ConnectionContext } from "../../../src/connectionContext";
import { BatchingReceiver } from "../../../src/core/batchingReceiver";
import { LinkEntity } from "../../../src/core/linkEntity";
import { ManagementClient } from "../../../src/core/managementClient";
import { MessageSender } from "../../../src/core/messageSender";
import { StreamingReceiver } from "../../../src/core/streamingReceiver";
import { receiverLogger } from "../../../src/log";
import { MessageSession } from "../../../src/session/messageSession";
import { createConnectionContextForTests, createRheaReceiverForTests } from "./unittestUtils";
chai.use(chaiAsPromised);
const assert = chai.assert;

describe("LinkEntity unit tests", () => {
class LinkForTests extends LinkEntity<Receiver> {
private _removeLinkFromContextCalled: boolean = false;

protected removeLinkFromContext(): void {
this._removeLinkFromContextCalled = true;
}

async createRheaLink(options: ReceiverOptions): Promise<Receiver> {
return createRheaReceiverForTests(options);
}
Expand All @@ -39,6 +50,10 @@ describe("LinkEntity unit tests", () => {

afterEach(async () => {
await linkEntity.close();
assert.isTrue(
(linkEntity as LinkForTests)["_removeLinkFromContextCalled"],
"Every link should have a chance to remove themselves from the cache"
);
});

describe("initLink", () => {
Expand Down Expand Up @@ -327,6 +342,129 @@ describe("LinkEntity unit tests", () => {
});
});

describe("cache cleanup", () => {
it("batchingreceiver", () => {
const batchingReceiver = new BatchingReceiver(connectionContext, "entityPath", {
abortSignal: undefined,
lockRenewer: undefined,
receiveMode: "receiveAndDelete",
tracingOptions: {}
});

initCachedLinks(batchingReceiver.name);

batchingReceiver["removeLinkFromContext"]();

assertLinkCaches({
name: batchingReceiver.name,
clearedCache: connectionContext.messageReceivers,
unchangedCaches: [
connectionContext.managementClients,
connectionContext.messageSessions,
connectionContext.senders
]
});
});

it("streamingreceiver", () => {
const streamingReceiver = new StreamingReceiver(connectionContext, "entityPath", {
abortSignal: undefined,
lockRenewer: undefined,
receiveMode: "receiveAndDelete",
tracingOptions: {}
});

initCachedLinks(streamingReceiver.name);

streamingReceiver["removeLinkFromContext"]();

assertLinkCaches({
name: streamingReceiver.name,
clearedCache: connectionContext.messageReceivers,
unchangedCaches: [
connectionContext.managementClients,
connectionContext.messageSessions,
connectionContext.senders
]
});
});

it("sender", () => {
const sender = new MessageSender(connectionContext, "entityPath", {});

initCachedLinks(sender.name);

sender["removeLinkFromContext"]();

assertLinkCaches({
name: sender.name,
clearedCache: connectionContext.senders,
unchangedCaches: [
connectionContext.managementClients,
connectionContext.messageReceivers,
connectionContext.messageSessions
]
});
});

it("session", () => {
const messageSession = new MessageSession(connectionContext, "entityPath", "session-id", {
abortSignal: undefined,
retryOptions: {}
});

initCachedLinks(messageSession.name);

messageSession["removeLinkFromContext"]();

assertLinkCaches({
name: messageSession.name,
clearedCache: connectionContext.messageSessions,
unchangedCaches: [
connectionContext.managementClients,
connectionContext.messageReceivers,
connectionContext.senders
]
});
});

it("managementclient", () => {
const mgmtClient = new ManagementClient(connectionContext, "entityPath");

initCachedLinks(mgmtClient.name);

mgmtClient["removeLinkFromContext"]();

assertLinkCaches({
name: mgmtClient.name,
clearedCache: connectionContext.managementClients,
unchangedCaches: [
connectionContext.messageSessions,
connectionContext.messageReceivers,
connectionContext.senders
]
});
});

function assertLinkCaches(args: {
name: string;
clearedCache: { [name: string]: any };
unchangedCaches: { [name: string]: any }[];
}): void {
assert.isEmpty(
args.unchangedCaches.filter((cache) => cache[args.name] == null),
"Unrelated caches should not be changed."
);
}

function initCachedLinks(name: string) {
connectionContext.messageReceivers[name] = {} as any;
connectionContext.senders[name] = {} as any;
connectionContext.managementClients[name] = {} as any;
connectionContext.messageSessions[name] = {} as any;
}
});

function assertLinkEntityOpen(): void {
assert.isTrue(linkEntity.isOpen(), "link should be open");
assert.exists(linkEntity["_tokenRenewalTimer"], "the tokenrenewal timer should have been set");
Expand Down

0 comments on commit 58b1993

Please sign in to comment.