Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Cache UpdateDispositionWorkItem Mono #22317

Merged
merged 4 commits into from
Jun 22, 2021

Conversation

YijunXieMS
Copy link
Contributor

fixes #22299

ServiceBusReactorReceiver.updateDisposition does not return or throw error if try timeout is long (from about 50 seconds but no exact number, usually 10 seconds is safe) and network is down.
This is because when try timeout is long enough, ServiceBusReactorReceiver.updateDisposition() hasn't timed out while another thread closes the ServiceBusReactorReceiver. In the closeAsync method, the Mono object of work items of type UpdateDispositionWorkItem is subscribed again (using when). The dual subscription causes the first Mono's sink doesn't complete. Adding a .cache() to the Mono will ensure the sink object isn't created for a second time.

@YijunXieMS YijunXieMS self-assigned this Jun 16, 2021
@ghost ghost added the Service Bus label Jun 16, 2021
@YijunXieMS YijunXieMS requested a review from srnagar June 17, 2021 19:43
@@ -250,7 +250,7 @@ protected Message decodeDelivery(Delivery delivery) {
sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.",
error, handler.getErrorContext(receiver)));
}
});
}).cache().then(Mono.empty()); // cache because closeAsync use `when` to subscribe this Mono again.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need .then(Mono.empty())? If so, then use then()?

Copy link
Contributor Author

@YijunXieMS YijunXieMS Jun 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Mono.empty() is not required here.

@@ -362,7 +362,7 @@ private void cleanupWorkItems() {
});
}

private void completeWorkItem(String lockToken, Delivery delivery, MonoSink<Void> sink, Throwable error) {
private void completeWorkItem(String lockToken, Delivery delivery, MonoSink<Object> sink, Throwable error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need it to be a MonoSink of Object? I thought you could cache to .<Void>cache()?

Copy link
Contributor Author

@YijunXieMS YijunXieMS Jun 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the .cache() call turns the sink into a MonoSink<Object> from MonoSink<Void>.

final Mono<Void> result = Mono.create(sink -> {
            workItem.start(sink);
            try {
                provider.getReactorDispatcher().invoke(() -> {
                    unsettled.disposition(deliveryState);
                    pendingUpdates.put(lockToken, workItem);
                });
            } catch (IOException error) {
                sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.",
                    error, handler.getErrorContext(receiver)));
            }
        }).cache().then();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can cast the create operator to Void as shown below.

final Mono<Void> result = Mono.<Void>create(sink -> {
            workItem.start(sink);
            try {
                provider.getReactorDispatcher().invoke(() -> {
                    unsettled.disposition(deliveryState);
                    pendingUpdates.put(lockToken, workItem);
                });
            } catch (IOException error) {
                sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.",
                    error, handler.getErrorContext(receiver)));
            }
        }).cache();

@YijunXieMS YijunXieMS merged commit c0d060f into Azure:main Jun 22, 2021
@YijunXieMS YijunXieMS deleted the sb-cache-workitem branch June 22, 2021 20:37
azure-sdk pushed a commit to azure-sdk/azure-sdk-for-java that referenced this pull request Feb 1, 2023
Machinelearningservices microsoft.machine learning services 2022 12 01 preview (Azure#21761)

* Adds base for updating Microsoft.MachineLearningServices from version preview/2022-10-01-preview to version 2022-12-01-preview

* Updates readme

* Updates API version in new specs and examples

* Add Dec API Registries Swagger (Azure#21419)

* add december registries swagger + examples

* add status code 202 in examples

* fix 202 examples

* fixes

* fixes

* fix

* add 202 back in for put/patch

Co-authored-by: Komal Yadav <komalyadav@microsoft.com>

* remove location (Azure#21430)

Co-authored-by: Komal Yadav <komalyadav@microsoft.com>

* remove readonly flag on schedules property for CI (Azure#21653)

Co-authored-by: Naman Agarwal <naagarw@microsoft.com>

* add missing workspace properties (Azure#21725)

* December preview updating mfe.json specs (Azure#21510)

* December preview updating mfe.json specs

* MFE Dec 2022 Preview API - Adding logbase

* MFE 2022-12-01-preview swagger spec model validation fix

* MFE 2022-12-01-preview swagger spec model validation fix, add missing location

* MFE 2022-12-01-preview swagger spec model validation - typo fix

* MFE 2022-12-01-preview swagger spec model validation - fix api version in automljob example

* MFE 2022-12-01-preview swagger spec model validation - fix for multiselectenabled error

* MFE 2022-12-01-preview swagger spec model validation - fix for multiselectenabled error

* Fix  for 1006 - RemovedDefinition (RecurrenceTrigger,CronTrigger) (Azure#21822)

* fix ReadonlyPropertyChanged of MLC (Azure#21814)

Co-authored-by: Bingchen Li <bingchenli@microsoft.com>

* fixed custom-words conflict (Azure#21829)

* fix custom-words conflict merge (Azure#21830)

* example fix (INVALID_REQUEST_PARAMETER) (Azure#21832)

Co-authored-by: Ivaliy Ivanov <ivaliyivanov@Ivaliys-MacBook-Air.local>

* example fix, use correct api preview version  - (INVALID_REQUEST_PARAMETER) (Azure#21833)

Co-authored-by: Ivaliy Ivanov <ivaliyivanov@Ivaliys-MacBook-Air.local>

* Revert breaking change for MLC swagger 2022-12-01-preview (Azure#21885)

Co-authored-by: Bingchen Li <bingchenli@microsoft.com>

* Revert Connection Category back to enum. (Azure#21939)

* revert provisioning state change (Azure#21940)

* remove body (Azure#21978)

Co-authored-by: Komal Yadav <komalyadav@microsoft.com>

* Addressed comments, added x-ms-long-running-operation to a patch call (Azure#22005)

* Addressed comments, added x-ms-long-running-operation to a patch call

* fix examples for patch - remove body

* fixed formatting

* Ivalbert fix patch2 (Azure#22006)

* Addressed comments, added x-ms-long-running-operation to a patch call

* fix examples for patch - remove body

* fixed formatting

* fixed formatting

* Updated custom words (Azure#22262)

* Fixed prettier errors (Azure#22237)

* fixed examples for LRO_RESPONSE_HEADER check (Azure#22293)

* fixed examples for LRO_RESPONSE_HEADER check (Azure#22294)

* Example fix - OBJECT_MISSING_REQUIRED_PROPERTY - Missing required property: triggerType (Azure#22317)

---------

Co-authored-by: Komal Yadav <23komal.yadav23@gmail.com>
Co-authored-by: Komal Yadav <komalyadav@microsoft.com>
Co-authored-by: Naman Agarwal <namanag16@gmail.com>
Co-authored-by: Naman Agarwal <naagarw@microsoft.com>
Co-authored-by: ZhidaLiu <zhili@microsoft.com>
Co-authored-by: libc16 <88697960+libc16@users.noreply.github.com>
Co-authored-by: Bingchen Li <bingchenli@microsoft.com>
Co-authored-by: Ivaliy Ivanov <ivaliyivanov@Ivaliys-MacBook-Air.local>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[ServiceBus] Settlement API may not return or throw error if try timeout is long
3 participants