Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable a way to Unregister Message Handler and Session Handler #14021

Merged
merged 23 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8c46a41
add UnregisterMessageHandler method
DorothySun216 Aug 6, 2020
db9b322
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverCl…
DorothySun216 Aug 10, 2020
da77a24
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageRece…
DorothySun216 Aug 10, 2020
5e1576a
Update the unregister method to be async and await for inflight opera…
DorothySun216 Aug 13, 2020
61f0998
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClie…
DorothySun216 Aug 13, 2020
2988d02
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageRece…
DorothySun216 Aug 13, 2020
d3faba2
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs
DorothySun216 Aug 13, 2020
4b921a6
Change name to have async suffix and add to existing onMessageQueueTests
DorothySun216 Aug 13, 2020
78aff54
Add UnregisterSessionHandlerAsync and corresponding tests
DorothySun216 Aug 13, 2020
25f6b8d
nit
DorothySun216 Aug 13, 2020
d4a5589
nit
DorothySun216 Aug 13, 2020
ecd820b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-net i…
DorothySun216 Aug 20, 2020
cd8f044
Add a new cancellation type to not cancel inflight message handling o…
DorothySun216 Aug 23, 2020
fff0016
Add another type of cancellation token to session handler path
DorothySun216 Aug 24, 2020
fae7bf1
nit
DorothySun216 Aug 24, 2020
642dc36
Add a timeout parameter to unregister functions and add according uni…
DorothySun216 Sep 4, 2020
ffc4249
nit
DorothySun216 Sep 4, 2020
fbd31d0
cancel runningTaskCancellationTokenSource after unregister is done
DorothySun216 Sep 4, 2020
f4dac4b
change public API
DorothySun216 Sep 9, 2020
9692122
update the API header
DorothySun216 Sep 9, 2020
7741ad9
update the API definition
DorothySun216 Sep 10, 2020
2bb96a5
fix spacing
DorothySun216 Sep 10, 2020
c0ec74a
fix ApproveAzureServiceBus CIT test
DorothySun216 Sep 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update the unregister method to be async and await for inflight opera…
…tions to finish
  • Loading branch information
DorothySun216 committed Aug 13, 2020
commit 5e1576abcd1237d5c9daaff900e4e1148a51e544
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface IReceiverClient : IClientEntity
/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered.
/// </summary>
void UnregisterMessageHandler();
Task UnregisterMessageHandler();
DorothySun216 marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Completes a <see cref="Message"/> using its lock token. This will delete the message from the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,23 +902,31 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
/// <summary>
/// Unregister message hander from the receiver if there is an active message handler registered.
/// </summary>
public void UnregisterMessageHandler()
public async Task UnregisterMessageHandler()
{
this.ThrowIfClosed();

MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId);
lock (this.messageReceivePumpSyncLock)
{
if (this.receivePump != null)
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
this.receivePump = null;
}
else
if (this.receivePump == null || this.receivePumpCancellationTokenSource.IsCancellationRequested)
{
throw new InvalidOperationException(Resources.MessageHandlerNotRegisteredYet);
return;
}

this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
}

while (this.receivePump != null
&& this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls)
{
await Task.Delay(10);
DorothySun216 marked this conversation as resolved.
Show resolved Hide resolved
}

lock (this.messageReceivePumpSyncLock)
{
this.receivePump = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest cancelling and disposing the runningTaskCancellationTokenSource here.

}
MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ namespace Microsoft.Azure.ServiceBus

sealed class MessageReceivePump
{
public readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
public readonly MessageHandlerOptions registerHandlerOptions;
readonly Func<Message, CancellationToken, Task> onMessageCallback;
readonly string endpoint;
readonly MessageHandlerOptions registerHandlerOptions;
readonly IMessageReceiver messageReceiver;
readonly CancellationToken pumpCancellationToken;
readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
readonly ServiceBusDiagnosticSource diagnosticSource;


public MessageReceivePump(IMessageReceiver messageReceiver,
MessageHandlerOptions registerHandlerOptions,
Func<Message, CancellationToken, Task> callback,
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,10 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
/// <summary>
/// Unregister messgae hander from the receiver if there is active message handler registered.
/// </summary>
public void UnregisterMessageHandler()
public async Task UnregisterMessageHandler()
{
this.ThrowIfClosed();
this.InnerReceiver.UnregisterMessageHandler();
await this.InnerReceiver.UnregisterMessageHandler();
DorothySun216 marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,10 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
/// <summary>
/// Unregister messgae hander from the receiver if there is active message handler registered.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Unregister messgae hander from the receiver if there is active message handler registered.
/// Unregister message hander from the receiver if there is an active message handler registered.

/// </summary>
public void UnregisterMessageHandler()
public async Task UnregisterMessageHandler()
{
this.ThrowIfClosed();
this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler();
await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler();
DorothySun216 marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand Down