diff --git a/src/Microsoft.VisualStudio.Threading/AsyncMutex.cs b/src/Microsoft.VisualStudio.Threading/AsyncMutex.cs new file mode 100644 index 000000000..2510d811b --- /dev/null +++ b/src/Microsoft.VisualStudio.Threading/AsyncMutex.cs @@ -0,0 +1,189 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.VisualStudio.Threading; + +/// +/// A mutex that can be entered asynchronously. +/// +/// +/// +/// This mutex utilizes an OS synchronization primitive which is fundamentally thread-affinitized and requires synchronously blocking the thread that will own the mutex. +/// This makes a native mutex unsuitable for use in async methods, where the thread that enters the mutex may not be the same thread that exits it. +/// This class solves that problem by using a private dedicated thread to enter and release the mutex, but otherwise allows its owner to execute async code, switch threads, etc. +/// +/// +public class AsyncMutex : IDisposable +{ +#pragma warning disable SA1310 // Field names should not contain underscore + private const int STATE_READY = 0; + private const int STATE_HELD_OR_WAITING = 1; + private const int STATE_DISPOSED = 2; +#pragma warning restore SA1310 // Field names should not contain underscore + + private static readonly Action ExitSentinel = new Action(() => { }); + private readonly Thread namedMutexOwner; + private readonly BlockingCollection mutexWorkQueue = new(); + private readonly Mutex mutex; + private int state; + + /// + /// Initializes a new instance of the class. + /// + /// + /// + /// See the help docs on the underlying class for more information on the parameter. + /// Consider when reading that the initiallyOwned parameter for that constructor is always for this class. + /// + public AsyncMutex(string? name) + { + this.namedMutexOwner = new Thread(this.MutexOwnerThread, 100 * 1024) + { + Name = $"{nameof(AsyncMutex)}-{name}", + }; + this.mutex = new Mutex(false, name); + this.namedMutexOwner.Start(); + this.Name = name; + } + + /// + /// Gets the name of the mutex. + /// + public string? Name { get; } + + /// + /// Disposes of the underlying native objects. + /// + public void Dispose() + { + int priorState = Interlocked.Exchange(ref this.state, STATE_DISPOSED); + if (priorState != STATE_DISPOSED) + { + this.mutexWorkQueue.Add(ExitSentinel); + this.mutexWorkQueue.CompleteAdding(); + } + } + + /// + public ValueTask EnterAsync() => this.EnterAsync(Timeout.InfiniteTimeSpan); + + /// + /// Acquires the mutex asynchronously. + /// + /// The maximum time to wait before timing out. Use for no timeout, or to acquire the mutex only if it is immediately available. + /// A value whose disposal will release the mutex. + /// Thrown from the awaited result if the mutex could not be acquired within the specified timeout. + /// Thrown from the awaited result if the is a negative number other than -1 milliseconds, which represents an infinite timeout. + /// Thrown if called before a prior call to this method has completed, with its releaser disposed if the mutex was entered. + public async ValueTask EnterAsync(TimeSpan timeout) => await this.TryEnterAsync(timeout) ?? throw new TimeoutException(); + + /// + /// Acquires the mutex asynchronously, allowing for timeouts without throwing exceptions. + /// + /// The maximum time to wait before timing out. Use for no timeout, or to acquire the mutex only if it is immediately available. + /// + /// If the mutex was acquired, the result is a value whose disposal will release the mutex. + /// In the event of a timeout, the result in a value. + /// + /// Thrown from the awaited result if the is a negative number other than -1 milliseconds, which represents an infinite timeout. + /// Thrown if called before a prior call to this method has completed, with its releaser disposed if the mutex was entered. + public ValueTask TryEnterAsync(TimeSpan timeout) + { + int priorState = Interlocked.CompareExchange(ref this.state, STATE_HELD_OR_WAITING, STATE_READY); + switch (priorState) + { + case STATE_HELD_OR_WAITING: + throw new InvalidOperationException(); + case STATE_DISPOSED: + throw new ObjectDisposedException(this.GetType().FullName); + } + + TaskCompletionSource tcs = new(); + this.mutexWorkQueue.Add(delegate + { + try + { + if (this.mutex.WaitOne(timeout)) + { + tcs.SetResult(new LockReleaser(this)); + } + else + { + Assumes.True(Interlocked.CompareExchange(ref this.state, STATE_READY, STATE_HELD_OR_WAITING) == STATE_HELD_OR_WAITING); + tcs.SetResult(null); + } + } + catch (AbandonedMutexException) + { + tcs.SetResult(new LockReleaser(this, abandoned: true)); + } + catch (Exception ex) + { + Assumes.True(Interlocked.CompareExchange(ref this.state, STATE_READY, STATE_HELD_OR_WAITING) == STATE_HELD_OR_WAITING); + tcs.SetException(ex); + } + }); + + return new ValueTask(tcs.Task); + } + + private void Release() + { + Assumes.True(Interlocked.CompareExchange(ref this.state, STATE_READY, STATE_HELD_OR_WAITING) == STATE_HELD_OR_WAITING); + this.mutexWorkQueue.Add(this.mutex.ReleaseMutex); + } + + private void MutexOwnerThread() + { + try + { + while (!this.mutexWorkQueue.IsCompleted) + { + Action work = this.mutexWorkQueue.Take(); + if (work == ExitSentinel) + { + // We use an exit sentinel to avoid an exception having to be thrown and caught on disposal when we call Take() and CompleteAdding() is called. + break; + } + + work(); + } + } + finally + { + this.mutex.Dispose(); + } + } + + /// + /// The value returned from that must be disposed to release the mutex. + /// + public struct LockReleaser : IDisposable + { + private AsyncMutex? owner; + + internal LockReleaser(AsyncMutex mutex, bool abandoned = false) + { + this.owner = mutex; + this.IsAbandoned = abandoned; + } + + /// + /// Gets a value indicating whether the mutex was abandoned by its previous owner. + /// + public bool IsAbandoned { get; } + + /// + /// Releases the named mutex. + /// + public void Dispose() + { + Interlocked.Exchange(ref this.owner, null)?.Release(); + } + } +} diff --git a/src/Microsoft.VisualStudio.Threading/net472/PublicAPI.Unshipped.txt b/src/Microsoft.VisualStudio.Threading/net472/PublicAPI.Unshipped.txt index e69de29bb..a6a7470f3 100644 --- a/src/Microsoft.VisualStudio.Threading/net472/PublicAPI.Unshipped.txt +++ b/src/Microsoft.VisualStudio.Threading/net472/PublicAPI.Unshipped.txt @@ -0,0 +1,10 @@ +Microsoft.VisualStudio.Threading.AsyncMutex +Microsoft.VisualStudio.Threading.AsyncMutex.AsyncMutex(string? name) -> void +Microsoft.VisualStudio.Threading.AsyncMutex.Dispose() -> void +Microsoft.VisualStudio.Threading.AsyncMutex.EnterAsync() -> System.Threading.Tasks.ValueTask +Microsoft.VisualStudio.Threading.AsyncMutex.EnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.ValueTask +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser.Dispose() -> void +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser.IsAbandoned.get -> bool +Microsoft.VisualStudio.Threading.AsyncMutex.Name.get -> string? +Microsoft.VisualStudio.Threading.AsyncMutex.TryEnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.ValueTask \ No newline at end of file diff --git a/src/Microsoft.VisualStudio.Threading/net6.0-windows/PublicAPI.Unshipped.txt b/src/Microsoft.VisualStudio.Threading/net6.0-windows/PublicAPI.Unshipped.txt index e69de29bb..a6a7470f3 100644 --- a/src/Microsoft.VisualStudio.Threading/net6.0-windows/PublicAPI.Unshipped.txt +++ b/src/Microsoft.VisualStudio.Threading/net6.0-windows/PublicAPI.Unshipped.txt @@ -0,0 +1,10 @@ +Microsoft.VisualStudio.Threading.AsyncMutex +Microsoft.VisualStudio.Threading.AsyncMutex.AsyncMutex(string? name) -> void +Microsoft.VisualStudio.Threading.AsyncMutex.Dispose() -> void +Microsoft.VisualStudio.Threading.AsyncMutex.EnterAsync() -> System.Threading.Tasks.ValueTask +Microsoft.VisualStudio.Threading.AsyncMutex.EnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.ValueTask +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser.Dispose() -> void +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser.IsAbandoned.get -> bool +Microsoft.VisualStudio.Threading.AsyncMutex.Name.get -> string? +Microsoft.VisualStudio.Threading.AsyncMutex.TryEnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.ValueTask \ No newline at end of file diff --git a/src/Microsoft.VisualStudio.Threading/net6.0/PublicAPI.Unshipped.txt b/src/Microsoft.VisualStudio.Threading/net6.0/PublicAPI.Unshipped.txt index e69de29bb..a6a7470f3 100644 --- a/src/Microsoft.VisualStudio.Threading/net6.0/PublicAPI.Unshipped.txt +++ b/src/Microsoft.VisualStudio.Threading/net6.0/PublicAPI.Unshipped.txt @@ -0,0 +1,10 @@ +Microsoft.VisualStudio.Threading.AsyncMutex +Microsoft.VisualStudio.Threading.AsyncMutex.AsyncMutex(string? name) -> void +Microsoft.VisualStudio.Threading.AsyncMutex.Dispose() -> void +Microsoft.VisualStudio.Threading.AsyncMutex.EnterAsync() -> System.Threading.Tasks.ValueTask +Microsoft.VisualStudio.Threading.AsyncMutex.EnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.ValueTask +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser.Dispose() -> void +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser.IsAbandoned.get -> bool +Microsoft.VisualStudio.Threading.AsyncMutex.Name.get -> string? +Microsoft.VisualStudio.Threading.AsyncMutex.TryEnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.ValueTask \ No newline at end of file diff --git a/src/Microsoft.VisualStudio.Threading/netstandard2.0/PublicAPI.Unshipped.txt b/src/Microsoft.VisualStudio.Threading/netstandard2.0/PublicAPI.Unshipped.txt index e69de29bb..a6a7470f3 100644 --- a/src/Microsoft.VisualStudio.Threading/netstandard2.0/PublicAPI.Unshipped.txt +++ b/src/Microsoft.VisualStudio.Threading/netstandard2.0/PublicAPI.Unshipped.txt @@ -0,0 +1,10 @@ +Microsoft.VisualStudio.Threading.AsyncMutex +Microsoft.VisualStudio.Threading.AsyncMutex.AsyncMutex(string? name) -> void +Microsoft.VisualStudio.Threading.AsyncMutex.Dispose() -> void +Microsoft.VisualStudio.Threading.AsyncMutex.EnterAsync() -> System.Threading.Tasks.ValueTask +Microsoft.VisualStudio.Threading.AsyncMutex.EnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.ValueTask +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser.Dispose() -> void +Microsoft.VisualStudio.Threading.AsyncMutex.LockReleaser.IsAbandoned.get -> bool +Microsoft.VisualStudio.Threading.AsyncMutex.Name.get -> string? +Microsoft.VisualStudio.Threading.AsyncMutex.TryEnterAsync(System.TimeSpan timeout) -> System.Threading.Tasks.ValueTask \ No newline at end of file diff --git a/test/Microsoft.VisualStudio.Threading.Tests/AsyncMutexTests.cs b/test/Microsoft.VisualStudio.Threading.Tests/AsyncMutexTests.cs new file mode 100644 index 000000000..489d6c96f --- /dev/null +++ b/test/Microsoft.VisualStudio.Threading.Tests/AsyncMutexTests.cs @@ -0,0 +1,219 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.VisualStudio.Threading; +using Xunit; +using Xunit.Abstractions; + +public class AsyncMutexTests : TestBase, IDisposable +{ + private readonly AsyncMutex mutex = new($"test {Guid.NewGuid()}"); + + public AsyncMutexTests(ITestOutputHelper logger) + : base(logger) + { + } + + public void Dispose() + { + this.mutex.Dispose(); + } + + [Fact] + public void DisposeWithoutUse() + { + // This test intentionally left blank. The tested functionality is in the constructor and Dispose method. + } + + [Fact] + public void Dispose_Twice() + { + // The second disposal happens in the Dispose method of this class. + this.mutex.Dispose(); + } + + [Fact] + public async Task EnterAsync_Release_Uncontested() + { + await this.VerifyMutexEnterReleaseAsync(); + } + + [Fact] + public async Task EnterAsync_DoubleRelease() + { + AsyncMutex.LockReleaser releaser = await this.mutex.EnterAsync(); + releaser.Dispose(); + releaser.Dispose(); + } + + [Fact] + public async Task EnterAsync_Reentrancy() + { + using (AsyncMutex.LockReleaser releaser = await this.mutex.EnterAsync()) + { + await Assert.ThrowsAsync(async () => await this.mutex.EnterAsync()); + } + + await this.VerifyMutexEnterReleaseAsync(); + } + + [Fact] + public async Task TryEnterAsync_Reentrancy() + { + using (AsyncMutex.LockReleaser releaser = await this.mutex.EnterAsync()) + { + await Assert.ThrowsAsync(async () => await this.mutex.TryEnterAsync(Timeout.InfiniteTimeSpan)); + } + + await this.VerifyMutexEnterReleaseAsync(); + } + + [Fact] + public async Task EnterAsync_Contested() + { + // We don't allow attempted reentrancy, so create a new mutex object to use for the contested locks. + AsyncMutex mutex2 = new(this.mutex.Name); + + // Acquire and hold the mutex so that we can test timeout behavior. + using (AsyncMutex.LockReleaser releaser = await this.mutex.EnterAsync()) + { + // Verify that we can't acquire the mutex within a timeout. + await Assert.ThrowsAsync(async () => await mutex2.EnterAsync(TimeSpan.Zero)); + await Assert.ThrowsAsync(async () => await mutex2.EnterAsync(TimeSpan.FromMilliseconds(1))); + } + + // Verify that we can acquire the mutex after it is released. + using (AsyncMutex.LockReleaser releaser2 = await mutex2.EnterAsync()) + { + } + + // Verify that the main mutex still functions. + await this.VerifyMutexEnterReleaseAsync(); + } + + [Fact] + public async Task TryEnterAsync_Contested() + { + // We don't allow attempted reentrancy, so create a new mutex object to use for the contested locks. + AsyncMutex mutex2 = new(this.mutex.Name); + + // Acquire and hold the mutex so that we can test timeout behavior. + using (AsyncMutex.LockReleaser? releaser = await this.mutex.TryEnterAsync(Timeout.InfiniteTimeSpan)) + { + // Verify that we can't acquire the mutex within a timeout. + Assert.Null(await mutex2.TryEnterAsync(TimeSpan.Zero)); + Assert.Null(await mutex2.TryEnterAsync(TimeSpan.FromMilliseconds(1))); + + // Just verify that the syntax is nice. + using (AsyncMutex.LockReleaser? releaser2 = await mutex2.TryEnterAsync(TimeSpan.Zero)) + { + Assert.Null(releaser2); + } + } + + // Verify that we can acquire the mutex after it is released. + using (AsyncMutex.LockReleaser releaser2 = await mutex2.EnterAsync()) + { + } + + // Verify that the main mutex still functions. + await this.VerifyMutexEnterReleaseAsync(); + } + + [Fact] + public async Task EnterAsync_InvalidNegativeTimeout() + { + await Assert.ThrowsAsync(async () => await this.mutex.EnterAsync(TimeSpan.FromMilliseconds(-2))); + await this.VerifyMutexEnterReleaseAsync(); + } + + [Fact] + public async Task TryEnterAsync_InvalidNegativeTimeout() + { + await Assert.ThrowsAsync(async () => await this.mutex.TryEnterAsync(TimeSpan.FromMilliseconds(-2))); + await this.VerifyMutexEnterReleaseAsync(); + } + + [Fact] + public async Task EnterAsync_AbandonedMutex() + { + using AsyncMutex mutex2 = new(this.mutex.Name); + + AsyncMutex.LockReleaser abandonedReleaser = await this.mutex.EnterAsync(); + Assert.False(abandonedReleaser.IsAbandoned); + + // Dispose the mutex WITHOUT first releasing it. + this.mutex.Dispose(); + + using (AsyncMutex.LockReleaser releaser2 = await mutex2.EnterAsync()) + { + Assert.True(releaser2.IsAbandoned); + } + } + + [Fact] + public async Task TryEnterAsync_AbandonedMutex() + { + using AsyncMutex mutex2 = new(this.mutex.Name); + + AsyncMutex.LockReleaser? abandonedReleaser = await this.mutex.TryEnterAsync(Timeout.InfiniteTimeSpan); + Assert.False(abandonedReleaser.Value.IsAbandoned); + + // Dispose the mutex WITHOUT first releasing it. + this.mutex.Dispose(); + + using (AsyncMutex.LockReleaser? releaser2 = await mutex2.TryEnterAsync(Timeout.InfiniteTimeSpan)) + { + Assert.True(releaser2.Value.IsAbandoned); + } + } + + [Fact] + public async Task EnterAsync_ThrowsObjectDisposedException() + { + this.mutex.Dispose(); + await Assert.ThrowsAsync(async () => await this.mutex.EnterAsync()); + } + + [Fact] + public async Task TryEnterAsync_ThrowsObjectDisposedException() + { + this.mutex.Dispose(); + await Assert.ThrowsAsync(async () => await this.mutex.TryEnterAsync(Timeout.InfiniteTimeSpan)); + } + + [Fact] + public async Task TryEnterAsync_Twice() + { + using (AsyncMutex.LockReleaser? releaser = await this.mutex.TryEnterAsync(TimeSpan.Zero)) + { + } + + using (AsyncMutex.LockReleaser? releaser = await this.mutex.TryEnterAsync(TimeSpan.Zero)) + { + } + } + + /// + /// Asserts behavior or the .NET Mutex class that we may be emulating in our class. + /// + [Fact] + public void Mutex_BaselineBehaviors() + { + // Verify reentrant behavior. + using Mutex mutex = new(false, $"test {Guid.NewGuid()}"); + mutex.WaitOne(); + mutex.WaitOne(); + mutex.ReleaseMutex(); + mutex.ReleaseMutex(); + Assert.Throws(mutex.ReleaseMutex); + } + + private async Task VerifyMutexEnterReleaseAsync() + { + using AsyncMutex.LockReleaser releaser = await this.mutex.EnterAsync(); + } +}