Description
Version: 2.8.0 (Latest Stable)
Problem
The documentation states that ConnectionMultiplexer is fully thread-safe, but its Subscription
internal class is not, causing problems in some operations such as Subscribe
, SubscribeAsync
, Unsubscribe
, UnsubscribeAsync
when called with a handler
.
The issue stems from the Add(...)
and Remove(...)
functions, while delegates are immutable and cannot be corrupted, the +=
and -=
operations are not atomic and not thread-safe. When a delegate updates itself, it creates a copy of its callback list, modifies it and then update its internal reference, if called from multiple threads some callbacks may be incorrectly lost or kept.
Example
I've made a very simple example that only calls Subscribe
& Unsubscribe
in a multi-threaded way and uses reflection to inspect Subscription._handlers
to show that it incorrectly lose or keep callbacks that it should not.
(Please note that I've also tested both Async version of the same functions and had similar results)
using System.Reflection;
using StackExchange.Redis;
public static class Program
{
public static async Task Main(string[] args)
{
ConnectionMultiplexer pubsub = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
ISubscriber subscriber = pubsub.GetSubscriber();
const int COUNT = 1000;
const string CHANNEL = "CHANNEL:TEST";
void Handler(RedisChannel channel, RedisValue value)
{
Console.WriteLine(value);
};
List<Action> subscribes = new List<Action>(COUNT);
for (int i = 0; i < COUNT; i++)
subscribes.Add(() => subscriber.Subscribe(CHANNEL, Handler));
Parallel.ForEach(subscribes, (action) => { action(); });
PrintInternalSubscribers(subscriber, CHANNEL, COUNT);
List<Action> unsubscribes = new List<Action>(COUNT);
for (int i = 0; i < COUNT; i++)
unsubscribes.Add(() => subscriber.Unsubscribe(CHANNEL, Handler));
Parallel.ForEach(unsubscribes, (action) => { action(); });
PrintInternalSubscribers(subscriber, CHANNEL, 0);
}
private static void PrintInternalSubscribers(ISubscriber subscriber, string channel, int expected)
{
MethodInfo methodInfo = subscriber.Multiplexer.GetType().GetMethod("TryGetSubscription", BindingFlags.NonPublic | BindingFlags.Instance);
object[] parameters = new object[] { RedisChannel.Literal(channel) , null };
methodInfo.Invoke(subscriber.Multiplexer, parameters);
object subscription = parameters[1];
int count = 0;
if(subscription != null)
{
FieldInfo fieldInfo = subscription.GetType().GetField("_handlers", BindingFlags.NonPublic | BindingFlags.Instance);
Action<RedisChannel, RedisValue> handlers = fieldInfo.GetValue(subscription) as Action<RedisChannel, RedisValue>;
count = handlers != null ? handlers.GetInvocationList().Length : 0;
}
Console.WriteLine($"InvocationList Actual: {count} Expected: {expected}");
}
}
Here's the output of a typical run:
Here's the output of the same program after changing Parallel.ForEach
for a regular foreach
making it single threaded:
Fix ?
A simple solution could be to add a lock around those operations. Jon Skeet has a very good article about delegate & thread-safety
I've created a PR #2769 that does exactly this and has unit tests.
Related
The following github issues are most likely related: #2115 #2458