Skip to content

Commit 2af276d

Browse files
committed
Fix incorrect reporting of task completion
Fixes Amanieu#37
1 parent 2016e43 commit 2af276d

File tree

3 files changed

+59
-21
lines changed

3 files changed

+59
-21
lines changed

src/scheduler.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,17 @@ void aligned_free(void* addr) LIBASYNC_NOEXCEPT
5757
static void generic_wait_handler(task_wait_handle wait_task)
5858
{
5959
// Create an event to wait on
60-
task_wait_event thread_event;
60+
task_wait_event event;
61+
event.init();
6162

6263
// Create a continuation for the task we are waiting for
63-
wait_task.on_finish([&thread_event] {
64+
wait_task.on_finish([&event] {
6465
// Just signal the thread event
65-
thread_event.signal(wait_type::task_finished);
66+
event.signal(wait_type::task_finished);
6667
});
6768

6869
// Wait for the event to be set
69-
thread_event.wait();
70+
event.wait();
7071
}
7172

7273
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)

src/task_wait_event.h

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,31 +32,61 @@ enum wait_type {
3232

3333
// OS-supported event object which can be used to wait for either a task to
3434
// finish or for the scheduler to have more work for the current thread.
35+
//
36+
// The event object is lazily initialized to avoid unnecessary API calls.
3537
class task_wait_event {
36-
std::mutex m;
37-
std::condition_variable c;
38+
std::aligned_storage<sizeof(std::mutex), std::alignment_of<std::mutex>::value>::type m;
39+
std::aligned_storage<sizeof(std::condition_variable), std::alignment_of<std::condition_variable>::value>::type c;
3840
int event_mask;
41+
bool initialized;
42+
43+
std::mutex& mutex()
44+
{
45+
return *reinterpret_cast<std::mutex*>(&m);
46+
}
47+
std::condition_variable& cond()
48+
{
49+
return *reinterpret_cast<std::condition_variable*>(&c);
50+
}
3951

4052
public:
4153
task_wait_event()
42-
: event_mask(0) {}
54+
: event_mask(0), initialized(false) {}
55+
56+
~task_wait_event()
57+
{
58+
if (initialized) {
59+
mutex().~mutex();
60+
cond().~condition_variable();
61+
}
62+
}
63+
64+
// Initialize the event, must be done before any other functions are called.
65+
void init()
66+
{
67+
if (!initialized) {
68+
new(&m) std::mutex;
69+
new(&c) std::condition_variable;
70+
initialized = true;
71+
}
72+
}
4373

4474
// Wait for an event to occur. Returns the event(s) that occurred. This also
4575
// clears any pending events afterwards.
4676
int wait()
4777
{
48-
std::unique_lock<std::mutex> lock(m);
78+
std::unique_lock<std::mutex> lock(mutex());
4979
while (event_mask == 0)
50-
c.wait(lock);
80+
cond().wait(lock);
5181
int result = event_mask;
5282
event_mask = 0;
53-
return result != 0;
83+
return result;
5484
}
5585

5686
// Check if a specific event is ready
5787
bool try_wait(int event)
5888
{
59-
std::lock_guard<std::mutex> lock(m);
89+
std::lock_guard<std::mutex> lock(mutex());
6090
int result = event_mask & event;
6191
event_mask &= ~event;
6292
return result != 0;
@@ -65,10 +95,10 @@ class task_wait_event {
6595
// Signal an event and wake up a sleeping thread
6696
void signal(int event)
6797
{
68-
std::unique_lock<std::mutex> lock(m);
98+
std::unique_lock<std::mutex> lock(mutex());
6999
event_mask |= event;
70100
lock.unlock();
71-
c.notify_one();
101+
cond().notify_one();
72102
}
73103
};
74104

src/threadpool_scheduler.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ namespace detail {
3737
struct LIBASYNC_CACHELINE_ALIGN thread_data_t {
3838
work_steal_queue queue;
3939
std::minstd_rand rng;
40-
task_wait_event event;
4140
std::thread handle;
4241
};
4342

@@ -180,11 +179,15 @@ static void thread_task_loop(threadpool_data* impl, std::size_t thread_id, task_
180179
// Flag indicating if we have added a continuation to the task
181180
bool added_continuation = false;
182181

182+
// Event to wait on
183+
task_wait_event event;
184+
183185
// Loop while waiting for the task to complete
184186
while (true) {
185187
// Check if the task has finished. If we have added a continuation, we
186-
// need to make sure the event has been signaled.
187-
if (wait_task && (added_continuation ? current_thread.event.try_wait(wait_type::task_finished) : wait_task.ready()))
188+
// need to make sure the event has been signaled, otherwise the other
189+
// thread may try to signal it after we have freed it.
190+
if (wait_task && (added_continuation ? event.try_wait(wait_type::task_finished) : wait_task.ready()))
188191
return;
189192

190193
// Try to get a task from the local queue
@@ -220,11 +223,13 @@ static void thread_task_loop(threadpool_data* impl, std::size_t thread_id, task_
220223
return;
221224
}
222225

226+
// Initialize the event object
227+
event.init();
228+
223229
// No tasks found, so sleep until something happens.
224230
// If a continuation has not been added yet, add it.
225231
if (wait_task && !added_continuation) {
226232
// Create a continuation for the task we are waiting for
227-
task_wait_event& event = current_thread.event;
228233
wait_task.on_finish([&event] {
229234
// Signal the thread's event
230235
event.signal(wait_type::task_finished);
@@ -234,27 +239,29 @@ static void thread_task_loop(threadpool_data* impl, std::size_t thread_id, task_
234239

235240
// Add our thread to the list of waiting threads
236241
size_t num_waiters_val = impl->num_waiters.load(std::memory_order_relaxed);
237-
impl->waiters[num_waiters_val] = &current_thread.event;
242+
impl->waiters[num_waiters_val] = &event;
238243
impl->num_waiters.store(num_waiters_val + 1, std::memory_order_relaxed);
239244

240245
// Wait for our event to be signaled when a task is scheduled or
241246
// the task we are waiting for has completed.
242247
locked.unlock();
243-
int events = current_thread.event.wait();
248+
int events = event.wait();
244249
locked.lock();
245250

246251
// Remove our thread from the list of waiting threads
247252
num_waiters_val = impl->num_waiters.load(std::memory_order_relaxed);
248253
for (std::size_t i = 0; i < num_waiters_val; i++) {
249-
if (impl->waiters[i] == &current_thread.event) {
254+
if (impl->waiters[i] == &event) {
250255
if (i != num_waiters_val - 1)
251256
std::swap(impl->waiters[i], impl->waiters[num_waiters_val - 1]);
252257
impl->num_waiters.store(num_waiters_val - 1, std::memory_order_relaxed);
253258
break;
254259
}
255260
}
256261

257-
// Check again if the task has finished
262+
// Check again if the task has finished. We have added a
263+
// continuation at this point, so we need to check that the
264+
// continuation has finished signaling the event.
258265
if (wait_task && (events & wait_type::task_finished))
259266
return;
260267
}

0 commit comments

Comments
 (0)