Skip to content

Commit 0c345a0

Browse files
committed
[FOLD] Have Workers use join(), also rename member variables
1 parent 0cfcf8f commit 0c345a0

File tree

5 files changed

+32
-65
lines changed

5 files changed

+32
-65
lines changed

Builds/CMake/RippledCore.cmake

-1
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,6 @@ if (tests)
796796
src/test/core/CryptoPRNG_test.cpp
797797
src/test/core/JobQueue_test.cpp
798798
src/test/core/SociDB_test.cpp
799-
src/test/core/Workers_test.cpp
800799
#[===============================[
801800
test sources:
802801
subdir: csf

Builds/levelization/results/ordering.txt

-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ test.core > ripple.basics
121121
test.core > ripple.beast
122122
test.core > ripple.core
123123
test.core > ripple.crypto
124-
test.core > ripple.json
125124
test.core > ripple.server
126125
test.core > test.jtx
127126
test.core > test.toplevel

src/ripple/core/impl/Workers.cpp

+22-21
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@
2626
namespace ripple {
2727

2828
Workers::Workers(Callback& callback, std::string name, unsigned int count)
29-
: m_callback(callback)
29+
: callback_(callback)
3030
{
3131
assert(count != 0);
3232

33+
threads_.reserve(count);
3334
while (count--)
3435
{
35-
std::thread t(
36+
threads_.emplace_back(
3637
[this, name](unsigned int instance) {
3738
auto const n = name + ":" + std::to_string(instance);
3839

39-
threads_++;
40+
++threadCount_;
4041

4142
while (!stopping_)
4243
{
@@ -48,19 +49,19 @@ Workers::Workers(Callback& callback, std::string name, unsigned int count)
4849
beast::setCurrentThreadName(n + " [zZz]");
4950

5051
// If there's nothing to do, go to sleep
51-
while (h == t && !stopping_.load(std::memory_order_relaxed))
52+
while (h == t && !stopping_.load())
5253
{
53-
paused_++;
54+
++paused_;
5455
{
55-
std::unique_lock lock(m_mut);
56-
m_cv.wait_for(lock, std::chrono::seconds(5));
56+
std::unique_lock lock(mut_);
57+
cv_.wait_for(lock, std::chrono::seconds(5));
5758
}
5859
t = tail_.load();
5960
h = head_.load();
60-
paused_--;
61+
--paused_;
6162
}
6263

63-
if (!stopping_.load(std::memory_order_relaxed) && h > t)
64+
if (!stopping_.load() && h > t)
6465
{
6566
// As long as we aren't stopping and there is a task
6667
// that's waiting, we try to do work.
@@ -74,44 +75,44 @@ Workers::Workers(Callback& callback, std::string name, unsigned int count)
7475
// just in case, if one does catch it.
7576
try
7677
{
77-
m_callback.processTask(instance);
78+
callback_.processTask(instance);
7879
}
7980
catch (...)
8081
{
81-
m_callback.uncaughtException(
82+
callback_.uncaughtException(
8283
instance, std::current_exception());
8384
}
8485
}
8586
}
8687
}
8788

8889
// Track number of threads
89-
threads_--;
90+
--threadCount_;
9091
},
9192
count);
92-
t.detach();
9393
};
9494
}
9595

9696
Workers::~Workers()
9797
{
98-
if (threads_.load() != 0)
98+
if (threadCount_.load() != 0)
9999
stop();
100100

101-
assert(stopping_.load() && threads_.load() == 0);
101+
assert(stopping_.load() && threadCount_.load() == 0);
102102
}
103103

104104
void
105105
Workers::stop()
106106
{
107-
if (threads_.load() != 0)
107+
if (threadCount_.load() != 0)
108108
{
109-
if (!stopping_.exchange(true) || (threads_.load() != 0))
110-
m_cv.notify_all();
109+
if (!stopping_.exchange(true) || (threadCount_.load() != 0))
110+
cv_.notify_all();
111111
}
112112

113-
while (threads_.load() != 0)
114-
std::this_thread::sleep_for(std::chrono::seconds(1));
113+
for (auto& thread : threads_)
114+
if (thread.joinable())
115+
thread.join();
115116
}
116117

117118
void
@@ -123,7 +124,7 @@ Workers::addTask()
123124
head_++;
124125

125126
if (paused_ != 0)
126-
m_cv.notify_one();
127+
cv_.notify_one();
127128
}
128129

129130
} // namespace ripple

src/ripple/core/impl/Workers.h

+10-8
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <exception>
2626
#include <mutex>
2727
#include <string>
28+
#include <vector>
2829

2930
namespace ripple {
3031

@@ -34,7 +35,7 @@ namespace ripple {
3435
the number of outstanding tasks, and dispatches work. When the pool detects
3536
that there is no work to be done, it puts threads to sleep.
3637
37-
The pool does not decide make decisions about which task to run; that is
38+
The pool does not make decisions about which task to run; that is
3839
handled by a callback that the pool invokes. This makes it possible to
3940
implement the work dispatch strategy (e.g. FIFO or priority queues) that
4041
makes sense without requiring changes in the thread pool itself.
@@ -93,8 +94,7 @@ class Workers
9394

9495
/** Create a new thread pool.
9596
96-
The number of initial threads must be specified. The
97-
default is to create one thread per CPU.
97+
The number of initial threads must be specified.
9898
9999
@param callback The task selection & execution algorithm.
100100
@param name The name for this pool (used to name threads)
@@ -108,7 +108,7 @@ class Workers
108108
unsigned int
109109
count() const noexcept
110110
{
111-
return threads_.load();
111+
return threadCount_.load();
112112
}
113113

114114
/** Pause all threads and wait until they are paused.
@@ -134,19 +134,21 @@ class Workers
134134
addTask();
135135

136136
private:
137-
Callback& m_callback;
137+
Callback& callback_;
138+
139+
std::vector<std::thread> threads_;
138140

139141
/// Used to signal sleeping threads to wake up.
140-
std::condition_variable m_cv;
142+
std::condition_variable cv_;
141143

142144
/// Used only to wait on the condition variable.
143-
std::mutex m_mut;
145+
std::mutex mut_;
144146

145147
/// True if the worker threads should stop.
146148
std::atomic<bool> stopping_ = false;
147149

148150
/// This represents the total number of threads in the thread pool:
149-
std::atomic<unsigned int> threads_ = 0;
151+
std::atomic<unsigned int> threadCount_ = 0;
150152

151153
/// The number of threads that are paused at this time:
152154
std::atomic<unsigned int> paused_ = 0;

src/test/core/Workers_test.cpp

-34
This file was deleted.

0 commit comments

Comments
 (0)