Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static void ExchangeDeclare(this IChannel channel, string exchange, strin
public static ValueTask ExchangeDeclareAsync(this IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false,
IDictionary<string, object> arguments = null)
{
return channel.ExchangeDeclareAsync(exchange, type, durable, autoDelete, arguments);
return channel.ExchangeDeclareAsync(exchange, type, false, durable, autoDelete, arguments);
}

/// <summary>
Expand Down
38 changes: 23 additions & 15 deletions projects/RabbitMQ.Client/client/api/ICredentialsRefresher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics.Tracing;
namespace RabbitMQ.Client
{
Expand All @@ -57,11 +57,13 @@ public class TimerBasedCredentialRefresherEventSource : EventSource
public void TriggeredTimer(string name) => WriteEvent(4, "TriggeredTimer", name);
[Event(5)]
public void RefreshedCredentials(string name, bool succesfully) => WriteEvent(5, "RefreshedCredentials", name, succesfully);
[Event(6)]
public void AlreadyRegistered(string name) => WriteEvent(6, "AlreadyRegistered", name);
}

public class TimerBasedCredentialRefresher : ICredentialsRefresher
{
private Dictionary<ICredentialsProvider, System.Timers.Timer> _registrations = new Dictionary<ICredentialsProvider, System.Timers.Timer>();
private readonly ConcurrentDictionary<ICredentialsProvider, System.Timers.Timer> _registrations = new();

public ICredentialsProvider Register(ICredentialsProvider provider, ICredentialsRefresher.NotifyCredentialRefreshed callback)
{
Expand All @@ -70,25 +72,31 @@ public ICredentialsProvider Register(ICredentialsProvider provider, ICredentials
return provider;
}

_registrations.Add(provider, scheduleTimer(provider, callback));
TimerBasedCredentialRefresherEventSource.Log.Registered(provider.Name);
if (_registrations.TryAdd(provider, scheduleTimer(provider, callback)))
{
TimerBasedCredentialRefresherEventSource.Log.Registered(provider.Name);
}
else
{
TimerBasedCredentialRefresherEventSource.Log.AlreadyRegistered(provider.Name);
}

return provider;
}

public bool Unregister(ICredentialsProvider provider)
{
if (!_registrations.ContainsKey(provider))
if (_registrations.TryRemove(provider, out System.Timers.Timer timer))
{
return false;
}

var timer = _registrations[provider];
if (timer != null)
{
TimerBasedCredentialRefresherEventSource.Log.Unregistered(provider.Name);
timer.Stop();
_registrations.Remove(provider);
timer.Dispose();
try
{
TimerBasedCredentialRefresherEventSource.Log.Unregistered(provider.Name);
timer.Stop();
}
finally
{
timer.Dispose();
}
return true;
}
else
Expand Down
76 changes: 46 additions & 30 deletions projects/Test/OAuth2/TestOAuth2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ public OAuth2Options(Mode mode)
public int TokenExpiresInSeconds => 60;
}

public class TestOAuth2
public class TestOAuth2 : IAsyncLifetime
{
private const string Exchange = "test_direct";

private readonly AutoResetEvent _doneEvent = new AutoResetEvent(false);
private readonly ITestOutputHelper _testOutputHelper;
private readonly IConnection _connection;
private readonly IConnectionFactory _connectionFactory;
private IConnection _connection;
private readonly int _tokenExpiresInSeconds;

public TestOAuth2(ITestOutputHelper testOutputHelper)
Expand All @@ -75,61 +76,76 @@ public TestOAuth2(ITestOutputHelper testOutputHelper)
Mode mode = (Mode)Enum.Parse(typeof(Mode), modeStr.ToLowerInvariant());
var options = new OAuth2Options(mode);

var connectionFactory = new ConnectionFactory
_connectionFactory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
DispatchConsumersAsync = true,
CredentialsProvider = GetCredentialsProvider(options),
CredentialsRefresher = GetCredentialsRefresher(),
ClientProvidedName = nameof(TestOAuth2)
};

_connection = connectionFactory.CreateConnection();
_tokenExpiresInSeconds = options.TokenExpiresInSeconds;
}

public async Task InitializeAsync()
{
_connection = await _connectionFactory.CreateConnectionAsync();
}

public async Task DisposeAsync()
{
await _connection.CloseAsync();
_connection.Dispose();
}

[Fact]
public async void IntegrationTest()
{
using (_connection)
using (IChannel publishChannel = await DeclarePublisherAsync())
using (IChannel consumeChannel = await DeclareConsumerAsync())
{
using (IChannel publisher = declarePublisher())
using (IChannel subscriber = await declareConsumer())
{
await Publish(publisher);
Consume(subscriber);
await PublishAsync(publishChannel);
Consume(consumeChannel);

if (_tokenExpiresInSeconds > 0)
if (_tokenExpiresInSeconds > 0)
{
for (int i = 0; i < 4; i++)
{
for (int i = 0; i < 4; i++)
{
_testOutputHelper.WriteLine("Wait until Token expires. Attempt #" + (i + 1));
_testOutputHelper.WriteLine("Wait until Token expires. Attempt #" + (i + 1));

await Task.Delay(TimeSpan.FromSeconds(_tokenExpiresInSeconds + 10));
_testOutputHelper.WriteLine("Resuming ..");
await Task.Delay(TimeSpan.FromSeconds(_tokenExpiresInSeconds + 10));
_testOutputHelper.WriteLine("Resuming ..");

await Publish(publisher);
_doneEvent.Reset();
await PublishAsync(publishChannel);
_doneEvent.Reset();

Consume(subscriber);
}
}
else
{
throw new InvalidOperationException();
Consume(consumeChannel);
}
}
else
{
Assert.Fail("_tokenExpiresInSeconds is NOT greater than 0");
}
}
}

private IChannel declarePublisher()
[Fact]
public async void SecondConnectionCrashes_GH1429()
{
// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1429
using IConnection secondConnection = await _connectionFactory.CreateConnectionAsync();
}

private async Task<IChannel> DeclarePublisherAsync()
{
IChannel publisher = _connection.CreateChannel();
publisher.ConfirmSelect();
publisher.ExchangeDeclare("test_direct", ExchangeType.Direct, true, false);
IChannel publisher = await _connection.CreateChannelAsync();
await publisher.ConfirmSelectAsync();
await publisher.ExchangeDeclareAsync("test_direct", ExchangeType.Direct, true, false);
return publisher;
}

private async Task Publish(IChannel publisher)
private async Task PublishAsync(IChannel publisher)
{
const string message = "Hello World!";

Expand All @@ -146,7 +162,7 @@ private async Task Publish(IChannel publisher)
_testOutputHelper.WriteLine("Confirmed Sent message");
}

private async ValueTask<IChannel> declareConsumer()
private async ValueTask<IChannel> DeclareConsumerAsync()
{
IChannel subscriber = _connection.CreateChannel();
await subscriber.QueueDeclareAsync(queue: "testqueue", passive: false, true, false, false, arguments: null);
Expand Down
2 changes: 2 additions & 0 deletions projects/Test/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,8 @@ namespace RabbitMQ.Client
{
public TimerBasedCredentialRefresherEventSource() { }
public static RabbitMQ.Client.TimerBasedCredentialRefresherEventSource Log { get; }
[System.Diagnostics.Tracing.Event(6)]
public void AlreadyRegistered(string name) { }
[System.Diagnostics.Tracing.Event(5)]
public void RefreshedCredentials(string name, bool succesfully) { }
[System.Diagnostics.Tracing.Event(1)]
Expand Down