//
//  AsyncTaskQueue.hpp
//  Clock Signal
//
//  Created by Thomas Harte on 07/10/2016.
//  Copyright 2016 Thomas Harte. All rights reserved.
//

#pragma once

#include <atomic>
#include <condition_variable>
#include <functional>
#include <thread>
#include <vector>

#include "ClockReceiver/TimeTypes.hpp"

namespace Concurrency {

/// An implementation detail; provides the time-centric part of a TaskQueue with a real Performer.
template <typename Performer> struct TaskQueueStorage {
	template <typename... Args> TaskQueueStorage(Args&&... args) :
		performer(std::forward<Args>(args)...),
		last_fired_(Time::nanos_now()) {}

	Performer performer;

protected:
	void update() {
		auto time_now = Time::nanos_now();
		performer.perform(time_now - last_fired_);
		last_fired_ = time_now;
	}

private:
	Time::Nanos last_fired_;
};
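
// A minimal sketch of a conforming Performer, for illustration only; the name
// SketchPerformer and its members are hypothetical and not part of this header.
// TaskQueueStorage constructs the performer from the arguments forwarded by
// AsyncTaskQueue's constructor and calls perform() with the number of nanoseconds
// elapsed since the previous update.
//
//	struct SketchPerformer {
//		// Receives the time, in nanoseconds, since the previous perform().
//		void perform(Time::Nanos duration) {
//			time_accumulated += duration;
//		}
//
//		Time::Nanos time_accumulated = 0;
//	};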

/// An implementation detail; provides a no-op implementation of time advances for TaskQueues without a Performer.
template <> struct TaskQueueStorage<void> {
	TaskQueueStorage() {}

protected:
	void update() {}
};

/*!
	A task queue allows a caller to enqueue @c void(void) functions. Those functions are guaranteed
	to be performed serially and asynchronously from the caller.

	If @c perform_automatically is true, functions will be performed as soon as is possible,
	at the cost of thread synchronisation.

	If @c perform_automatically is false, functions will be queued up but not dispatched
	until a call to perform().

	If a @c Performer type is supplied then a public member, @c performer, will be constructed
	with the arguments supplied to TaskQueue's constructor. That instance will receive calls of the
	form @c .perform(nanos) before every batch of new actions, indicating how much time has
	passed since the previous @c perform.

	@note Even if @c perform_automatically is true, actions may be batched when a long-running
	action occupies the asynchronous thread for long enough. So it is not guaranteed that @c perform
	will be called once per action.
*/
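// A usage sketch, assuming automatic dispatch; the queue name and lambda body
// below are illustrative only, not part of this header.
//
//	Concurrency::AsyncTaskQueue<true> queue;
//	queue.enqueue([] {
//		// ... work to run asynchronously, in enqueue order ...
//	});
//	queue.flush();	// Optionally block until everything enqueued so far has run.
//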
template <bool perform_automatically, bool start_immediately = true, typename Performer = void>
class AsyncTaskQueue: public TaskQueueStorage<Performer> {
public:
	template <typename... Args> AsyncTaskQueue(Args&&... args) :
		TaskQueueStorage<Performer>(std::forward<Args>(args)...) {
		if constexpr (start_immediately) {
			start();
		}
	}

	/// Enqueues @c post_action to be performed asynchronously at some point
	/// in the future. If @c perform_automatically is @c true then the action
	/// will be performed as soon as possible. Otherwise it will sit unscheduled until
	/// a call to @c perform().
	///
	/// Actions may be elided.
	///
	/// If this TaskQueue has a @c Performer then the action will be performed
	/// on the same thread as the performer, after the performer has been updated
	/// to 'now'.
	void enqueue(const std::function<void(void)> &post_action) {
		std::lock_guard guard(condition_mutex_);
		actions_.push_back(post_action);

		if constexpr (perform_automatically) {
			condition_.notify_all();
		}
	}

	/// Causes any enqueued actions that are not yet scheduled to be scheduled.
	void perform() {
		if(actions_.empty()) {
			return;
		}
		condition_.notify_all();
	}

	/// Permanently stops this task queue, blocking until that has happened.
	/// All pending actions will be performed first.
	///
	/// The queue cannot be restarted; this is a destructive action.
	void stop() {
		if(thread_.joinable()) {
			should_quit_ = true;
			enqueue([] {});
			if constexpr (!perform_automatically) {
				perform();
			}
			thread_.join();
		}
	}

	/// Starts the queue if it has never been started before.
	///
	/// This is not guaranteed to restart a stopped queue safely.
	void start() {
		thread_ = std::thread{
			[this] {
				ActionVector actions;

				// Continue until told to quit.
				while(!should_quit_) {
					// Wait for new actions to be signalled, and grab them.
					std::unique_lock lock(condition_mutex_);
					while(actions_.empty() && !should_quit_) {
						condition_.wait(lock);
					}
					std::swap(actions, actions_);
					lock.unlock();

					// Update to now (which is possibly a no-op).
					TaskQueueStorage<Performer>::update();

					// Perform the actions and destroy them.
					for(const auto &action: actions) {
						action();
					}
					actions.clear();
				}
			}
		};
	}

	/// Schedules any remaining unscheduled work, then blocks synchronously
	/// until all scheduled work has been performed.
	void flush() {
		std::mutex flush_mutex;
		std::condition_variable flush_condition;
		bool has_run = false;
		std::unique_lock lock(flush_mutex);

		enqueue([&flush_mutex, &flush_condition, &has_run] () {
			std::unique_lock inner_lock(flush_mutex);
			has_run = true;
			flush_condition.notify_all();
		});

		if constexpr (!perform_automatically) {
			perform();
		}

		flush_condition.wait(lock, [&has_run] { return has_run; });
	}
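
	// A sketch of manual dispatch, for illustration; manual_queue and the two
	// action variables are hypothetical std::function<void(void)> instances.
	// With perform_automatically set to false, enqueued actions accumulate
	// until perform() or flush() schedules them.
	//
	//	Concurrency::AsyncTaskQueue<false> manual_queue;
	//	manual_queue.enqueue(first_action);
	//	manual_queue.enqueue(second_action);
	//	manual_queue.perform();	// Schedule both for asynchronous execution.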

	~AsyncTaskQueue() {
		stop();
	}

private:
	// The list of actions waiting to be performed. These will be elided,
	// increasing their latency, if the emulation thread falls behind.
	using ActionVector = std::vector<std::function<void(void)>>;
	ActionVector actions_;

	// Necessary synchronisation parts.
	std::atomic<bool> should_quit_ = false;
	std::mutex condition_mutex_;
	std::condition_variable condition_;

	// Ensure the thread isn't constructed until after the mutex
	// and condition variable.
	std::thread thread_;
};
}