Skip to content

Commit

Permalink
fix: bug on cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
MHKarami97 committed Dec 2, 2024
1 parent a7c0cc3 commit ac4894b
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 24 deletions.
6 changes: 6 additions & 0 deletions CacheManagerClear.Rabbit/CachePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ public CachePublisher(IConnection connection, string exchange)
/// </summary>
public async Task StopAsync()
{
if (_disposed)
{
return;
}

await _connection.DisposeAsync().ConfigureAwait(false);
await _connection.CloseAsync().ConfigureAwait(false);
_disposed = true;
}

/// <summary>
Expand Down
76 changes: 54 additions & 22 deletions CacheManagerClear.Rabbit/CacheSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class CacheSubscriber : ICacheSubscriber
/// <param name="cacheManager">Cache manager instance</param>
/// <param name="exchange">RabbitMQ exchange name</param>
/// <param name="queue">RabbitMQ queue name</param>
public CacheSubscriber(IConnection connection, IEasyCacheManager cacheManager, string exchange, string queue)
public CacheSubscriber(IConnection connection, string exchange, string queue, IEasyCacheManager cacheManager)
{
_connection = connection ?? throw new ArgumentNullException(nameof(connection));
_cacheManager = cacheManager ?? throw new ArgumentNullException(nameof(cacheManager));
Expand All @@ -36,41 +36,45 @@ public CacheSubscriber(IConnection connection, IEasyCacheManager cacheManager, s
/// </summary>
public async Task SubscribeAsync(CancellationToken cancellationToken)
{
using var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

await channel.ExchangeDeclareAsync(exchange: _exchange, type: ExchangeType.Fanout, durable: true, cancellationToken: cancellationToken).ConfigureAwait(false);
_ = await channel.QueueDeclareAsync(queue: _queue, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken).ConfigureAwait(false);
await channel.QueueBindAsync(queue: _queue, exchange: _exchange, routingKey: string.Empty, cancellationToken: cancellationToken).ConfigureAwait(false);
try
{
using var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

var consumer = new AsyncEventingBasicConsumer(channel);
await channel.ExchangeDeclareAsync(exchange: _exchange, type: ExchangeType.Fanout, durable: true, cancellationToken: cancellationToken).ConfigureAwait(false);
_ = await channel.QueueDeclareAsync(queue: _queue, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken).ConfigureAwait(false);
await channel.QueueBindAsync(queue: _queue, exchange: _exchange, routingKey: string.Empty, cancellationToken: cancellationToken).ConfigureAwait(false);

consumer.ReceivedAsync += async (_, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var consumer = new AsyncEventingBasicConsumer(channel);

if (message == "*")
{
await _cacheManager.ClearAllCacheAsync().ConfigureAwait(false);
}
else
{
await _cacheManager.ClearCacheAsync(message).ConfigureAwait(false);
}
consumer.ReceivedAsync += async (_, ea) => await ProcessAsync(ea, channel, cancellationToken).ConfigureAwait(false);

await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken).ConfigureAwait(false);
};
// Start consuming messages
_ = await channel.BasicConsumeAsync(queue: _queue, autoAck: false, consumer: consumer, cancellationToken: cancellationToken).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
}
catch (Exception e)
{
Console.WriteLine(e);

// Start consuming messages
_ = await channel.BasicConsumeAsync(queue: _queue, autoAck: false, consumer: consumer, cancellationToken: cancellationToken).ConfigureAwait(false);
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
}

/// <summary>
/// Stops the Kafka subscription process.
/// </summary>
public async Task StopAsync()
{
if (_disposed)
{
return;
}

await _connection.DisposeAsync().ConfigureAwait(false);
await _connection.CloseAsync().ConfigureAwait(false);
_disposed = true;
}

/// <summary>
Expand All @@ -88,4 +92,32 @@ public async ValueTask DisposeAsync()

_disposed = true;
}

private async Task ProcessAsync(BasicDeliverEventArgs ea, IChannel channel, CancellationToken cancellationToken)
{
try
{
var key = Encoding.UTF8.GetString(ea.Body.ToArray());

if (key.Equals(StaticData.ClearAllKey, StringComparison.Ordinal))
{
await _cacheManager.ClearAllCacheAsync().ConfigureAwait(false);
}
else
{
await _cacheManager.ClearCacheAsync(key).ConfigureAwait(false);
}

await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
}
catch (Exception e)
{
Console.WriteLine(e);

await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
}
}
4 changes: 2 additions & 2 deletions CacheManagerClear.Rabbit/RabbitCacheClearBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static CacheManagerClearBuilder UseRabbitSubscriber(this CacheManagerClea
}
#endif

var subscriber = new CacheSubscriber(connection, cacheManager, exchange, queue);
var subscriber = new CacheSubscriber(connection, exchange, queue, cacheManager);

_ = builder.BuildSubscriber(subscriber);

Expand Down Expand Up @@ -141,7 +141,7 @@ public static CacheManagerClearBuilder UseRabbitPublisherAndSubscriber(this Cach
#endif

var publisher = new CachePublisher(connection, exchange);
var subscriber = new CacheSubscriber(connection, cacheManager, exchange, queue);
var subscriber = new CacheSubscriber(connection, exchange, queue, cacheManager);

_ = builder.BuildPublisherAndSubscriber(publisher, subscriber);

Expand Down
6 changes: 6 additions & 0 deletions CacheManagerClear.Redis/CachePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ public async Task PublishClearAllCacheAsync(CancellationToken? cancellationToken
/// </summary>
public async Task StopAsync()
{
if (_disposed)
{
return;
}

await _redis.DisposeAsync().ConfigureAwait(false);
await _redis.CloseAsync().ConfigureAwait(false);
_disposed = true;
}

/// <summary>
Expand Down
6 changes: 6 additions & 0 deletions CacheManagerClear.Redis/CacheSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,14 @@ await subscriber.SubscribeAsync(channel, async (_, message) =>
/// </summary>
public async Task StopAsync()
{
if (_disposed)
{
return;
}

await _redis.DisposeAsync().ConfigureAwait(false);
await _redis.CloseAsync().ConfigureAwait(false);
_disposed = true;
}

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions CacheManagerUnitTest/StaticData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ internal static class StaticData
internal const string Value = "test_value";
public const string ClearAllKey = "*";
public const string Topic = "cache_clear_topic";
public const string Exchange = "cache_clear_exchange";
public const string Queue = "cache_clear_queue";
}

0 comments on commit ac4894b

Please sign in to comment.