Skip to content

Commit 4a97489

Browse files
authored
Merge pull request #1223 from AArnott/asyncMutex
Add `AsyncMutex` class
2 parents fcfe71f + 3078cca commit 4a97489

File tree

6 files changed

+461
-0
lines changed

6 files changed

+461
-0
lines changed
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3+
4+
using System;
5+
using System.Collections.Concurrent;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace Microsoft.VisualStudio.Threading;
10+
11+
/// <summary>
12+
/// A mutex that can be entered asynchronously.
13+
/// </summary>
14+
/// <remarks>
15+
/// <para>
16+
/// This class utilizes the OS mutex synchronization primitive, which is fundamentally thread-affinitized and requires synchronously blocking the thread that will own the mutex.
17+
/// This makes a native mutex unsuitable for use in async methods, where the thread that enters the mutex may not be the same thread that exits it.
18+
/// This class solves that problem by using a private dedicated thread to enter and release the mutex, but otherwise allows its owner to execute async code, switch threads, etc.
19+
/// </para>
20+
/// </remarks>
21+
/// <example><![CDATA[
22+
/// using AsyncCrossProcessMutex mutex = new("Some-Unique Name");
23+
/// using (await mutex.EnterAsync())
24+
/// {
25+
/// // Code that must not execute in parallel with any other thread or process protected by the same named mutex.
26+
/// }
27+
/// ]]></example>
28+
public class AsyncCrossProcessMutex
29+
: IDisposable
30+
{
31+
#pragma warning disable SA1310 // Field names should not contain underscore
32+
private const int STATE_READY = 0;
33+
private const int STATE_HELD_OR_WAITING = 1;
34+
private const int STATE_DISPOSED = 2;
35+
#pragma warning restore SA1310 // Field names should not contain underscore
36+
37+
private static readonly Action ExitSentinel = new Action(() => { });
38+
private readonly Thread namedMutexOwner;
39+
private readonly BlockingCollection<Action> mutexWorkQueue = new();
40+
private readonly Mutex mutex;
41+
private int state;
42+
43+
/// <summary>
44+
/// Initializes a new instance of the <see cref="AsyncCrossProcessMutex"/> class.
45+
/// </summary>
46+
/// <param name="name">
47+
/// A non-empty name for the mutex, which follows standard mutex naming rules.
48+
/// This name will share a namespace with other processes in the system and collisions will result in the processes sharing a single mutex across processes.
49+
/// </param>
50+
/// <remarks>
51+
/// See the help docs on the underlying <see cref="Mutex"/> class for more information on the <paramref name="name"/> parameter.
52+
/// Consider when reading that the <c>initiallyOwned</c> parameter for that constructor is always <see langword="false"/> for this class.
53+
/// </remarks>
54+
public AsyncCrossProcessMutex(string name)
55+
{
56+
Requires.NotNullOrEmpty(name);
57+
this.namedMutexOwner = new Thread(this.MutexOwnerThread, 100 * 1024)
58+
{
59+
Name = $"{nameof(AsyncCrossProcessMutex)}-{name}",
60+
};
61+
this.mutex = new Mutex(false, name);
62+
this.namedMutexOwner.Start();
63+
this.Name = name;
64+
}
65+
66+
/// <summary>
67+
/// Gets the name of the mutex.
68+
/// </summary>
69+
public string Name { get; }
70+
71+
/// <summary>
72+
/// Disposes of the underlying native objects.
73+
/// </summary>
74+
public void Dispose()
75+
{
76+
int priorState = Interlocked.Exchange(ref this.state, STATE_DISPOSED);
77+
if (priorState != STATE_DISPOSED)
78+
{
79+
this.mutexWorkQueue.Add(ExitSentinel);
80+
this.mutexWorkQueue.CompleteAdding();
81+
}
82+
}
83+
84+
/// <inheritdoc cref="EnterAsync(TimeSpan)"/>
85+
public Task<LockReleaser> EnterAsync() => this.EnterAsync(Timeout.InfiniteTimeSpan);
86+
87+
/// <summary>
88+
/// Acquires the mutex asynchronously.
89+
/// </summary>
90+
/// <param name="timeout">The maximum time to wait before timing out. Use <see cref="Timeout.InfiniteTimeSpan"/> for no timeout, or <see cref="TimeSpan.Zero"/> to acquire the mutex only if it is immediately available.</param>
91+
/// <returns>A value whose disposal will release the mutex.</returns>
92+
/// <exception cref="TimeoutException">Thrown from the awaited result if the mutex could not be acquired within the specified timeout.</exception>
93+
/// <exception cref="ArgumentOutOfRangeException">Thrown from the awaited result if the <paramref name="timeout"/> is a negative number other than -1 milliseconds, which represents an infinite timeout.</exception>
94+
/// <exception cref="InvalidOperationException">Thrown if called before a prior call to this method has completed, with its releaser disposed if the mutex was entered.</exception>
95+
public async Task<LockReleaser> EnterAsync(TimeSpan timeout) => await this.TryEnterAsync(timeout) ?? throw new TimeoutException();
96+
97+
/// <summary>
98+
/// Acquires the mutex asynchronously, allowing for timeouts without throwing exceptions.
99+
/// </summary>
100+
/// <param name="timeout">The maximum time to wait before timing out. Use <see cref="Timeout.InfiniteTimeSpan"/> for no timeout, or <see cref="TimeSpan.Zero"/> to acquire the mutex only if it is immediately available.</param>
101+
/// <returns>
102+
/// If the mutex was acquired, the result is a value whose disposal will release the mutex.
103+
/// In the event of a timeout, the result in a <see langword="null" /> value.
104+
/// </returns>
105+
/// <exception cref="ArgumentOutOfRangeException">Thrown from the awaited result if the <paramref name="timeout"/> is a negative number other than -1 milliseconds, which represents an infinite timeout.</exception>
106+
/// <exception cref="InvalidOperationException">Thrown if called before a prior call to this method has completed, with its releaser disposed if the mutex was entered.</exception>
107+
public Task<LockReleaser?> TryEnterAsync(TimeSpan timeout)
108+
{
109+
int priorState = Interlocked.CompareExchange(ref this.state, STATE_HELD_OR_WAITING, STATE_READY);
110+
switch (priorState)
111+
{
112+
case STATE_HELD_OR_WAITING:
113+
throw new InvalidOperationException();
114+
case STATE_DISPOSED:
115+
throw new ObjectDisposedException(this.GetType().FullName);
116+
}
117+
118+
// Pass `this` as the state simply to assist in debugging dumps.
119+
TaskCompletionSource<LockReleaser?> tcs = new(this, TaskCreationOptions.RunContinuationsAsynchronously);
120+
this.mutexWorkQueue.Add(delegate
121+
{
122+
try
123+
{
124+
if (this.mutex.WaitOne(timeout))
125+
{
126+
tcs.SetResult(new LockReleaser(this));
127+
}
128+
else
129+
{
130+
Assumes.True(Interlocked.CompareExchange(ref this.state, STATE_READY, STATE_HELD_OR_WAITING) == STATE_HELD_OR_WAITING);
131+
tcs.SetResult(null);
132+
}
133+
}
134+
catch (AbandonedMutexException)
135+
{
136+
tcs.SetResult(new LockReleaser(this, abandoned: true));
137+
}
138+
catch (Exception ex)
139+
{
140+
Assumes.True(Interlocked.CompareExchange(ref this.state, STATE_READY, STATE_HELD_OR_WAITING) == STATE_HELD_OR_WAITING);
141+
tcs.SetException(ex);
142+
}
143+
});
144+
145+
return tcs.Task;
146+
}
147+
148+
private void Release()
149+
{
150+
Assumes.True(Interlocked.CompareExchange(ref this.state, STATE_READY, STATE_HELD_OR_WAITING) == STATE_HELD_OR_WAITING);
151+
this.mutexWorkQueue.Add(this.mutex.ReleaseMutex);
152+
}
153+
154+
private void MutexOwnerThread()
155+
{
156+
try
157+
{
158+
while (!this.mutexWorkQueue.IsCompleted)
159+
{
160+
Action work = this.mutexWorkQueue.Take();
161+
if (work == ExitSentinel)
162+
{
163+
// We use an exit sentinel to avoid an exception having to be thrown and caught on disposal when we call Take() and CompleteAdding() is called.
164+
break;
165+
}
166+
167+
work();
168+
}
169+
}
170+
finally
171+
{
172+
this.mutex.Dispose();
173+
}
174+
}
175+
176+
/// <summary>
177+
/// The value returned from <see cref="EnterAsync(TimeSpan)"/> that must be disposed to release the mutex.
178+
/// </summary>
179+
public struct LockReleaser : IDisposable
180+
{
181+
private AsyncCrossProcessMutex? owner;
182+
183+
internal LockReleaser(AsyncCrossProcessMutex mutex, bool abandoned = false)
184+
{
185+
this.owner = mutex;
186+
this.IsAbandoned = abandoned;
187+
}
188+
189+
/// <summary>
190+
/// Gets a value indicating whether the mutex was abandoned by its previous owner.
191+
/// </summary>
192+
public bool IsAbandoned { get; }
193+
194+
/// <summary>
195+
/// Releases the named mutex.
196+
/// </summary>
197+
public void Dispose()
198+
{
199+
Interlocked.Exchange(ref this.owner, null)?.Release();
200+
}
201+
}
202+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex
2+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.AsyncCrossProcessMutex(string! name) -> void
3+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.Dispose() -> void
4+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.EnterAsync() -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser>!
5+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.EnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser>!
6+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser
7+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser.Dispose() -> void
8+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser.IsAbandoned.get -> bool
9+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.Name.get -> string!
10+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.TryEnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser?>!
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex
2+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.AsyncCrossProcessMutex(string! name) -> void
3+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.Dispose() -> void
4+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.EnterAsync() -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser>!
5+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.EnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser>!
6+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser
7+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser.Dispose() -> void
8+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser.IsAbandoned.get -> bool
9+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.Name.get -> string!
10+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.TryEnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser?>!
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex
2+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.AsyncCrossProcessMutex(string! name) -> void
3+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.Dispose() -> void
4+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.EnterAsync() -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser>!
5+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.EnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser>!
6+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser
7+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser.Dispose() -> void
8+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser.IsAbandoned.get -> bool
9+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.Name.get -> string!
10+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.TryEnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser?>!
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex
2+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.AsyncCrossProcessMutex(string! name) -> void
3+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.Dispose() -> void
4+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.EnterAsync() -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser>!
5+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.EnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser>!
6+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser
7+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser.Dispose() -> void
8+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser.IsAbandoned.get -> bool
9+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.Name.get -> string!
10+
Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.TryEnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.Task<Microsoft.VisualStudio.Threading.AsyncCrossProcessMutex.LockReleaser?>!

0 commit comments

Comments
 (0)