Skip to content

Commit 55ac6de

Browse files
ajtownsrandom-zebra
authored andcommitted
scheduler: switch from boost to std
Changes from boost::chrono to std::chrono, boost::condition_var to std::condition_var, boost::mutex to sync.h Mutex, and reverselock.h to sync.h REVERSE_LOCK. Also adds threadsafety annotations to CScheduler members.
1 parent 15f292b commit 55ac6de

File tree

3 files changed

+46
-49
lines changed

3 files changed

+46
-49
lines changed

src/scheduler.cpp

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#include "scheduler.h"
77

88
#include "random.h"
9-
#include "reverselock.h"
109

1110
#include <assert.h>
1211
#include <utility>
@@ -22,7 +21,7 @@ CScheduler::~CScheduler()
2221

2322
void CScheduler::serviceQueue()
2423
{
25-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
24+
WAIT_LOCK(newTaskMutex, lock);
2625
++nThreadsServicingQueue;
2726

2827
// newTaskMutex is locked throughout this loop EXCEPT
@@ -31,7 +30,7 @@ void CScheduler::serviceQueue()
3130
while (!shouldStop()) {
3231
try {
3332
if (!shouldStop() && taskQueue.empty()) {
34-
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
33+
REVERSE_LOCK(lock);
3534
}
3635
while (!shouldStop() && taskQueue.empty()) {
3736
// Wait until there is something to do.
@@ -41,12 +40,13 @@ void CScheduler::serviceQueue()
4140
// Wait until either there is a new task, or until
4241
// the time of the first item on the queue:
4342

44-
// Some boost versions have a conflicting overload of wait_until that returns void.
45-
// Explicitly use a template here to avoid hitting that overload.
46-
while (!shouldStop() && !taskQueue.empty() &&
47-
newTaskScheduled.wait_until<>(lock, taskQueue.begin()->first) != boost::cv_status::timeout) {
48-
// Keep waiting until timeout
43+
while (!shouldStop() && !taskQueue.empty()) {
44+
std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
45+
if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
46+
break; // Exit loop after timeout, it means we reached the time of the event
47+
}
4948
}
49+
5050
// If there are multiple threads, the queue can empty while we're waiting (another
5151
// thread may service the task we were waiting on).
5252
if (shouldStop() || taskQueue.empty())
@@ -58,7 +58,7 @@ void CScheduler::serviceQueue()
5858
{
5959
// Unlock before calling f, so it can reschedule itself or another task
6060
// without deadlocking:
61-
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
61+
REVERSE_LOCK(lock);
6262
f();
6363
}
6464
} catch (...) {
@@ -73,7 +73,7 @@ void CScheduler::serviceQueue()
7373
void CScheduler::stop(bool drain)
7474
{
7575
{
76-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
76+
LOCK(newTaskMutex);
7777
if (drain)
7878
stopWhenEmpty = true;
7979
else
@@ -82,18 +82,18 @@ void CScheduler::stop(bool drain)
8282
newTaskScheduled.notify_all();
8383
}
8484

85-
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
85+
void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
8686
{
8787
{
88-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
88+
LOCK(newTaskMutex);
8989
taskQueue.emplace(t, f);
9090
}
9191
newTaskScheduled.notify_one();
9292
}
9393

9494
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
9595
{
96-
schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds));
96+
schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds));
9797
}
9898

9999
static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds)
@@ -107,10 +107,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds
107107
scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
108108
}
109109

110-
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
111-
boost::chrono::system_clock::time_point &last) const
110+
size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
111+
std::chrono::system_clock::time_point &last) const
112112
{
113-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
113+
LOCK(newTaskMutex);
114114
size_t result = taskQueue.size();
115115
if (!taskQueue.empty()) {
116116
first = taskQueue.begin()->first;
@@ -120,7 +120,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
120120
}
121121

122122
bool CScheduler::AreThreadsServicingQueue() const {
123-
boost::unique_lock<boost::mutex> lock(newTaskMutex);
123+
LOCK(newTaskMutex);
124124
return nThreadsServicingQueue;
125125
}
126126

@@ -133,7 +133,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
133133
if (m_are_callbacks_running) return;
134134
if (m_callbacks_pending.empty()) return;
135135
}
136-
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
136+
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
137137
}
138138

139139
void SingleThreadedSchedulerClient::ProcessQueue() {

src/scheduler.h

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@
88

99
//
1010
// NOTE:
11-
// boost::thread / boost::chrono should be ported to std::thread / std::chrono
11+
// boost::thread should be ported to std::thread
1212
// when we support C++11.
1313
//
14-
#include <boost/chrono/chrono.hpp>
15-
#include <boost/thread.hpp>
14+
#include <condition_variable>
15+
#include <functional>
16+
#include <list>
1617
#include <map>
1718

1819
#include "sync.h"
@@ -28,8 +29,8 @@
2829
// s->scheduleFromNow(std::bind(Class::func, this, argument), 3);
2930
// boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s));
3031
//
31-
// ... then at program shutdown, clean up the thread running serviceQueue:
32-
// t->interrupt();
32+
// ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
33+
// s->stop();
3334
// t->join();
3435
// delete t;
3536
// delete s; // Must be done after thread is interrupted/joined.
@@ -44,7 +45,7 @@ class CScheduler
4445
typedef std::function<void(void)> Function;
4546

4647
// Call func at/after time t
47-
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
48+
void schedule(Function f, std::chrono::system_clock::time_point t);
4849

4950
// Convenience method: call f once deltaMilliSeconds from now
5051
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
@@ -69,20 +70,20 @@ class CScheduler
6970

7071
// Returns number of tasks waiting to be serviced,
7172
// and first and last task times
72-
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
73-
boost::chrono::system_clock::time_point &last) const;
73+
size_t getQueueInfo(std::chrono::system_clock::time_point &first,
74+
std::chrono::system_clock::time_point &last) const;
7475

7576
// Returns true if there are threads actively running in serviceQueue()
7677
bool AreThreadsServicingQueue() const;
7778

7879
private:
79-
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
80-
boost::condition_variable newTaskScheduled;
81-
mutable boost::mutex newTaskMutex;
82-
int nThreadsServicingQueue;
83-
bool stopRequested;
84-
bool stopWhenEmpty;
85-
bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
80+
mutable Mutex newTaskMutex;
81+
std::condition_variable newTaskScheduled;
82+
std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
83+
int nThreadsServicingQueue GUARDED_BY(newTaskMutex);
84+
bool stopRequested GUARDED_BY(newTaskMutex);
85+
bool stopWhenEmpty GUARDED_BY(newTaskMutex);
86+
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
8687
};
8788

8889
/**

src/test/scheduler_tests.cpp

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include "random.h"
77
#include "scheduler.h"
8+
#include "utiltime.h"
89
#if defined(HAVE_CONFIG_H)
910
#include "config/pivx-config.h"
1011
#endif
@@ -16,24 +17,19 @@
1617

1718
BOOST_AUTO_TEST_SUITE(scheduler_tests)
1819

19-
static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, boost::chrono::system_clock::time_point rescheduleTime)
20+
static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, std::chrono::system_clock::time_point rescheduleTime)
2021
{
2122
{
2223
boost::unique_lock<boost::mutex> lock(mutex);
2324
counter += delta;
2425
}
25-
boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min();
26+
std::chrono::system_clock::time_point noTime = std::chrono::system_clock::time_point::min();
2627
if (rescheduleTime != noTime) {
2728
CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
2829
s.schedule(f, rescheduleTime);
2930
}
3031
}
3132

32-
static void MicroSleep(uint64_t n)
33-
{
34-
boost::this_thread::sleep_for(boost::chrono::microseconds(n));
35-
}
36-
3733
BOOST_AUTO_TEST_CASE(manythreads)
3834
{
3935
// Stress test: hundreds of microsecond-scheduled tasks,
@@ -55,15 +51,15 @@ BOOST_AUTO_TEST_CASE(manythreads)
5551
boost::random::uniform_int_distribution<> randomMsec(-11, 1000);
5652
boost::random::uniform_int_distribution<> randomDelta(-1000, 1000);
5753

58-
boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
59-
boost::chrono::system_clock::time_point now = start;
60-
boost::chrono::system_clock::time_point first, last;
54+
std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
55+
std::chrono::system_clock::time_point now = start;
56+
std::chrono::system_clock::time_point first, last;
6157
size_t nTasks = microTasks.getQueueInfo(first, last);
6258
BOOST_CHECK(nTasks == 0);
6359

6460
for (int i = 0; i < 100; ++i) {
65-
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
66-
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng));
61+
std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
62+
std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
6763
int whichCounter = zeroToNine(rng);
6864
CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
6965
std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
@@ -80,15 +76,15 @@ BOOST_AUTO_TEST_CASE(manythreads)
8076
for (int i = 0; i < 5; i++)
8177
microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks));
8278

83-
MicroSleep(600);
84-
now = boost::chrono::system_clock::now();
79+
UninterruptibleSleep(std::chrono::microseconds{600});
80+
now = std::chrono::system_clock::now();
8581

8682
// More threads and more tasks:
8783
for (int i = 0; i < 5; i++)
8884
microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks));
8985
for (int i = 0; i < 100; i++) {
90-
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
91-
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng));
86+
std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
87+
std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
9288
int whichCounter = zeroToNine(rng);
9389
CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
9490
std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),

0 commit comments

Comments
 (0)