|
29 | 29 | // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. |
30 | 30 | //--------------------------------------------------------------------------- |
31 | 31 |
|
32 | | -using System; |
33 | | -using System.Diagnostics.CodeAnalysis; |
34 | 32 | using System.Threading; |
35 | 33 | using System.Threading.Tasks; |
36 | | -using System.Threading.Tasks.Sources; |
37 | 34 |
|
38 | 35 | namespace RabbitMQ.Client.Impl |
39 | 36 | { |
40 | | - sealed class AsyncManualResetEvent : IValueTaskSource |
| 37 | + sealed class AsyncManualResetEvent |
41 | 38 | { |
42 | | - private ManualResetValueTaskSourceCore<bool> _valueTaskSource; |
43 | | - private bool _isSet; |
| 39 | + volatile TaskCompletionSource<bool> _taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously); |
44 | 40 |
|
45 | 41 | public AsyncManualResetEvent(bool initialState = false) |
46 | 42 | { |
47 | | - _isSet = initialState; |
48 | | - _valueTaskSource.Reset(); |
49 | 43 | if (initialState) |
50 | 44 | { |
51 | | - _valueTaskSource.SetResult(true); |
| 45 | + _taskCompletionSource.SetResult(true); |
52 | 46 | } |
53 | 47 | } |
54 | 48 |
|
55 | | - public bool IsSet => Volatile.Read(ref _isSet); |
| 49 | + public bool IsSet => _taskCompletionSource.Task.IsCompleted; |
56 | 50 |
|
57 | | - public async ValueTask WaitAsync(CancellationToken cancellationToken) |
| 51 | + public Task WaitAsync(CancellationToken cancellationToken = default) |
58 | 52 | { |
59 | | - if (IsSet) |
60 | | - { |
61 | | - return; |
62 | | - } |
63 | | - |
64 | | - cancellationToken.ThrowIfCancellationRequested(); |
65 | | - |
66 | | - CancellationTokenRegistration tokenRegistration = |
67 | | -#if NET |
68 | | - cancellationToken.UnsafeRegister( |
69 | | - static state => |
70 | | - { |
71 | | - var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!; |
72 | | - source.SetException(new OperationCanceledException(token)); |
73 | | - }, (_valueTaskSource, cancellationToken)); |
74 | | -#else |
75 | | - cancellationToken.Register( |
76 | | - static state => |
77 | | - { |
78 | | - var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!; |
79 | | - source.SetException(new OperationCanceledException(token)); |
80 | | - }, |
81 | | - state: (_valueTaskSource, cancellationToken), useSynchronizationContext: false); |
82 | | -#endif |
83 | | - try |
84 | | - { |
85 | | - await new ValueTask(this, _valueTaskSource.Version) |
86 | | - .ConfigureAwait(false); |
87 | | - } |
88 | | - finally |
89 | | - { |
90 | | -#if NET |
91 | | - await tokenRegistration.DisposeAsync() |
92 | | - .ConfigureAwait(false); |
93 | | -#else |
94 | | - tokenRegistration.Dispose(); |
95 | | -#endif |
96 | | - } |
| 53 | + Task<bool> task = _taskCompletionSource.Task; |
| 54 | + return task.IsCompleted ? task : task.WaitAsync(cancellationToken); |
97 | 55 | } |
98 | 56 |
|
99 | | - public void Set() |
100 | | - { |
101 | | - if (IsSet) |
102 | | - { |
103 | | - return; |
104 | | - } |
105 | | - |
106 | | - Volatile.Write(ref _isSet, true); |
107 | | - _valueTaskSource.SetResult(true); |
108 | | - } |
| 57 | + public void Set() => _taskCompletionSource.TrySetResult(true); |
109 | 58 |
|
110 | 59 | public void Reset() |
111 | 60 | { |
112 | | - if (!IsSet) |
| 61 | + while (true) |
113 | 62 | { |
114 | | - return; |
| 63 | + TaskCompletionSource<bool> currentTcs = _taskCompletionSource; |
| 64 | + if (!currentTcs.Task.IsCompleted || |
| 65 | + Interlocked.CompareExchange(ref _taskCompletionSource, new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), currentTcs) == currentTcs) |
| 66 | + { |
| 67 | + return; |
| 68 | + } |
115 | 69 | } |
116 | | - |
117 | | - Volatile.Write(ref _isSet, false); |
118 | | - _valueTaskSource.Reset(); |
119 | 70 | } |
120 | | - |
121 | | - void IValueTaskSource.GetResult(short token) |
122 | | - { |
123 | | - if (token != _valueTaskSource.Version) |
124 | | - { |
125 | | - ThrowIncorrectTokenException(); |
126 | | - } |
127 | | - |
128 | | - _valueTaskSource.GetResult(token); |
129 | | - } |
130 | | - |
131 | | - ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) |
132 | | - { |
133 | | - if (token != _valueTaskSource.Version) |
134 | | - { |
135 | | - ThrowIncorrectTokenException(); |
136 | | - } |
137 | | - |
138 | | - return _valueTaskSource.GetStatus(token); |
139 | | - } |
140 | | - |
141 | | - void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) |
142 | | - { |
143 | | - if (token != _valueTaskSource.Version) |
144 | | - { |
145 | | - ThrowIncorrectTokenException(); |
146 | | - } |
147 | | - |
148 | | - _valueTaskSource.OnCompleted(continuation, state, token, flags); |
149 | | - } |
150 | | - |
151 | | - [DoesNotReturn] |
152 | | - static void ThrowIncorrectTokenException() => |
153 | | - throw new InvalidOperationException("ValueTask cannot be awaited multiple times."); |
154 | 71 | } |
155 | 72 | } |
0 commit comments