Skip to content

Commit

Permalink
added logging
Browse files Browse the repository at this point in the history
  • Loading branch information
imperugo committed Apr 9, 2020
1 parent ff66bcb commit 9c5a2bf
Show file tree
Hide file tree
Showing 14 changed files with 198 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .ruleset
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<RuleSet Name="Rules for Technogym.MwCloud.Commons" Description="Code analysis rules for Technogym.MwCloud.Commons.csproj." ToolsVersion="15.0">
<RuleSet Name="StackExchange.Redis.Extensions rules" Description="Code analysis rules for StackExchange.Redis.Extensions projects." ToolsVersion="15.0">
<Rules AnalyzerId="AsyncUsageAnalyzers" RuleNamespace="AsyncUsageAnalyzers">
<Rule Id="UseConfigureAwait" Action="Error" />
</Rules>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using StackExchange.Redis.Extensions.Core.Models;

namespace StackExchange.Redis.Extensions.Core.Abstractions
{
Expand All @@ -12,5 +13,10 @@ public interface IRedisCacheConnectionPoolManager : IDisposable
/// </summary>
/// <returns>Returns an instance of<see cref="IConnectionMultiplexer"/>.</returns>
IConnectionMultiplexer GetConnection();

/// <summary>
/// Gets the information about the connection pool
/// </summary>
ConnectionPoolInformation GetConnectionInformations();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public partial interface IRedisDatabase
/// </summary>
/// <param name="keys">The cache keys.</param>
/// <param name="flag">Behaviour markers associated with a given command</param>
Task RemoveAllAsync(IEnumerable<string> keys, CommandFlags flag = CommandFlags.None);
/// <returns>The numnber of items removed.</returns>
Task<long> RemoveAllAsync(IEnumerable<string> keys, CommandFlags flag = CommandFlags.None);

/// <summary>
/// Get the object with the specified key from Redis database
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using Microsoft.Extensions.Logging;
using StackExchange.Redis.Extensions.Core.Abstractions;
using StackExchange.Redis.Extensions.Core.Configuration;
using StackExchange.Redis.Extensions.Core.Models;

namespace StackExchange.Redis.Extensions.Core.Implementations
{
Expand All @@ -11,28 +13,36 @@ public class RedisCacheConnectionPoolManager : IRedisCacheConnectionPoolManager
{
private readonly ConcurrentBag<Lazy<StateAwareConnection>> connections;
private readonly RedisConfiguration redisConfiguration;
private readonly ILogger<RedisCacheConnectionPoolManager> logger;

/// <summary>
/// Initializes a new instance of the <see cref="RedisCacheConnectionPoolManager"/> class.
/// </summary>
/// <param name="redisConfiguration">The redis configuration</param>
public RedisCacheConnectionPoolManager(RedisConfiguration redisConfiguration)
/// <param name="redisConfiguration">The redis configuration.</param>
/// <param name="logger">The logger.</param>
public RedisCacheConnectionPoolManager(RedisConfiguration redisConfiguration, ILogger<RedisCacheConnectionPoolManager> logger)
{
this.redisConfiguration = redisConfiguration ?? throw new ArgumentNullException(nameof(redisConfiguration));

this.connections = new ConcurrentBag<Lazy<StateAwareConnection>>();
this.logger = logger;
}

/// <inheritdoc/>
public void Dispose()
{
var activeConnections = this.connections.Where(lazy => lazy.IsValueCreated).ToList();

logger.LogDebug("Disposing {0} active connections.", activeConnections.Count);

for (var i = 0; i < activeConnections.Count; i++)
activeConnections[i].Value.Invalidate();

while (this.connections.IsEmpty == false)
{
logger.LogDebug("Removing invalid connections from pool.");
this.connections.TryTake(out var taken);
}
}

/// <inheritdoc/>
Expand All @@ -48,16 +58,51 @@ public IConnectionMultiplexer GetConnection()
return (ConnectionMultiplexer)this.connections.First(lazy => !lazy.IsValueCreated).Value;
}

/// <inheritdoc/>
public ConnectionPoolInformation GetConnectionInformations()
{
var activeConnections = 0;
var invalidConnections = 0;
var readyNotUsedYet = 0;

foreach (var lazy in connections)
{
if (!lazy.IsValueCreated)
{
readyNotUsedYet++;
continue;
}

if (!lazy.Value.IsConnected())
{
invalidConnections++;
continue;
}

activeConnections++;
}

return new ConnectionPoolInformation()
{
RequiredPoolSize = redisConfiguration.PoolSize,
ActiveConnections = activeConnections,
InvalidConnections = invalidConnections,
ReadyNotUsedYet = readyNotUsedYet
};
}

private void EmitConnection()
{
this.connections.Add(new Lazy<StateAwareConnection>(() =>
{
this.logger.LogDebug("Creating new Redis connection.");
var multiplexer = ConnectionMultiplexer.Connect(redisConfiguration.ConfigurationOptions);
if (this.redisConfiguration.ProfilingSessionProvider != null)
multiplexer.RegisterProfiler(this.redisConfiguration.ProfilingSessionProvider);
return new StateAwareConnection(multiplexer, this.EmitConnection);
return new StateAwareConnection(multiplexer, this.EmitConnection, logger);
}));
}

Expand All @@ -72,14 +117,21 @@ private void EmitConnections()
var requiredNumOfConnections = poolSize - invalidOrDisconnectedConnections;

if (invalidOrDisconnectedConnections <= 0 && this.connections.Count > 0)
{
logger.LogDebug("The pool is created and there aren't any invalid connections.");
return;
}

logger.LogDebug("The pool size is {0} and it requires new {1} connections.", poolSize, requiredNumOfConnections);

for (var i = 0; i < requiredNumOfConnections; i++)
this.EmitConnection();
}

private void InvalidateDisconnectedConnections()
{
logger.LogDebug("Checking if there are any invalid connections...");

foreach (var lazy in connections)
{
if (lazy.IsValueCreated && !lazy.Value.IsConnected())
Expand All @@ -96,6 +148,7 @@ internal sealed class StateAwareConnection
{
private readonly Action invalidateConnectionCallback;
private readonly ConnectionMultiplexer multiplexer;
private readonly ILogger logger;
private bool invalidated;

/// <summary>
Expand All @@ -105,12 +158,14 @@ internal sealed class StateAwareConnection
/// <param name="connectionInvalidatedCallback">
/// A delegate representing a method that will be called when the give the connection became invalid.
/// </param>
public StateAwareConnection(ConnectionMultiplexer multiplexer, Action connectionInvalidatedCallback)
/// <param name="logger">The logger.</param>
public StateAwareConnection(ConnectionMultiplexer multiplexer, Action connectionInvalidatedCallback, ILogger logger)
{
this.multiplexer = multiplexer ?? throw new ArgumentNullException(nameof(multiplexer));
this.invalidateConnectionCallback = connectionInvalidatedCallback ?? throw new ArgumentNullException(nameof(connectionInvalidatedCallback));

this.multiplexer.ConnectionFailed += this.ConnectionFailed;
this.logger = logger;
}

public static implicit operator ConnectionMultiplexer(StateAwareConnection c) => c.multiplexer;
Expand All @@ -119,6 +174,8 @@ public StateAwareConnection(ConnectionMultiplexer multiplexer, Action connection

public void Invalidate()
{
logger.LogWarning("Invalidating redis connection...");

if (this.invalidated)
return;

Expand All @@ -141,6 +198,8 @@ private void ConnectionFailed(object sender, ConnectionFailedEventArgs e)
case ConnectionFailureType.SocketFailure:
case ConnectionFailureType.UnableToConnect:
{
logger.LogError(e.Exception, "Redis connection error {0}.", e.FailureType);

this.Invalidate();
this.invalidateConnectionCallback();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,35 @@

namespace StackExchange.Redis.Extensions.Core.Implementations
{
internal partial class RedisDatabase : IRedisDatabase
public partial class RedisDatabase : IRedisDatabase
{
/// <inheritdoc/>
public Task<bool> HashDeleteAsync(string hashKey, string key, CommandFlags commandFlags = CommandFlags.None)
{
return Database.HashDeleteAsync(hashKey, key, commandFlags);
}

/// <inheritdoc/>
public Task<long> HashDeleteAsync(string hashKey, IEnumerable<string> keys, CommandFlags commandFlags = CommandFlags.None)
{
return Database.HashDeleteAsync(hashKey, keys.Select(x => (RedisValue)x).ToArray(), commandFlags);
}

/// <inheritdoc/>
public Task<bool> HashExistsAsync(string hashKey, string key, CommandFlags commandFlags = CommandFlags.None)
{
return Database.HashExistsAsync(hashKey, key, commandFlags);
}

/// <inheritdoc/>
public async Task<T> HashGetAsync<T>(string hashKey, string key, CommandFlags commandFlags = CommandFlags.None)
{
var redisValue = await Database.HashGetAsync(hashKey, key, commandFlags).ConfigureAwait(false);

return redisValue.HasValue ? Serializer.Deserialize<T>(redisValue) : default;
}

/// <inheritdoc/>
public async Task<Dictionary<string, T>> HashGetAsync<T>(string hashKey, IList<string> keys, CommandFlags commandFlags = CommandFlags.None)
{
var tasks = new Task<T>[keys.Count];
Expand All @@ -48,6 +53,7 @@ public async Task<Dictionary<string, T>> HashGetAsync<T>(string hashKey, IList<s
return result;
}

/// <inheritdoc/>
public async Task<Dictionary<string, T>> HashGetAllAsync<T>(string hashKey, CommandFlags commandFlags = CommandFlags.None)
{
return (await Database.HashGetAllAsync(hashKey, commandFlags).ConfigureAwait(false))
Expand All @@ -57,43 +63,51 @@ public async Task<Dictionary<string, T>> HashGetAllAsync<T>(string hashKey, Comm
StringComparer.Ordinal);
}

/// <inheritdoc/>
public Task<long> HashIncerementByAsync(string hashKey, string key, long value, CommandFlags commandFlags = CommandFlags.None)
{
return Database.HashIncrementAsync(hashKey, key, value, commandFlags);
}

/// <inheritdoc/>
public Task<double> HashIncerementByAsync(string hashKey, string key, double value, CommandFlags commandFlags = CommandFlags.None)
{
return Database.HashIncrementAsync(hashKey, key, value, commandFlags);
}

/// <inheritdoc/>
public async Task<IEnumerable<string>> HashKeysAsync(string hashKey, CommandFlags commandFlags = CommandFlags.None)
{
return (await Database.HashKeysAsync(hashKey, commandFlags).ConfigureAwait(false)).Select(x => x.ToString());
}

/// <inheritdoc/>
public Task<long> HashLengthAsync(string hashKey, CommandFlags commandFlags = CommandFlags.None)
{
return Database.HashLengthAsync(hashKey, commandFlags);
}

/// <inheritdoc/>
public Task<bool> HashSetAsync<T>(string hashKey, string key, T value, bool nx = false, CommandFlags commandFlags = CommandFlags.None)
{
return Database.HashSetAsync(hashKey, key, Serializer.Serialize(value), nx ? When.NotExists : When.Always, commandFlags);
}

/// <inheritdoc/>
public Task HashSetAsync<T>(string hashKey, IDictionary<string, T> values, CommandFlags commandFlags = CommandFlags.None)
{
var entries = values.Select(kv => new HashEntry(kv.Key, Serializer.Serialize(kv.Value)));

return Database.HashSetAsync(hashKey, entries.ToArray(), commandFlags);
}

/// <inheritdoc/>
public async Task<IEnumerable<T>> HashValuesAsync<T>(string hashKey, CommandFlags commandFlags = CommandFlags.None)
{
return (await Database.HashValuesAsync(hashKey, commandFlags).ConfigureAwait(false)).Select(x => Serializer.Deserialize<T>(x));
}

/// <inheritdoc/>
public Dictionary<string, T> HashScan<T>(string hashKey, string pattern, int pageSize = 10, CommandFlags commandFlags = CommandFlags.None)
{
return Database.HashScan(hashKey, pattern, pageSize, commandFlags).ToDictionary(x => x.Name.ToString(), x => Serializer.Deserialize<T>(x.Value), StringComparer.Ordinal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

namespace StackExchange.Redis.Extensions.Core.Implementations
{
internal partial class RedisDatabase : IRedisDatabase
public partial class RedisDatabase : IRedisDatabase
{
/// <inheritdoc/>
public Task<long> ListAddToLeftAsync<T>(string key, T item, When when = When.Always, CommandFlags flags = CommandFlags.None)
where T : class
{
Expand All @@ -20,6 +21,7 @@ public Task<long> ListAddToLeftAsync<T>(string key, T item, When when = When.Alw
return Database.ListLeftPushAsync(key, serializedItem, when, flags);
}

/// <inheritdoc/>
public async Task<T> ListGetFromRightAsync<T>(string key, CommandFlags flags = CommandFlags.None)
where T : class
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@

namespace StackExchange.Redis.Extensions.Core.Implementations
{
internal partial class RedisDatabase : IRedisDatabase
public partial class RedisDatabase : IRedisDatabase
{
/// <inheritdoc/>
public Task<long> PublishAsync<T>(RedisChannel channel, T message, CommandFlags flags = CommandFlags.None)
{
var sub = connectionMultiplexer.GetSubscriber();
return sub.PublishAsync(channel, Serializer.Serialize(message), flags);
}

/// <inheritdoc/>
public Task SubscribeAsync<T>(RedisChannel channel, Func<T, Task> handler, CommandFlags flags = CommandFlags.None)
{
if (handler == null)
Expand All @@ -23,6 +25,7 @@ public Task SubscribeAsync<T>(RedisChannel channel, Func<T, Task> handler, Comma
return sub.SubscribeAsync(channel, async (redisChannel, value) => await handler(Serializer.Deserialize<T>(value)).ConfigureAwait(false), flags);
}

/// <inheritdoc/>
public Task UnsubscribeAsync<T>(RedisChannel channel, Func<T, Task> handler, CommandFlags flags = CommandFlags.None)
{
if (handler == null)
Expand All @@ -32,12 +35,14 @@ public Task UnsubscribeAsync<T>(RedisChannel channel, Func<T, Task> handler, Com
return sub.UnsubscribeAsync(channel, (redisChannel, value) => handler(Serializer.Deserialize<T>(value)), flags);
}

/// <inheritdoc/>
public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
{
var sub = connectionMultiplexer.GetSubscriber();
return sub.UnsubscribeAllAsync(flags);
}

/// <inheritdoc/>
public async Task<bool> UpdateExpiryAsync(string key, DateTimeOffset expiresAt, CommandFlags flags = CommandFlags.None)
{
if (await Database.KeyExistsAsync(key).ConfigureAwait(false))
Expand All @@ -46,6 +51,7 @@ public async Task<bool> UpdateExpiryAsync(string key, DateTimeOffset expiresAt,
return false;
}

/// <inheritdoc/>
public async Task<bool> UpdateExpiryAsync(string key, TimeSpan expiresIn, CommandFlags flags = CommandFlags.None)
{
if (await Database.KeyExistsAsync(key).ConfigureAwait(false))
Expand All @@ -54,6 +60,7 @@ public async Task<bool> UpdateExpiryAsync(string key, TimeSpan expiresIn, Comman
return false;
}

/// <inheritdoc/>
public async Task<IDictionary<string, bool>> UpdateExpiryAllAsync(string[] keys, DateTimeOffset expiresAt, CommandFlags flags = CommandFlags.None)
{
var tasks = new Task<bool>[keys.Length];
Expand All @@ -71,6 +78,7 @@ public async Task<IDictionary<string, bool>> UpdateExpiryAllAsync(string[] keys,
return results;
}

/// <inheritdoc/>
public async Task<IDictionary<string, bool>> UpdateExpiryAllAsync(string[] keys, TimeSpan expiresIn, CommandFlags flags = CommandFlags.None)
{
var tasks = new Task<bool>[keys.Length];
Expand Down
Loading

0 comments on commit 9c5a2bf

Please sign in to comment.