Skip to content

Commit

Permalink
[Service Bus] Do not create a new one if the message session doesn't …
Browse files Browse the repository at this point in the history
…exist before performing the session related operations (Azure#8849)

* throw if messagesession doesn't exist

* Update error to SessionLockLost

* Update sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* Convention for amqpErrors

* _throwIfSessionLockExpired and some refactoring

* _throwIfMessageSessionDoesntExist

* Update sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* refactoring + error message change

* Add checks for _throwIfMessageSessionDoesntExist

* unneeded check

* remove unneeded error variable

* unneeded awaits

* Get the if-check working that is present before `await MessageSession.create`

* Update sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* remove export

* remove _throwIfMessageSessionDoesntExist

* Revert "remove _throwIfMessageSessionDoesntExist"

This reverts commit 281b82c.

* remove unneeded if check

* Move checks from init to sessionReceiver

* async iife

* Update sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* remove     this._throwIfMessageSessionDoesntExist();
for managementLink operations

* move back the followups to init with the new if-check

* ! for the this.sessionId

* Fix getReceiverClosedErrorMsg

* Update sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* Update sdk/servicebus/service-bus/src/session/messageSession.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* async iife -> catch

* getSender  ->  createSender

* Simplify isClosed

* fix typos

* Merge _throwIfMessageSessionDoesntExist and _throwIfReceiverOrConnectionClosed

* Update sdk/servicebus/service-bus/src/util/errors.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* Add !this._messageSession.isOpen() too to the isClosed

* fix test

* Fix retries test "Unpartitioned Queue with Sessions: receiveBatch"

* fix "An already locked session throws SessionCannotBeLockedError" test

* fix tests under "SessionReceiver with invalid sessionId"

* fix mock function for non-sessions

* Moving if checks back to init

* removing if checks from sessionReceiver in favour of init

* formatting

* Update sdk/servicebus/service-bus/src/session/messageSession.ts

* Update sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>

* Update sdk/servicebus/service-bus/src/session/messageSession.ts

* refactor _throwIfReceiverOrConnectionClosed

* remove redundant else block as per Azure#8849 (comment)

* typos

* Split sessionId into providedSessionId and sessionId

* Make providedSessionId a private var and organize imports

* unneeded if check since it is covered in _throwIfReceiverOrConnectionClosed

Co-authored-by: Ramya Rao <ramya.rao.a@outlook.com>
  • Loading branch information
HarshaNalluru and ramya-rao-a authored Jul 15, 2020
1 parent 033b09b commit 735eca2
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 136 deletions.
5 changes: 1 addition & 4 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,7 @@ export class ReceiverImpl<ReceivedMessageT extends ReceivedMessage | ReceivedMes
private _throwIfReceiverOrConnectionClosed(): void {
throwErrorIfConnectionClosed(this._context.namespace);
if (this.isClosed) {
const errorMessage = getReceiverClosedErrorMsg(
this._context.entityPath,
this._context.isClosed
);
const errorMessage = getReceiverClosedErrorMsg(this._context.entityPath);
const error = new Error(errorMessage);
log.error(`[${this._context.namespace.connectionId}] %O`, error);
throw error;
Expand Down
132 changes: 47 additions & 85 deletions sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ import { convertToInternalReceiveMode } from "../constructorHelpers";
import { Receiver } from "./receiver";
import Long from "long";
import { ReceivedMessageWithLock, ServiceBusMessageImpl } from "../serviceBusMessage";
import { Constants, RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp";
import {
Constants,
RetryConfig,
RetryOperationType,
RetryOptions,
retry,
ErrorNameConditionMapper,
translate
} from "@azure/core-amqp";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import "@azure/core-asynciterator-polyfill";
import { AmqpError } from "rhea-promise";

/**
*A receiver that handles sessions, including renewing the session lock.
Expand All @@ -46,7 +55,7 @@ export interface SessionReceiver<

/**
* @property The time in UTC until which the session is locked.
* Everytime `renewSessionLock()` is called, this time gets updated to current time plus the lock
* Every time `renewSessionLock()` is called, this time gets updated to current time plus the lock
* duration as specified during the Queue/Subscription creation.
*
* Will return undefined until a AMQP receiver link has been successfully set up for the session.
Expand Down Expand Up @@ -93,13 +102,6 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
public entityPath: string;
public sessionId: string;

/**
* @property {ClientEntityContext} _context Describes the amqp connection context for the QueueClient.
*/

private _context: ClientEntityContext;
private _retryOptions: RetryOptions;
private _messageSession: MessageSession | undefined;
/**
* @property {boolean} [_isClosed] Denotes if close() was called on this receiver
*/
Expand All @@ -111,24 +113,14 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
* @throws Error if an open receiver is already existing for given sessionId.
*/
private constructor(
context: ClientEntityContext,
private _messageSession: MessageSession,
private _context: ClientEntityContext,
public receiveMode: "peekLock" | "receiveAndDelete",
private _sessionOptions: CreateSessionReceiverOptions,
retryOptions: RetryOptions = {}
private _retryOptions: RetryOptions = {}
) {
throwErrorIfConnectionClosed(context.namespace);
this._context = context;
throwErrorIfConnectionClosed(this._context.namespace);
this.entityPath = this._context.entityPath;
this._retryOptions = retryOptions;

if (this._sessionOptions.sessionId) {
this._sessionOptions.sessionId = String(this._sessionOptions.sessionId);
}

// `createInitializedSessionReceiver` will set this value by calling init()
// so we just temporarily set it to "" so we can get away with it never being
// `undefined`.
this.sessionId = "";
this.sessionId = _messageSession.sessionId;
}

static async createInitializedSessionReceiver<
Expand All @@ -139,51 +131,39 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
sessionOptions: CreateSessionReceiverOptions,
retryOptions: RetryOptions = {}
): Promise<SessionReceiver<ReceivedMessageT>> {
context.isSessionEnabled = true;
if (sessionOptions.sessionId != undefined) {
sessionOptions.sessionId = String(sessionOptions.sessionId);
}
const messageSession = await MessageSession.create(context, {
sessionId: sessionOptions.sessionId,
autoRenewLockDurationInMs: sessionOptions.autoRenewLockDurationInMs,
receiveMode: convertToInternalReceiveMode(receiveMode)
});
const sessionReceiver = new SessionReceiverImpl<ReceivedMessageT>(
messageSession,
context,
receiveMode,
sessionOptions,
retryOptions
);

await sessionReceiver._createMessageSessionIfDoesntExist();
return sessionReceiver;
}

private _throwIfReceiverOrConnectionClosed(): void {
throwErrorIfConnectionClosed(this._context.namespace);
if (this.isClosed) {
const errorMessage = getReceiverClosedErrorMsg(
this._context.entityPath,
this._context.isClosed,
this.sessionId!
);
const error = new Error(errorMessage);
log.error(`[${this._context.namespace.connectionId}] %O`, error);
throw error;
}
}

private async _createMessageSessionIfDoesntExist(): Promise<void> {
// TODO - pass timeout for MessageSession creation
if (this._messageSession) {
return;
}
this._context.isSessionEnabled = true;
this._messageSession = await MessageSession.create(this._context, {
sessionId: this._sessionOptions.sessionId,
autoRenewLockDurationInMs: this._sessionOptions.autoRenewLockDurationInMs,
receiveMode: convertToInternalReceiveMode(this.receiveMode)
});
// By this point, we should have a valid sessionId on the messageSession
// If not, the receiver cannot be used, so throw error.
if (this._messageSession.sessionId == null) {
const error = new Error("Something went wrong. Cannot lock a session.");
log.error(`[${this._context.namespace.connectionId}] %O`, error);
throw error;
if (this._isClosed) {
const errorMessage = getReceiverClosedErrorMsg(this._context.entityPath, this.sessionId);
const error = new Error(errorMessage);
log.error(`[${this._context.namespace.connectionId}] %O`, error);
throw error;
}
const amqpError: AmqpError = {
condition: ErrorNameConditionMapper.SessionLockLostError,
description: `The session lock has expired on the session with id ${this.sessionId}`
};
throw translate(amqpError);
}
this.sessionId = this._messageSession.sessionId;
return;
}

private _throwIfAlreadyReceiving(): void {
Expand All @@ -197,13 +177,15 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece

public get isClosed(): boolean {
return (
this._isClosed || (this.sessionId ? !this._context.messageSessions[this.sessionId] : false)
this._isClosed ||
!this._context.messageSessions[this.sessionId] ||
!this._messageSession.isOpen()
);
}

/**
* @property The time in UTC until which the session is locked.
* Everytime `renewSessionLock()` is called, this time gets updated to current time plus the lock
* Every time `renewSessionLock()` is called, this time gets updated to current time plus the lock
* duration as specified during the Queue/Subscription creation.
*
* When the lock on the session expires
Expand Down Expand Up @@ -235,9 +217,7 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
*/
async renewSessionLock(options?: OperationOptionsBase): Promise<Date> {
this._throwIfReceiverOrConnectionClosed();

const renewSessionLockOperationPromise = async () => {
await this._createMessageSessionIfDoesntExist();
this._messageSession!.sessionLockedUntilUtc = await this._context.managementClient!.renewSessionLock(
this.sessionId,
{
Expand Down Expand Up @@ -270,7 +250,6 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
this._throwIfReceiverOrConnectionClosed();

const setSessionStateOperationPromise = async () => {
await this._createMessageSessionIfDoesntExist();
await this._context.managementClient!.setSessionState(this.sessionId!, state, {
...options,
requestName: "setState",
Expand Down Expand Up @@ -300,7 +279,6 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
this._throwIfReceiverOrConnectionClosed();

const getSessionStateOperationPromise = async () => {
await this._createMessageSessionIfDoesntExist();
return this._context.managementClient!.getSessionState(this.sessionId, {
...options,
requestName: "getState",
Expand Down Expand Up @@ -379,7 +357,6 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
? sequenceNumbers
: [sequenceNumbers];
const receiveDeferredMessagesOperationPromise = async () => {
await this._createMessageSessionIfDoesntExist();
const deferredMessages = await this._context.managementClient!.receiveDeferredMessages(
deferredSequenceNumbers,
convertToInternalReceiveMode(this.receiveMode),
Expand Down Expand Up @@ -414,8 +391,6 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
}

const receiveBatchOperationPromise = async () => {
await this._createMessageSessionIfDoesntExist();

const receivedMessages = await this._messageSession!.receiveMessages(
maxMessageCount,
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs
Expand Down Expand Up @@ -498,21 +473,11 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece
throw new TypeError("The parameter 'onError' must be of type 'function'.");
}

this._createMessageSessionIfDoesntExist()
.then(async () => {
if (!this._messageSession) {
return;
}
if (!this._isClosed) {
this._messageSession.receive(onMessage, onError, options);
} else {
await this._messageSession.close();
}
return;
})
.catch((err) => {
onError(err);
});
try {
this._messageSession.receive(onMessage, onError, options);
} catch (err) {
onError(err);
}
}

getMessageIterator(options?: GetMessageIteratorOptions): AsyncIterableIterator<ReceivedMessageT> {
Expand All @@ -521,10 +486,7 @@ export class SessionReceiverImpl<ReceivedMessageT extends ReceivedMessage | Rece

async close(): Promise<void> {
try {
if (this._messageSession) {
await this._messageSession.close();
this._messageSession = undefined;
}
await this._messageSession.close();
} catch (err) {
log.error(
"[%s] An error occurred while closing the SessionReceiver for session %s in %s: %O",
Expand Down
Loading

0 comments on commit 735eca2

Please sign in to comment.