Skip to content

ConnectionMultiplexer.Subscription is not Thread-safe #2763

Closed
@Chuck-EP

Description

@Chuck-EP

Version: 2.8.0 (Latest Stable)

Problem

The documentation states that ConnectionMultiplexer is fully thread-safe, but its Subscription internal class is not, causing problems in some operations such as Subscribe, SubscribeAsync, Unsubscribe, UnsubscribeAsync when called with a handler.

The issue stems from the Add(...) and Remove(...) functions, while delegates are immutable and cannot be corrupted, the += and -= operations are not atomic and not thread-safe. When a delegate updates itself, it creates a copy of its callback list, modifies it and then update its internal reference, if called from multiple threads some callbacks may be incorrectly lost or kept.

Example

I've made a very simple example that only calls Subscribe & Unsubscribe in a multi-threaded way and uses reflection to inspect Subscription._handlers to show that it incorrectly lose or keep callbacks that it should not.

(Please note that I've also tested both Async version of the same functions and had similar results)

using System.Reflection;
using StackExchange.Redis;

public static class Program
{
    public static async Task Main(string[] args)
    {
        ConnectionMultiplexer pubsub = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
        ISubscriber subscriber = pubsub.GetSubscriber();

        const int COUNT = 1000;
        const string CHANNEL = "CHANNEL:TEST";

        void Handler(RedisChannel channel, RedisValue value) 
        {
            Console.WriteLine(value);
        };
        
        List<Action> subscribes = new List<Action>(COUNT);
        for (int i = 0; i < COUNT; i++)
            subscribes.Add(() => subscriber.Subscribe(CHANNEL, Handler));
        Parallel.ForEach(subscribes, (action) => { action(); });

        PrintInternalSubscribers(subscriber, CHANNEL, COUNT);

        List<Action> unsubscribes = new List<Action>(COUNT);
        for (int i = 0; i < COUNT; i++)
            unsubscribes.Add(() => subscriber.Unsubscribe(CHANNEL, Handler));
        Parallel.ForEach(unsubscribes, (action) => { action(); });

        PrintInternalSubscribers(subscriber, CHANNEL, 0);
    }

    private static void PrintInternalSubscribers(ISubscriber subscriber, string channel, int expected)
    {
        MethodInfo methodInfo = subscriber.Multiplexer.GetType().GetMethod("TryGetSubscription", BindingFlags.NonPublic | BindingFlags.Instance);
        object[] parameters = new object[] { RedisChannel.Literal(channel) , null };
        methodInfo.Invoke(subscriber.Multiplexer, parameters);
        object subscription = parameters[1];
        int count = 0;
        if(subscription != null)
        {
            FieldInfo fieldInfo = subscription.GetType().GetField("_handlers", BindingFlags.NonPublic | BindingFlags.Instance);
            Action<RedisChannel, RedisValue> handlers = fieldInfo.GetValue(subscription) as Action<RedisChannel, RedisValue>;
            count = handlers != null ? handlers.GetInvocationList().Length : 0;
        }
        Console.WriteLine($"InvocationList Actual: {count} Expected: {expected}");
    }
}

Here's the output of a typical run:
image

Here's the output of the same program after changing Parallel.ForEach for a regular foreach making it single threaded:
image

Fix ?

A simple solution could be to add a lock around those operations. Jon Skeet has a very good article about delegate & thread-safety

I've created a PR #2769 that does exactly this and has unit tests.

Related

The following github issues are most likely related: #2115 #2458

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions