Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge v0.23.0 #164

Merged
merged 17 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,4 @@ jobs:
dotnet-version: '7.0.x'
- run: dotnet --info
- name: Build solution and run all tests
run: ./build.sh
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
run: ./build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.5" />
<PackageReference Include="BenchmarkDotNet" Version="0.13.6" />
<PackageReference Include="CacheManager.Microsoft.Extensions.Caching.Memory" Version="1.2.0" />
<PackageReference Include="CacheTower" Version="0.13.0" />
<PackageReference Include="EasyCaching.InMemory" Version="1.9.0" />
Expand Down
13 changes: 8 additions & 5 deletions docs/Backplane.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,27 @@ One final thing to notice is that FusionCache automatically differentiates betwe

## ↩️ Auto-Recovery

Since the backplane is implemented on top of a distributed component (in general some sort of message bus, like the Redis Pub/Sub feature) sometimes things can go bad: the message bus can restart or become temporarily unavailable, transient network errors may occur or anything else. In those situations each local nodes' memory caches will become out of sync, since they missed some notifications.
Since the backplane is implemented on top of a distributed component (in general some sort of message bus, like the Redis Pub/Sub feature) sometimes things can go bad: the message bus can restart or become temporarily unavailable, transient network errors may occur or anything else.

In those situations each nodes' local memory caches will become out of sync, since they would've missed some notifications.

Wouldn't it be nice if FusionCache would help us is some way?

Enter **auto-recovery**.

With auto-recovery enabled FusionCache will detect notifications that failed to be sent, put them in a local temporary queue and later on, as soon as the backplane will become available again, will try to send them to all the other nodes to re-sync them correctly.
With auto-recovery FusionCache will detect notifications that failed to be sent, put them in a local temporary queue and when later on the backplane will become available again, it will try to send them to all the other nodes, to re-sync them correctly.

Special care has been put into correctly handling some common situations, like:
- if more than one notification is about to be queued for the same cache key, only the last one will be kept since the result of sending 2 notifications for the same cache key back-to-back would be the same
- if a notification is received for a cache key for which there is a queued notification, only the most recent one is kept: if the incoming one is newer, the local one is discarded and the incoming one is processed, otherwise the incoming one is ignored and the local one is sent to the other nodes. This avoids, for example, evicting an entry from a local cache if it has been updated after a change in a remote node, which would be useless
- it is possible to set a limit in how many notifications to keep in the queue via the `BackplaneAutoRecoveryMaxItems` option (default value: `100`, can be `null` to remove any limit) to avoid consuming too much memory or to bombard the backplane as soon as it will become available again. If a notification is about to be queued but the limit has already been reached, an heuristic is used that will remove the notification for the cache entry that will expire sooner (calculated as: instant when the notification has been created + cache entry's `Duration`), to limit as much as possible the impact on the global shared state synchronization.
- it is possible to set a limit in how many notifications to keep in the queue via the `BackplaneAutoRecoveryMaxItems` option to avoid consuming too much memory as it will become available again (default value: `null` which means no limits). If a notification is about to be queued but the limit has already been reached, an heuristic is used to remove the notification for the cache entry that will expire sooner (calculated as: instant when the notification has been created + cache entry's `Duration`), to limit as much as possible the impact on the global shared state synchronization
- when a backplane becomes available again, a little amount of time is awaited to avoid small sync issues, to better handle backpressure in an automatic way (configurable via the `BackplaneAutoRecoveryReconnectDelay` option)
- when sending a pending backplane notification from the auto-recovery queue, it is possible to also expire the value on the distributed cache in case the underlying server/service is actually the same (eg: Redis), since when the backplane notification failed it probably also failed the saving of the data in the distributed cache. This can be enabled/disabled via the `EnableDistributedExpireOnBackplaneAutoRecovery` option

This feature is not implemented **inside** a backplane implementation, of which there are multiple, but inside FusionCache itself: this means that it works with any backplane implementation, which is nice.
This feature is not implemented **inside** a specific backplane implementation, of which there are multiple, but inside FusionCache itself: this means that it works with any backplane implementation, which is nice.

**ℹ NOTE:** auto-recovery is available since version `0.14.0`, but it's enabled by default only since version `0.17.0`.


## 📦 Packages

Currently there are 2 official packages we can use:
Expand Down
6 changes: 4 additions & 2 deletions docs/Options.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ In general this can be used as a set of options that will act as the *baseline*,
| `DistributedCacheKeyModifierMode` | `CacheKeyModifierMode` | `Prefix` | Specify the mode in which cache key will be changed for the distributed cache (eg: to specify the wire format version). |
| `BackplaneCircuitBreakerDuration` | `TimeSpan` | `none` | The duration of the circuit-breaker used when working with the backplane. |
| `BackplaneChannelPrefix` | `string?` | `null` | The prefix to use in the backplane channel name: if not specified the `CacheName` will be used. |
| `EnableBackplaneAutoRecovery` | `bool` | `false` | Enable auto-recovery for the backplane notifications to better handle transient errors without generating synchronization issues: notifications that failed to be sent out will be retried later on, when the backplane becomes responsive again. |
| `BackplaneAutoRecoveryMaxItems` | `int?` | `100` | The maximum number of items in the auto-recovery queue: this can help reducing memory consumption. If set to `null` there will be no limit. |
| `EnableBackplaneAutoRecovery` | `bool` | `true` | Enable auto-recovery for the backplane notifications to better handle transient errors without generating synchronization issues: notifications that failed to be sent out will be retried later on, when the backplane becomes responsive again. |
| `BackplaneAutoRecoveryMaxItems` | `int?` | `null` | The maximum number of items in the auto-recovery queue: this can help reducing memory consumption. If set to `null` there will be no limit. |
| `BackplaneAutoRecoveryReconnectDelay` | `TimeSpan` | `2s` | The amount of time to wait, after a backplane reconnection, before trying to process the auto-recovery queue: this may be useful to allow all the other nodes to be ready. |
| `EnableDistributedExpireOnBackplaneAutoRecovery`| `bool` | `true` | Enable expiring a cache entry, only on the distributed cache (if any), when anauto-recovery message is being published on the backplane, to ensure that the value in the distributed cache will not be stale. |
| `EnableSyncEventHandlersExecution` | `bool` | `false` | If set to `true` all registered event handlers will be run synchronously: this is really, very, highly discouraged as it may slow down all other handlers and FusionCache itself. |
| `IncoherentOptionsNormalizationLogLevel` | `LogLevel` | `Warning` | Used when some options have incoherent values that have been fixed with a normalization, like for example when a FailSafeMaxDuration is lower than a Duration, so the Duration is used instead. |
| `SerializationErrorsLogLevel` | `LogLevel` | `Error` | Used when logging serialization errors (while working with the distributed cache). |
Expand Down
107 changes: 81 additions & 26 deletions src/ZiggyCreatures.FusionCache.Backplane.Memory/MemoryBackplane.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@
namespace ZiggyCreatures.Caching.Fusion.Backplane.Memory;

/// <summary>
/// An-in memory implementation of a FusionCache backplane
/// An in-memory implementation of a FusionCache backplane
/// </summary>
public class MemoryBackplane
: IFusionCacheBackplane
{
private static readonly ConcurrentDictionary<string, List<MemoryBackplane>> _channels = new ConcurrentDictionary<string, List<MemoryBackplane>>();
private static readonly ConcurrentDictionary<string, ConcurrentDictionary<string, List<MemoryBackplane>>> _connections = new ConcurrentDictionary<string, ConcurrentDictionary<string, List<MemoryBackplane>>>();

private readonly MemoryBackplaneOptions _options;
private BackplaneSubscriptionOptions? _subscriptionOptions;
private readonly ILogger? _logger;

private ConcurrentDictionary<string, List<MemoryBackplane>>? _connection;

private string _channelName = "FusionCache.Notifications";
private Action<BackplaneMessage>? _handler;
private List<MemoryBackplane>? _backplanes;
private List<MemoryBackplane>? _subscriber;

private Action<BackplaneConnectionInfo>? _connectHandler;
private Action<BackplaneMessage>? _incomingMessageHandler;

/// <summary>
/// Initializes a new instance of the MemoryBackplane class.
Expand All @@ -37,6 +42,7 @@ public MemoryBackplane(IOptions<MemoryBackplaneOptions> optionsAccessor, ILogger
// OPTIONS
_options = optionsAccessor.Value ?? throw new ArgumentNullException(nameof(optionsAccessor.Value));


// LOGGING
if (logger is NullLogger<MemoryBackplane>)
{
Expand All @@ -47,6 +53,43 @@ public MemoryBackplane(IOptions<MemoryBackplaneOptions> optionsAccessor, ILogger
{
_logger = logger;
}

// CONNECTION
if (_options.ConnectionId is null)
{
if (_logger?.IsEnabled(LogLevel.Warning) ?? false)
_logger.Log(LogLevel.Warning, "FUSION: A MemoryBackplane should be used with an explicit ConnectionId option, otherwise concurrency issues will probably happen");

_options.ConnectionId = "_default";
}
}

private void EnsureConnection()
{
if (_options.ConnectionId is null)
throw new NullReferenceException("The ConnectionId is null");

_connection = _connections.GetOrAdd(_options.ConnectionId, _ => new ConcurrentDictionary<string, List<MemoryBackplane>>());

_connectHandler?.Invoke(new BackplaneConnectionInfo(false));

EnsureSubscriber();
}

private void EnsureSubscriber()
{
if (_subscriber is null && _connection is not null)
_subscriber = _connection.GetOrAdd(_channelName, _ => new List<MemoryBackplane>());
}

private void Disconnect()
{
_connectHandler = null;

if (_connection is null)
return;

_connection = null;
}

/// <inheritdoc/>
Expand All @@ -56,62 +99,74 @@ public void Subscribe(BackplaneSubscriptionOptions subscriptionOptions)
throw new ArgumentNullException(nameof(subscriptionOptions));

if (subscriptionOptions.ChannelName is null)
throw new NullReferenceException("The ChannelName cannot be null");
throw new NullReferenceException("The BackplaneSubscriptionOptions.ChannelName cannot be null");

if (subscriptionOptions.IncomingMessageHandler is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.MessageHandler cannot be null");

if (subscriptionOptions.Handler is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.Handler cannot be null");
if (subscriptionOptions.ConnectHandler is null)
throw new NullReferenceException("The BackplaneSubscriptionOptions.ConnectHandler cannot be null");

_subscriptionOptions = subscriptionOptions;

_channelName = _subscriptionOptions.ChannelName;
_handler = _subscriptionOptions.Handler;

_backplanes = _channels.GetOrAdd(_channelName, _ => new List<MemoryBackplane>());
_incomingMessageHandler = _subscriptionOptions.IncomingMessageHandler;
_connectHandler = _subscriptionOptions.ConnectHandler;

if (_backplanes is null)
return;
// CONNECTION
EnsureConnection();

if (_subscriber is null)
throw new NullReferenceException("The subscriber is null");

lock (_backplanes)
lock (_subscriber)
{
_backplanes.Add(this);
_subscriber.Add(this);
}
}

/// <inheritdoc/>
public void Unsubscribe()
{
if (_backplanes is null)
return;

_handler = null;

lock (_backplanes)
if (_subscriber is not null)
{
_backplanes.Remove(this);
_incomingMessageHandler = null;
lock (_subscriber)
{
_subscriber.Remove(this);
_subscriptionOptions = null;
}
}

Disconnect();
}

/// <inheritdoc/>
public ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token)
public ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
Publish(message, options);
Publish(message, options, token);
return new ValueTask();
}

/// <inheritdoc/>
public void Publish(BackplaneMessage message, FusionCacheEntryOptions options)
public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
{
EnsureConnection();

if (message is null)
throw new ArgumentNullException(nameof(message));

if (message.IsValid() == false)
throw new InvalidOperationException("The message is invalid");

if (_backplanes is null)
if (_subscriber is null)
throw new NullReferenceException("Something went wrong :-|");

foreach (var backplane in _backplanes)
foreach (var backplane in _subscriber)
{
token.ThrowIfCancellationRequested();

if (backplane == this)
continue;

Expand All @@ -121,6 +176,6 @@ public void Publish(BackplaneMessage message, FusionCacheEntryOptions options)

internal void OnMessage(BackplaneMessage message)
{
_handler?.Invoke(message);
_incomingMessageHandler?.Invoke(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ namespace ZiggyCreatures.Caching.Fusion.Backplane.Memory;
public class MemoryBackplaneOptions
: IOptions<MemoryBackplaneOptions>
{
/// <summary>
/// The logical id used to simulate a connection to a server.
/// <br/>
/// It is used to group together multiple instances of a MemoryBackplane and separate them from others, without interfering with other backplanes running concurrently at the same time (mostly useful for testing).
/// <br/>
/// Basically it's like a connection string.
/// </summary>
public string? ConnectionId { get; set; }

MemoryBackplaneOptions IOptions<MemoryBackplaneOptions>.Value
{
get { return this; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<Version>0.22.0</Version>
<Version>0.23.0</Version>
<PackageId>ZiggyCreatures.FusionCache.Backplane.Memory</PackageId>
<PackageIcon>logo-128x128.png</PackageIcon>
<Description>FusionCache in memory backplane, used for testing</Description>
Expand All @@ -13,6 +13,7 @@
<DocumentationFile>ZiggyCreatures.FusionCache.Backplane.Memory.xml</DocumentationFile>
<PackageReadmeFile>README.md</PackageReadmeFile>
<PackageReleaseNotes>
- Added support for ConnectionId (better for concurrent tests scenarios)
- Better log messages
- Dependencies update
</PackageReleaseNotes>
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading