Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projec
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1865", "projects\Applications\GH-1865\GH-1865.csproj", "{38CE721E-2801-AED1-DDF8-DC5F888C6C05}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -114,6 +116,10 @@ Global
{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Release|Any CPU.Build.0 = Release|Any CPU
{38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Debug|Any CPU.Build.0 = Debug|Any CPU
{38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Release|Any CPU.ActiveCfg = Release|Any CPU
{38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -130,6 +136,7 @@ Global
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
{38CE721E-2801-AED1-DDF8-DC5F888C6C05} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
Expand Down
15 changes: 15 additions & 0 deletions projects/Applications/GH-1865/GH-1865.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RootNamespace>GH_1865</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="../../RabbitMQ.Client/RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
148 changes: 148 additions & 0 deletions projects/Applications/GH-1865/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//---------------------------------------------------------------------------

#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task

using System.Diagnostics;
using System.Globalization;
using RabbitMQ.Client;

class Program
{
static int _channelsProcessed;
static readonly TaskCompletionSource<bool> s_tcs = new();
static readonly ThreadLocal<Random> s_rng = new(() => new Random());

private static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);

static async Task Main(string[] args)
{
const int Repeats = 3;
const int ChannelsToOpen = 20;

var connectionFactory = new ConnectionFactory
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest",
VirtualHost = "/",
RequestedConnectionTimeout = TimeSpan.FromMilliseconds(60000),
RequestedHeartbeat = TimeSpan.FromSeconds(600),
AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false,
ContinuationTimeout = TimeSpan.FromMilliseconds(1000)
};
await using IConnection connection = await connectionFactory.CreateConnectionAsync();

var watch = Stopwatch.StartNew();
_ = Task.Run(async () =>
{
for (int i = 0; i < Repeats; i++)
{
try
{
var tasks = new Task[ChannelsToOpen];
for (int j = 0; j < ChannelsToOpen; j++)
{
tasks[j] = Task.Run(async () =>
{
try
{
IChannel channel = await connection.CreateChannelAsync(
new CreateChannelOptions(true, true));
var cts = new CancellationTokenSource();
int cancelAfterMs = s_rng.Value!.Next(1, 10000); // upper bound exclusive
cts.CancelAfter(cancelAfterMs);
var tcs = new TaskCompletionSource<int>();
channel.ChannelShutdownAsync += async (sender, args) =>
{
await Task.Delay(100);
tcs.TrySetResult(1);
};
try
{
await channel.CloseAsync();
}
catch (TaskCanceledException ex)
{
Console.WriteLine(
$"{Now} CloseAsync canceled after {cancelAfterMs} ms " +
$"{ex.Message}");
}
catch (OperationCanceledException ex)
{
Console.WriteLine(
$"{Now} CloseAsync canceled after {cancelAfterMs} ms" +
$"{ex.Message}");
}
catch (Exception exClose)
{
Console.WriteLine($"{Now} CloseAsync error: {exClose.GetType().Name} {exClose.Message}");
}

// Wait a bit for the ChannelShutdown event to fire
var delayTask = Task.Delay(15000);
await Task.WhenAny(tcs.Task, delayTask);
await channel.DisposeAsync();
cts.Dispose();
}
catch (Exception exOuter)
{
Console.WriteLine($"{Now} outer error: {exOuter.GetType().Name} {exOuter.Message}");
}
finally
{
Interlocked.Increment(ref _channelsProcessed);
}
});
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
Console.WriteLine($"{Now} connection error: {ex.GetType().Name} {ex.Message}");
}
}

s_tcs.SetResult(true);
});

Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}");
Console.WriteLine();
Console.WriteLine("Opened");
while (false == s_tcs.Task.IsCompleted)
{
await Task.Delay(500);
Console.WriteLine($"{_channelsProcessed,5}");
}
watch.Stop();
Console.WriteLine($"{_channelsProcessed,5}");
Console.WriteLine();
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");
}
}
38 changes: 28 additions & 10 deletions projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,22 +268,18 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private async Task MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(ShutdownEventArgs reason)
{
if (_disposed)
{
return;
}

if (_publisherConfirmationsEnabled)
{
await _confirmSemaphore.WaitAsync(reason.CancellationToken)
.ConfigureAwait(false);
try
{
if (!_confirmsTaskCompletionSources.IsEmpty)
{
var exception = new AlreadyClosedException(reason);
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values)
{
confirmsTaskCompletionSource.TrySetException(exception);
}

_confirmsTaskCompletionSources.Clear();
}
MaybeSetExceptionOnConfirmsTcs(reason);
}
finally
{
Expand Down Expand Up @@ -404,5 +400,27 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void MaybeSetExceptionOnConfirmsTcs(ShutdownEventArgs? reason = null)
{
if (!_confirmsTaskCompletionSources.IsEmpty)
{
Exception ex;
if (reason is not null)
{
ex = new AlreadyClosedException(reason);
}
else
{
ex = new OperationInterruptedException();
}
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values)
{
confirmsTaskCompletionSource.TrySetException(ex);
}
_confirmsTaskCompletionSources.Clear();
}
}
}
}
17 changes: 9 additions & 8 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ protected virtual void Dispose(bool disposing)
{
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
MaybeSetExceptionOnConfirmsTcs();
}
catch
{
Expand All @@ -608,13 +609,13 @@ public async ValueTask DisposeAsync()
return;
}

await DisposeAsyncCore()
await DisposeAsyncCoreAsync()
.ConfigureAwait(false);

Dispose(false);
}

protected virtual async ValueTask DisposeAsyncCore()
protected virtual async ValueTask DisposeAsyncCoreAsync()
{
if (_disposed)
{
Expand Down Expand Up @@ -669,7 +670,7 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart
return ModelSendAsync(in method, cancellationToken).AsTask();
}

protected async Task<bool> HandleBasicAck(IncomingCommand cmd,
protected async Task<bool> HandleBasicAckAsync(IncomingCommand cmd,
CancellationToken cancellationToken = default)
{
var ack = new BasicAck(cmd.MethodSpan);
Expand All @@ -685,7 +686,7 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args)
return true;
}

protected async Task<bool> HandleBasicNack(IncomingCommand cmd,
protected async Task<bool> HandleBasicNackAsync(IncomingCommand cmd,
CancellationToken cancellationToken = default)
{
var nack = new BasicNack(cmd.MethodSpan);
Expand All @@ -702,7 +703,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args)
return true;
}

protected async Task<bool> HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken)
protected async Task<bool> HandleBasicReturnAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
var basicReturn = new BasicReturn(cmd.MethodSpan);

Expand Down Expand Up @@ -1750,16 +1751,16 @@ private Task<bool> DispatchCommandAsync(IncomingCommand cmd, CancellationToken c
}
case ProtocolCommandId.BasicAck:
{
return HandleBasicAck(cmd, cancellationToken);
return HandleBasicAckAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicNack:
{
return HandleBasicNack(cmd, cancellationToken);
return HandleBasicNackAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicReturn:
{
// Note: always returns true
return HandleBasicReturn(cmd, cancellationToken);
return HandleBasicReturnAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelClose:
{
Expand Down