Skip to content

Commit fe40d17

Browse files
Chuck-EPNickCraver
andauthored
Fix #2763: ConnectionMultiplexer.Subscription is not Thread-safe (#2769)
## Issue #2763 ## Solution Simply added a lock around `_handlers` in `ConnectionMultiplexer.Subscription`, like I was suggesting in the issue. ## Unit Test I added one that does exactly what the example code in #2763 was doing & testing for. I used the other tests as template/guide, let me know if something isn't up to spec. --------- Co-authored-by: Nick Craver <nrcraver@gmail.com>
1 parent c0bb4eb commit fe40d17

File tree

3 files changed

+60
-3
lines changed

3 files changed

+60
-3
lines changed

docs/ReleaseNotes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Current package versions:
1010

1111
- Add support for hash field expiration (see [#2715](https://github.com/StackExchange/StackExchange.Redis/issues/2715)) ([#2716 by atakavci](https://github.com/StackExchange/StackExchange.Redis/pull/2716]))
1212
- Add support for `HSCAN NOVALUES` (see [#2721](https://github.com/StackExchange/StackExchange.Redis/issues/2721)) ([#2722 by atakavci](https://github.com/StackExchange/StackExchange.Redis/pull/2722))
13+
- Fix [#2763](https://github.com/StackExchange/StackExchange.Redis/issues/2763): Make ConnectionMultiplexer.Subscription thread-safe ([#2769 by Chuck-EP](https://github.com/StackExchange/StackExchange.Redis/pull/2769))
1314

1415
## 2.8.0
1516

src/StackExchange.Redis/RedisSubscriber.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ internal enum SubscriptionAction
159159
internal sealed class Subscription
160160
{
161161
private Action<RedisChannel, RedisValue>? _handlers;
162+
private readonly object _handlersLock = new object();
162163
private ChannelMessageQueue? _queues;
163164
private ServerEndPoint? CurrentServer;
164165
public CommandFlags Flags { get; }
@@ -206,7 +207,10 @@ public void Add(Action<RedisChannel, RedisValue>? handler, ChannelMessageQueue?
206207
{
207208
if (handler != null)
208209
{
209-
_handlers += handler;
210+
lock (_handlersLock)
211+
{
212+
_handlers += handler;
213+
}
210214
}
211215
if (queue != null)
212216
{
@@ -218,7 +222,10 @@ public bool Remove(Action<RedisChannel, RedisValue>? handler, ChannelMessageQueu
218222
{
219223
if (handler != null)
220224
{
221-
_handlers -= handler;
225+
lock (_handlersLock)
226+
{
227+
_handlers -= handler;
228+
}
222229
}
223230
if (queue != null)
224231
{
@@ -236,7 +243,10 @@ public bool Remove(Action<RedisChannel, RedisValue>? handler, ChannelMessageQueu
236243

237244
internal void MarkCompleted()
238245
{
239-
_handlers = null;
246+
lock (_handlersLock)
247+
{
248+
_handlers = null;
249+
}
240250
ChannelMessageQueue.MarkAllCompleted(ref _queues);
241251
}
242252

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using Xunit;
5+
using Xunit.Abstractions;
6+
7+
namespace StackExchange.Redis.Tests.Issues
8+
{
9+
public class Issue2763Tests : TestBase
10+
{
11+
public Issue2763Tests(ITestOutputHelper output) : base(output) { }
12+
13+
[Fact]
14+
public void Execute()
15+
{
16+
using var conn = Create();
17+
var subscriber = conn.GetSubscriber();
18+
19+
static void Handler(RedisChannel c, RedisValue v) { }
20+
21+
const int COUNT = 1000;
22+
RedisChannel channel = RedisChannel.Literal("CHANNEL:TEST");
23+
24+
List<Action> subscribes = new List<Action>(COUNT);
25+
for (int i = 0; i < COUNT; i++)
26+
subscribes.Add(() => subscriber.Subscribe(channel, Handler));
27+
Parallel.ForEach(subscribes, action => action());
28+
29+
Assert.Equal(COUNT, CountSubscriptionsForChannel(subscriber, channel));
30+
31+
List<Action> unsubscribes = new List<Action>(COUNT);
32+
for (int i = 0; i < COUNT; i++)
33+
unsubscribes.Add(() => subscriber.Unsubscribe(channel, Handler));
34+
Parallel.ForEach(unsubscribes, action => action());
35+
36+
Assert.Equal(0, CountSubscriptionsForChannel(subscriber, channel));
37+
}
38+
39+
private static int CountSubscriptionsForChannel(ISubscriber subscriber, RedisChannel channel)
40+
{
41+
ConnectionMultiplexer connMultiplexer = (ConnectionMultiplexer)subscriber.Multiplexer;
42+
connMultiplexer.GetSubscriberCounts(channel, out int handlers, out int _);
43+
return handlers;
44+
}
45+
}
46+
}

0 commit comments

Comments
 (0)