Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Fix message loss issues with peekLock and receiveAndDelete #15989

Merged
merged 25 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8ca0a16
Fixing an issue where we could lose messages or provoke an alarming m…
richardpark-msft Jun 25, 2021
7562013
Remove comments
richardpark-msft Jun 25, 2021
4338119
Implementing the minimal amount needed to allow for the drain/credit …
richardpark-msft Jun 25, 2021
127ac43
Formatting
richardpark-msft Jun 25, 2021
fc0c51a
Updating changelog
richardpark-msft Jun 25, 2021
86a5613
Use logger.warning
richardpark-msft Jun 25, 2021
8d48098
Fixing a silly unit test bug (I was decremeting the credit count when…
richardpark-msft Jun 26, 2021
57c6dd1
Cleaning up the tests. The 'fake' receiver had a bunch of duplicate c…
richardpark-msft Jun 26, 2021
a2c2595
Updating core-amqp changelog and package.json for new rhea version th…
richardpark-msft Jun 29, 2021
085aeec
Updating core-amqp dependency to 3.1.0 to have access to the new vers…
richardpark-msft Jun 29, 2021
a13ee56
Update mockhubs and eventhubs to use the newest core-amqp and rhea.
richardpark-msft Jun 29, 2021
1ea8206
Update package version to what it is now.
richardpark-msft Jun 29, 2021
bd46bfc
Package version update
richardpark-msft Jun 29, 2021
218ab80
Update package version
richardpark-msft Jun 29, 2021
c0eefee
package version and release date
richardpark-msft Jun 29, 2021
81e9d27
- Updating code to use the properly exposed rheaPromise.drainCredit =…
richardpark-msft Jun 30, 2021
22d3a8a
clean checkout from main, then rush update
richardpark-msft Jun 30, 2021
752ac46
Merge remote-tracking branch 'upstream/main' into sb-enoeden-clean-fix
richardpark-msft Jun 30, 2021
06e2d21
Updated rhea-promise and rhea (as minimal as a rush update as we're g…
richardpark-msft Jun 30, 2021
861a7ca
Updated rhea-promise and rhea (as minimal as a rush update as we're g…
richardpark-msft Jun 30, 2021
99d7750
There's nothing in the latest core-amqp that's required for _this_ ch…
richardpark-msft Jun 30, 2021
cac8131
revert pnpm-lock.yaml before merge
richardpark-msft Jun 30, 2021
88489b2
Merge remote-tracking branch 'upstream/main' into sb-enoeden-clean-fix
richardpark-msft Jun 30, 2021
e0a442c
revert to `main`, and rush update again.
richardpark-msft Jun 30, 2021
5859334
Revert the date on the changelog for core-amqp since we're not releas…
richardpark-msft Jun 30, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 4 additions & 7 deletions sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
# Release History

## 3.0.1 (Unreleased)

### Features Added

### Breaking Changes
## 3.1.0 (2021-06-30)
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved

### Key Bugs Fixed

### Fixed

- Updated to use the latest version of the `rhea` package.
Part of a fix for PR#15989, where draining messages could sometimes lead to message loss with `receiver.receiveMessages()`.
[PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989)

## 3.0.0 (2021-06-09)

Expand Down
6 changes: 3 additions & 3 deletions sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/core-amqp",
"sdk-type": "client",
"version": "3.0.1",
"version": "3.1.0",
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down Expand Up @@ -76,8 +76,8 @@
"events": "^3.0.0",
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea": "^2.0.2",
"rhea-promise": "^2.0.0",
"rhea": "^2.0.3",
"rhea-promise": "^2.1.0",
"tslib": "^2.2.0",
"url": "^0.11.0",
"util": "^0.12.1"
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
"is-buffer": "^2.0.3",
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea-promise": "^2.0.0",
"rhea-promise": "^2.1.0",
"tslib": "^2.2.0",
"uuid": "^8.3.0"
},
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/mock-hub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"rhea": "^2.0.2",
"rhea": "^2.0.3",
"tslib": "^2.2.0"
},
"//sampleConfiguration": {
Expand Down
10 changes: 5 additions & 5 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# Release History

## 7.3.0 (Unreleased)

## 7.3.0 (2021-07-06)
### Features Added
- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.

### Breaking Changes

- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.
### Key Bugs Fixed

- Fixed a bug that could lead to message loss in certain conditions when using `receiver.receiveMessages()`.
[PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989)

### Fixed

- Fixing an issue where the internal link cache would not properly remove closed links.
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
"long": "^4.0.0",
"process": "^0.11.10",
"tslib": "^2.2.0",
"rhea-promise": "^2.0.0"
"rhea-promise": "^2.1.0"
},
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
Expand Down
49 changes: 33 additions & 16 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
OnAmqpEvent,
ReceiverEvents,
SessionEvents,
Receiver,
Receiver as RheaPromiseReceiver,
Session
} from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
Expand Down Expand Up @@ -191,16 +191,22 @@ export function getRemainingWaitTimeInMsFn(
*
* @internal
*/
type EventEmitterLike<T extends Receiver | Session> = Pick<T, "once" | "removeListener" | "on">;
type EventEmitterLike<T extends RheaPromiseReceiver | Session> = Pick<
T,
"once" | "removeListener" | "on"
>;

/**
* The bare minimum needed to receive messages for batched
* message receiving.
*
* @internal
*/
export type MinimalReceiver = Pick<Receiver, "name" | "isOpen" | "credit" | "addCredit" | "drain"> &
EventEmitterLike<Receiver> & {
export type MinimalReceiver = Pick<
RheaPromiseReceiver,
"name" | "isOpen" | "credit" | "addCredit" | "drain" | "drainCredit"
> &
EventEmitterLike<RheaPromiseReceiver> & {
session: EventEmitterLike<Session>;
} & {
connection: {
Expand Down Expand Up @@ -269,6 +275,7 @@ export class BatchingReceiverLite {

private _getRemainingWaitTimeInMsFn: typeof getRemainingWaitTimeInMsFn;
private _closeHandler: ((connectionError?: AmqpError | Error) => void) | undefined;
private _finalAction: (() => void) | undefined;

isReceivingMessages: boolean;

Expand Down Expand Up @@ -389,16 +396,17 @@ export class BatchingReceiverLite {
// - maxMessageCount is reached or
// - maxWaitTime is passed or
// - newMessageWaitTimeoutInSeconds is passed since the last message was received
const finalAction = (): void => {
this._finalAction = (): void => {
if (receiver.drain) {
// If a drain is already in process then we should let it complete. Some messages might still be in flight, but they will
// arrive before the drain completes.
return;
}

// Drain any pending credits.
if (receiver.isOpen() && receiver.credit > 0) {
logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`);

// setting .drain and combining it with .addCredit results in (eventually) sending
// a drain request to Service Bus. When the drain completes rhea will call `onReceiveDrain`
// at which point we'll wrap everything up and resolve the promise.
receiver.drain = true;
receiver.addCredit(1);
receiver.drainCredit();
} else {
logger.verbose(
`${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.`
Expand Down Expand Up @@ -429,15 +437,24 @@ export class BatchingReceiverLite {
logger.verbose(
`${loggingPrefix} Batching, waited for ${remainingWaitTimeInMs} milliseconds after receiving the first message.`
);
finalAction();
this._finalAction!();
}, remainingWaitTimeInMs);
}
}

try {
const data: ServiceBusMessageImpl = this._createServiceBusMessage(context);
if (brokeredMessages.length < args.maxMessageCount) {
brokeredMessages.push(data);
brokeredMessages.push(data);

// NOTE: we used to actually "lose" any extra messages. At this point I've fixed the areas that were causing us to receive
// extra messages but if this bug arises in some other way it's better to return the message than it would be to let it be
// silently dropped on the floor.
if (brokeredMessages.length > args.maxMessageCount) {
logger.warning(
`More messages arrived than were expected: ${
args.maxMessageCount
} vs ${brokeredMessages.length + 1}`
);
}
} catch (err) {
const errObj = err instanceof Error ? err : new Error(JSON.stringify(err));
Expand All @@ -448,7 +465,7 @@ export class BatchingReceiverLite {
reject(errObj);
}
if (brokeredMessages.length === args.maxMessageCount) {
finalAction();
this._finalAction!();
}
};

Expand Down Expand Up @@ -515,7 +532,7 @@ export class BatchingReceiverLite {
logger.verbose(
`${loggingPrefix} Batching, waited for max wait time ${args.maxWaitTimeInMs} milliseconds.`
);
finalAction();
this._finalAction!();
}, args.maxWaitTimeInMs);

receiver.on(ReceiverEvents.message, onReceiveMessage);
Expand Down
5 changes: 1 addition & 4 deletions sdk/servicebus/service-bus/src/core/receiverHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ export class ReceiverHelper {
resolve();
});

receiver.drain = true;
// this is not actually adding another credit - it'll just
// cause the drain call to start.
receiver.addCredit(1);
receiver.drainCredit();
});

return drainPromise;
Expand Down
21 changes: 5 additions & 16 deletions sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1388,24 +1388,13 @@ function causeDisconnectDuringDrain(
throw new Error("No active link for batching receiver");
}

const origAddCredit = link.addCredit;

// We want to simulate a disconnect once the batching receiver is draining.
// We can detect when the receiver enters a draining state when `addCredit` is
// called while didRequestDrainResolver is called to resolve the promise.
const addCreditThatImmediatelyDetaches = function(credits: number): void {
origAddCredit.call(link, credits);

if (link.drain && credits === 1) {
// initiate the detach now (prior to any possibilty of the 'drain' call being scheduled)
batchingReceiver
.onDetached(new Error("Test: fake connection failure"))
.then(() => resolveOnDetachedCallPromise());
}
link["drainCredit"] = () => {
// don't send the drain request, we'll just detach.
batchingReceiver
.onDetached(new Error("Test: fake connection failure"))
.then(() => resolveOnDetachedCallPromise());
};

link["addCredit"] = addCreditThatImmediatelyDetaches;

return {
onDetachedCalledPromise
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import {
chai.use(chaiAsPromised);
const assert = chai.assert;

describe("Sample scenarios for track 2", () => {
/**
* A basic suite that exercises most of the core functionality.
*/
describe("Smoke tests", () => {
let serviceBusClient: ServiceBusClientForTests;

before(async () => {
Expand Down
Loading