// semaphore standard header
// Copyright (c) Microsoft Corporation.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#pragma once
#ifndef _SEMAPHORE_
#define _SEMAPHORE_
#include <yvals.h>
#if _STL_COMPILER_PREPROCESSOR
#ifdef _M_CEE_PURE
#error <semaphore> is not supported when compiling with /clr:pure.
#endif // _M_CEE_PURE
#if !_HAS_CXX20
#pragma message("The contents of <semaphore> are available only with C++20 or later.")
#else // ^^^ !_HAS_CXX20 / _HAS_CXX20 vvv
#include <__msvc_chrono.hpp>
#include <atomic>
#include <climits>
#pragma pack(push, _CRT_PACKING)
#pragma warning(push, _STL_WARNING_LEVEL)
#pragma warning(disable : _STL_DISABLED_WARNINGS)
_STL_DISABLE_CLANG_WARNINGS
#pragma push_macro("new")
#undef new
_STD_BEGIN
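
// Convert a relative timeout into the absolute deadline, in milliseconds, used by the
// __std_atomic_wait machinery below.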
template <class _Rep, class _Period>
_NODISCARD unsigned long long _Semaphore_deadline(const chrono::duration<_Rep, _Period>& _Rel_time) {
    return __std_atomic_wait_get_deadline(
        chrono::duration_cast<chrono::duration<unsigned long long, milli>>(_Rel_time).count());
}
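
// Compute the milliseconds remaining until _Abs_time on _Clock, clamped to ten days so the result
// fits in an unsigned long; callers re-check the clock in a loop, so the clamp can only cause an
// early recheck, never a missed timeout.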
template <class _Clock, class _Duration>
_NODISCARD unsigned long _Semaphore_remaining_timeout(const chrono::time_point<_Clock, _Duration>& _Abs_time) {
    const auto _Now = _Clock::now();
    if (_Now >= _Abs_time) {
        return 0;
    }

    const auto _Rel_time = chrono::ceil<chrono::milliseconds>(_Abs_time - _Now);
    static constexpr chrono::milliseconds _Ten_days{chrono::hours{24 * 10}};
    static_assert(_Ten_days.count() < ULONG_MAX, "Bad sizing assumption");
    if (_Rel_time >= _Ten_days) {
        return static_cast<unsigned long>(_Ten_days.count());
    }

    return static_cast<unsigned long>(_Rel_time.count());
}
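
// The greatest value representable by ptrdiff_t; the default least-max-value for counting_semaphore.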
inline constexpr ptrdiff_t _Semaphore_max = (1ULL << (sizeof(ptrdiff_t) * CHAR_BIT - 1)) - 1;

template <ptrdiff_t _Least_max_value = _Semaphore_max>
class counting_semaphore {
public:
    _NODISCARD static constexpr ptrdiff_t(max)() noexcept {
        return _Least_max_value;
    }

    constexpr explicit counting_semaphore(const ptrdiff_t _Desired) noexcept /* strengthened */
        : _Counter(_Desired) {
        _STL_VERIFY(_Desired >= 0 && _Desired <= _Least_max_value,
            "Precondition: desired >= 0, and desired <= max() (N4861 [thread.sema.cnt]/5)");
    }

    counting_semaphore(const counting_semaphore&)            = delete;
    counting_semaphore& operator=(const counting_semaphore&) = delete;

    void release(ptrdiff_t _Update = 1) noexcept /* strengthened */ {
        if (_Update == 0) {
            return;
        }
        _STL_VERIFY(_Update > 0 && _Update <= _Least_max_value,
            "Precondition: update >= 0, and update <= max() - counter (N4861 [thread.sema.cnt]/8)");

        // We need to notify (wake) at least _Update waiting threads.
        // Errors towards waking more cannot always be avoided, but they are only performance issues.
        // Errors towards waking fewer must be avoided, as they are correctness issues.
        // release thread: increment semaphore counter, then load waiting counter;
        // acquire thread: increment waiting counter, then load semaphore counter.
        // memory_order_seq_cst for all four operations guarantees that the release thread loads
        // the incremented value, or the acquire thread loads the incremented value, or both, but not neither.
        // memory_order_seq_cst might be superfluous for some hardware mappings of the C++ memory model,
        // but from the point of view of the C++ memory model itself it is needed; weaker orders don't work.
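        // Illustrative sketch of the interleaving that weaker orders would allow (the classic
        // store-buffering pattern):
        //   release thread: increments _Counter, then loads _Waiting == 0 (stale) -> skips the notify
        //   acquire thread: increments _Waiting, then loads _Counter == 0 (stale) -> blocks forever
        // The single total order of seq_cst operations guarantees that at least one of the two loads
        // observes the other thread's increment.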
        const ptrdiff_t _Prev = _Counter.fetch_add(static_cast<ptrdiff_t>(_Update));
        _STL_VERIFY(_Prev + _Update > 0 && _Prev + _Update <= _Least_max_value,
            "Precondition: update <= max() - counter (N4861 [thread.sema.cnt]/8)");

        const ptrdiff_t _Waiting_upper_bound = _Waiting.load();
        if (_Waiting_upper_bound == 0) {
            // Definitely no one is waiting
        } else if (_Waiting_upper_bound <= _Update) {
            // No more waiting threads than the update value; can wake everyone.
            _Counter.notify_all();
        } else {
            // Wake at most _Update threads. Though repeated notify_one() is somewhat less efficient
            // than a single notify_all(), the number of OS calls is still the same; the benefit of
            // trying not to wake unnecessary threads is expected to be greater than the loss on
            // extra calls and atomic operations.
            for (; _Update != 0; --_Update) {
                _Counter.notify_one();
            }
        }
    }

    void _Wait(const unsigned long _Remaining_timeout) noexcept {
        // See the comment in release()
        _Waiting.fetch_add(1);
        ptrdiff_t _Current = _Counter.load();
        if (_Current == 0) {
            __std_atomic_wait_direct(&_Counter, &_Current, sizeof(_Current), _Remaining_timeout);
        }
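        // The decrement is relaxed: only the seq_cst increment above needs to be ordered against
        // the _Counter load in release() (see the comment there).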
        _Waiting.fetch_sub(1, memory_order_relaxed);
    }

    void acquire() noexcept /* strengthened */ {
        ptrdiff_t _Current = _Counter.load(memory_order_relaxed);
        for (;;) {
            while (_Current == 0) {
                _Wait(_Atomic_wait_no_timeout);
                _Current = _Counter.load(memory_order_relaxed);
            }
            _STL_VERIFY(_Current > 0 && _Current <= _Least_max_value,
                "Invariant: counter >= 0, and counter <= max(), "
                "possibly caused by a precondition violation (N4861 [thread.sema.cnt]/8)");

            // "happens after release" ordering is provided by this CAS, so loads and waits can be relaxed
            if (_Counter.compare_exchange_weak(_Current, _Current - 1)) {
                return;
            }
        }
    }

    _NODISCARD bool try_acquire() noexcept {
        ptrdiff_t _Current = _Counter.load();
        if (_Current == 0) {
            return false;
        }
        _STL_VERIFY(_Current > 0 && _Current <= _Least_max_value,
            "Invariant: counter >= 0, and counter <= max(), "
            "possibly caused by a precondition violation (N4861 [thread.sema.cnt]/8)");
        return _Counter.compare_exchange_weak(_Current, _Current - 1);
    }

    template <class _Rep, class _Period>
    _NODISCARD bool try_acquire_for(const chrono::duration<_Rep, _Period>& _Rel_time) {
        auto _Deadline = _Semaphore_deadline(_Rel_time);
        ptrdiff_t _Current = _Counter.load(memory_order_relaxed);
        for (;;) {
            while (_Current == 0) {
                const auto _Remaining_timeout = __std_atomic_wait_get_remaining_timeout(_Deadline);
                if (_Remaining_timeout == 0) {
                    return false;
                }
                _Wait(_Remaining_timeout);
                _Current = _Counter.load(memory_order_relaxed);
            }
            _STL_VERIFY(_Current > 0 && _Current <= _Least_max_value,
                "Invariant: counter >= 0, and counter <= max(), "
                "possibly caused by a precondition violation (N4861 [thread.sema.cnt]/8)");

            // "happens after release" ordering is provided by this CAS, so loads and waits can be relaxed
            if (_Counter.compare_exchange_weak(_Current, _Current - 1)) {
                return true;
            }
        }
    }

    template <class _Clock, class _Duration>
    _NODISCARD bool try_acquire_until(const chrono::time_point<_Clock, _Duration>& _Abs_time) {
        static_assert(chrono::is_clock_v<_Clock>, "Clock type required");
        ptrdiff_t _Current = _Counter.load(memory_order_relaxed);
        for (;;) {
            while (_Current == 0) {
                const unsigned long _Remaining_timeout = _Semaphore_remaining_timeout(_Abs_time);
                if (_Remaining_timeout == 0) {
                    return false;
                }
                _Wait(_Remaining_timeout);
                _Current = _Counter.load(memory_order_relaxed);
            }
            _STL_VERIFY(_Current > 0 && _Current <= _Least_max_value,
                "Invariant: counter >= 0, and counter <= max(), "
                "possibly caused by a precondition violation (N4861 [thread.sema.cnt]/8)");

            // "happens after release" ordering is provided by this CAS, so loads and waits can be relaxed
            if (_Counter.compare_exchange_weak(_Current, _Current - 1)) {
                return true;
            }
        }
    }

private:
    atomic<ptrdiff_t> _Counter;
    atomic<ptrdiff_t> _Waiting;
};
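
// Specialization for binary semaphores: the counter is a single unsigned char holding 0 or 1,
// so release() can store the token unconditionally and acquire() can exchange it out.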
template <>
class counting_semaphore<1> {
public:
    _NODISCARD static constexpr ptrdiff_t(max)() noexcept {
        return 1;
    }

    constexpr explicit counting_semaphore(const ptrdiff_t _Desired) noexcept /* strengthened */
        : _Counter(static_cast<unsigned char>(_Desired)) {
        _STL_VERIFY((_Desired & ~1) == 0, "Precondition: desired >= 0, and desired <= max() "
                                          "(N4861 [thread.sema.cnt]/5)");
    }

    counting_semaphore(const counting_semaphore&)            = delete;
    counting_semaphore& operator=(const counting_semaphore&) = delete;

    void release(const ptrdiff_t _Update = 1) noexcept /* strengthened */ {
        if (_Update == 0) {
            return;
        }
        _STL_VERIFY(_Update == 1, "Precondition: update >= 0, "
                                  "and update <= max() - counter (N4861 [thread.sema.cnt]/8)");
        // TRANSITION, GH-1133: should be memory_order_release
        _Counter.store(1);
        _Counter.notify_one();
    }

    void acquire() noexcept /* strengthened */ {
        for (;;) {
            // "happens after release" ordering is provided by this exchange, so loads and waits can be relaxed
            // TRANSITION, GH-1133: should be memory_order_acquire
            unsigned char _Prev = _Counter.exchange(0);
            if (_Prev == 1) {
                break;
            }

            _STL_VERIFY(_Prev == 0, "Invariant: semaphore counter is non-negative and doesn't exceed max(), "
                                    "possibly caused by memory corruption");
            _Counter.wait(0, memory_order_relaxed);
        }
    }

    _NODISCARD bool try_acquire() noexcept {
        // TRANSITION, GH-1133: should be memory_order_acquire
        unsigned char _Prev = _Counter.exchange(0);
        _STL_VERIFY((_Prev & ~1) == 0, "Invariant: semaphore counter is non-negative and doesn't exceed max(), "
                                       "possibly caused by memory corruption");
        return reinterpret_cast<const bool&>(_Prev);
    }

    template <class _Rep, class _Period>
    _NODISCARD bool try_acquire_for(const chrono::duration<_Rep, _Period>& _Rel_time) {
        auto _Deadline = _Semaphore_deadline(_Rel_time);
        for (;;) {
            // "happens after release" ordering is provided by this exchange, so loads and waits can be relaxed
            // TRANSITION, GH-1133: should be memory_order_acquire
            unsigned char _Prev = _Counter.exchange(0);
            if (_Prev == 1) {
                return true;
            }

            _STL_VERIFY(_Prev == 0, "Invariant: semaphore counter is non-negative and doesn't exceed max(), "
                                    "possibly caused by memory corruption");
            const auto _Remaining_timeout = __std_atomic_wait_get_remaining_timeout(_Deadline);
            if (_Remaining_timeout == 0) {
                return false;
            }
            __std_atomic_wait_direct(&_Counter, &_Prev, sizeof(_Prev), _Remaining_timeout);
        }
    }

    template <class _Clock, class _Duration>
    _NODISCARD bool try_acquire_until(const chrono::time_point<_Clock, _Duration>& _Abs_time) {
        static_assert(chrono::is_clock_v<_Clock>, "Clock type required");
        for (;;) {
            // "happens after release" ordering is provided by this exchange, so loads and waits can be relaxed
            // TRANSITION, GH-1133: should be memory_order_acquire
            unsigned char _Prev = _Counter.exchange(0);
            if (_Prev == 1) {
                return true;
            }

            _STL_VERIFY(_Prev == 0, "Invariant: semaphore counter is non-negative and doesn't exceed max(), "
                                    "possibly caused by memory corruption");
            const unsigned long _Remaining_timeout = _Semaphore_remaining_timeout(_Abs_time);
            if (_Remaining_timeout == 0) {
                return false;
            }
            __std_atomic_wait_direct(&_Counter, &_Prev, sizeof(_Prev), _Remaining_timeout);
        }
    }

private:
    atomic<unsigned char> _Counter;
};

using binary_semaphore = counting_semaphore<1>;
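
// Usage sketch (illustrative only; worker() and do_work() are hypothetical, not part of this
// header): a counting_semaphore<4> admits at most four threads into a section at a time.
//
//   std::counting_semaphore<4> slots{4};
//
//   void worker() {
//       slots.acquire(); // blocks while four workers already hold slots
//       do_work();
//       slots.release(); // returns the slot, potentially waking a waiter
//   }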
_STD_END
#pragma pop_macro("new")
_STL_RESTORE_CLANG_WARNINGS
#pragma warning(pop)
#pragma pack(pop)
#endif // ^^^ _HAS_CXX20 ^^^
#endif // _STL_COMPILER_PREPROCESSOR
#endif // _SEMAPHORE_