Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 0.1.2 #46

Merged
merged 39 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9ce3c83
Use Modern CMake (#13)
friendlyanon Oct 2, 2020
ee97eb8
Fix tests (#16)
friendlyanon Oct 13, 2020
b67c214
fix include path in the sandbox and tests (#17)
David-Haim Oct 16, 2020
b496ae2
Simplify CI process (#18)
friendlyanon Oct 22, 2020
4e08bff
Simplify test lists (#19)
friendlyanon Oct 22, 2020
5c04acd
clang-format fine tuning (#20)
David-Haim Oct 23, 2020
4ee5321
Readme refactor (#21)
David-Haim Oct 30, 2020
ee5a635
threadpool test fine tune
David-Haim Oct 31, 2020
bb3edaf
revert test cmake to b496ae2f76977ee9a76cbe37230b63ec4064df2a (#28)
David-Haim Oct 31, 2020
d9bb70e
CI/CD - no xcode version is actively selected (#31)
David-Haim Nov 2, 2020
1131140
version bump + clang format
David-Haim Nov 2, 2020
3c087be
result_core refactor (#33)
David-Haim Nov 13, 2020
dd61606
Task object (#34)
David-Haim Nov 20, 2020
0057297
* move to standard coroutines on MSVC
David-Haim Dec 19, 2020
db46fc7
improvements and small fixes to 00572972740903cd00f6ffed28cca4dc7cfe42d7
David-Haim Dec 27, 2020
e7dc976
bump version to 0.0.9
David-Haim Dec 30, 2020
df560cc
Readme improvements, introduction of the task object
David-Haim Dec 31, 2020
8aaa0ce
Merge branch 'master' into develop
David-Haim Jan 1, 2021
e23e9b3
Merge remote-tracking branch 'origin/master' into develop
David-Haim Jan 14, 2021
9954189
Shared results (#37)
David-Haim Jan 16, 2021
ad4dbee
* shared_result refactor
David-Haim Feb 18, 2021
68c89f6
bump version
David-Haim Feb 20, 2021
5afa334
shared_result + executor API were added to the README
David-Haim Feb 24, 2021
679173a
Merge remote-tracking branch 'origin/master' into develop
David-Haim Feb 24, 2021
f65caad
Use explicit bool operator. (#38)
NN--- Mar 2, 2021
9661db5
* full move to ctest
David-Haim Mar 12, 2021
e9743fc
Merge branch 'develop' of https://github.com/David-Haim/concurrencpp …
David-Haim Mar 12, 2021
f5634fd
version bump
David-Haim Mar 17, 2021
5f13455
CI/CD: install clang 11 on ubuntu manually
David-Haim Mar 23, 2021
3c10fb1
Run all tests serially
David-Haim Mar 23, 2021
542fc80
Updated Readme
David-Haim Mar 26, 2021
0f6c6e8
* await_via and resolve_via are both deprecated
David-Haim Apr 18, 2021
a9112c2
shared_result refactor
David-Haim Jul 2, 2021
3672f9d
when_all refactor
David-Haim Jul 3, 2021
e70d241
when_any refactor
David-Haim Jul 7, 2021
001a1f9
* when_any_promise renamed to when_any_context
David-Haim Jul 22, 2021
6d2ad94
fix: usage of uninitialized callback in constexpr function (#43)
Ladence Jul 24, 2021
e7892f5
* readme update
David-Haim Aug 27, 2021
f4048a5
merge origin/master
David-Haim Aug 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improvements and small fixes to 0057297
  • Loading branch information
David-Haim committed Dec 27, 2020
commit db46fc7a8f7470b639ae0fe3b91b4ff6b5b198cd
7 changes: 4 additions & 3 deletions include/concurrencpp/executors/thread_pool_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ namespace concurrencpp::details {
std::binary_semaphore m_semaphore;
bool m_idle;
bool m_abort;
std::atomic_bool m_event_found;
thread m_thread;

void balance_work();
Expand All @@ -72,16 +73,16 @@ namespace concurrencpp::details {

void enqueue_foreign(concurrencpp::task& task);
void enqueue_foreign(std::span<concurrencpp::task> tasks);
void enqueue_foreign(const std::deque<concurrencpp::task>& deque,
std::deque<concurrencpp::task>::iterator begin,
std::deque<concurrencpp::task>::iterator end);
void enqueue_foreign(std::deque<concurrencpp::task>::iterator begin, std::deque<concurrencpp::task>::iterator end);

void enqueue_local(concurrencpp::task& task);
void enqueue_local(std::span<concurrencpp::task> tasks);

void shutdown() noexcept;

std::chrono::milliseconds max_worker_idle_time() const noexcept;

bool appears_empty() const noexcept;
};
} // namespace concurrencpp::details

Expand Down
6 changes: 3 additions & 3 deletions include/concurrencpp/results/impl/producer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace concurrencpp::details {
m_result.emplace(std::forward<argument_types>(arguments)...);
}

void build_exception(std::exception_ptr exception) noexcept {
void build_exception(const std::exception_ptr& exception) noexcept {
assert(!m_result.has_value());
assert(!static_cast<bool>(m_exception));
m_exception = exception;
Expand Down Expand Up @@ -77,7 +77,7 @@ namespace concurrencpp::details {
m_ready = true;
}

void build_exception(std::exception_ptr exception) noexcept {
void build_exception(const std::exception_ptr& exception) noexcept {
assert(!m_ready);
assert(!static_cast<bool>(m_exception));
m_exception = exception;
Expand Down Expand Up @@ -127,7 +127,7 @@ namespace concurrencpp::details {
m_pointer = std::addressof(reference);
}

void build_exception(std::exception_ptr exception) noexcept {
void build_exception(const std::exception_ptr& exception) noexcept {
assert(m_pointer == nullptr);
assert(!static_cast<bool>(m_exception));
m_exception = exception;
Expand Down
10 changes: 3 additions & 7 deletions include/concurrencpp/results/make_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,15 @@ namespace concurrencpp {
static_assert(std::is_constructible_v<type, argument_types...> || std::is_same_v<type, void>,
"concurrencpp::make_ready_result - <<type>> is not constructible from <<argument_types...>");

static_assert(std::is_same_v<type, void> ? (sizeof...(argument_types) == 0) : true,
"concurrencpp::make_ready_result<void> - this overload does not accept any argument.");

auto result_state_ptr = std::make_shared<details::result_state<type>>();
result_state_ptr->set_result(std::forward<argument_types>(arguments)...);
result_state_ptr->publish_result();
return {std::move(result_state_ptr)};
}

inline result<void> make_ready_result() {
auto result_state_ptr = std::make_shared<details::result_state<void>>();
result_state_ptr->set_result();
result_state_ptr->publish_result();
return {std::move(result_state_ptr)};
}

template<class type>
result<type> make_exceptional_result(std::exception_ptr exception_ptr) {
if (!static_cast<bool>(exception_ptr)) {
Expand Down
1 change: 0 additions & 1 deletion include/concurrencpp/utils/bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ namespace concurrencpp::details {
tuple = std::make_tuple(std::forward<argument_types>(arguments)...)]() mutable noexcept(inti) -> decltype(auto) {
try {
return std::apply(callable, tuple);

} catch (...) {
// do nothing
}
Expand Down
40 changes: 31 additions & 9 deletions source/executors/thread_pool_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void idle_worker_set::find_idle_workers(size_t caller_index, std::vector<size_t>

thread_pool_worker::thread_pool_worker(thread_pool_executor& parent_pool, size_t index, size_t pool_size, std::chrono::milliseconds max_idle_time) :
m_atomic_abort(false), m_parent_pool(parent_pool), m_index(index), m_pool_size(pool_size), m_max_idle_time(max_idle_time),
m_worker_name(details::make_executor_worker_name(parent_pool.name)), m_semaphore(0), m_idle(true), m_abort(false) {
m_worker_name(details::make_executor_worker_name(parent_pool.name)), m_semaphore(0), m_idle(true), m_abort(false), m_event_found(false) {
m_idle_worker_list.reserve(pool_size);
}

Expand Down Expand Up @@ -175,7 +175,7 @@ void thread_pool_worker::balance_work() {
assert(donation_begin_it < m_private_queue.end());
assert(donation_end_it < m_private_queue.end());

m_parent_pool.worker_at(idle_worker_index).enqueue_foreign(m_private_queue, donation_begin_it, donation_end_it);
m_parent_pool.worker_at(idle_worker_index).enqueue_foreign(donation_begin_it, donation_end_it);

begin = end;
end += donation_count;
Expand Down Expand Up @@ -222,6 +222,10 @@ bool thread_pool_worker::wait_for_task(std::unique_lock<std::mutex>& lock) noexc
}
}

if (!m_event_found.load(std::memory_order_relaxed)) {
continue;
}

lock.lock();
if (m_public_queue.empty() && !m_abort) {
lock.unlock();
Expand Down Expand Up @@ -282,6 +286,8 @@ bool thread_pool_worker::drain_queue() {
assert(lock.owns_lock());
assert(!m_public_queue.empty() || m_abort);

m_event_found.store(false, std::memory_order_relaxed);

if (m_abort) {
m_idle = true;
return false;
Expand Down Expand Up @@ -337,6 +343,8 @@ void thread_pool_worker::enqueue_foreign(concurrencpp::task& task) {
throw_executor_shutdown_exception(m_parent_pool.name);
}

m_event_found.store(true, std::memory_order_relaxed);

const auto is_empty = m_public_queue.empty();
m_public_queue.emplace_back(std::move(task));
ensure_worker_active(is_empty, lock);
Expand All @@ -348,20 +356,21 @@ void thread_pool_worker::enqueue_foreign(std::span<concurrencpp::task> tasks) {
throw_executor_shutdown_exception(m_parent_pool.name);
}

m_event_found.store(true, std::memory_order_relaxed);

const auto is_empty = m_public_queue.empty();
m_public_queue.insert(m_public_queue.end(), std::make_move_iterator(tasks.begin()), std::make_move_iterator(tasks.end()));
ensure_worker_active(is_empty, lock);
}

void thread_pool_worker::enqueue_foreign(const std::deque<concurrencpp::task>& deque, std::deque<task>::iterator begin, std::deque<task>::iterator end) {
assert(begin != end);
void thread_pool_worker::enqueue_foreign(std::deque<task>::iterator begin, std::deque<task>::iterator end) {
std::unique_lock<std::mutex> lock(m_lock);
assert(&deque != &m_public_queue);

if (m_abort) {
throw_executor_shutdown_exception(m_parent_pool.name);
}

m_event_found.store(true, std::memory_order_relaxed);

const auto is_empty = m_public_queue.empty();
m_public_queue.insert(m_public_queue.end(), std::make_move_iterator(begin), std::make_move_iterator(end));
ensure_worker_active(is_empty, lock);
Expand Down Expand Up @@ -392,6 +401,8 @@ void thread_pool_worker::shutdown() noexcept {
m_abort = true;
}

m_event_found.store(true, std::memory_order_release); // make sure the store is finished before notifying the worker.

m_semaphore.release();

if (m_thread.joinable()) {
Expand All @@ -415,6 +426,10 @@ std::chrono::milliseconds thread_pool_worker::max_worker_idle_time() const noexc
return m_max_idle_time;
}

bool thread_pool_worker::appears_empty() const noexcept {
return m_private_queue.empty() && !m_event_found.load(std::memory_order_relaxed);
}

thread_pool_executor::thread_pool_executor(std::string_view pool_name, size_t pool_size, std::chrono::milliseconds max_idle_time) :
derivable_executor<concurrencpp::thread_pool_executor>(pool_name), m_round_robin_cursor(0), m_idle_workers(pool_size), m_abort(false) {
m_workers.reserve(pool_size);
Expand Down Expand Up @@ -448,13 +463,20 @@ void thread_pool_executor::mark_worker_active(size_t index) noexcept {
}

void thread_pool_executor::enqueue(concurrencpp::task task) {
const auto idle_worker_pos = m_idle_workers.find_idle_worker(details::s_tl_thread_pool_data.this_thread_index);
const auto this_worker = details::s_tl_thread_pool_data.this_worker;
const auto this_worker_index = details::s_tl_thread_pool_data.this_thread_index;

if (this_worker != nullptr && this_worker->appears_empty()) {
return this_worker->enqueue_local(task);
}

const auto idle_worker_pos = m_idle_workers.find_idle_worker(this_worker_index);
if (idle_worker_pos != static_cast<size_t>(-1)) {
return m_workers[idle_worker_pos].enqueue_foreign(task);
}

if (details::s_tl_thread_pool_data.this_worker != nullptr) {
return details::s_tl_thread_pool_data.this_worker->enqueue_local(task);
if (this_worker != nullptr) {
return this_worker->enqueue_local(task);
}

const auto next_worker = m_round_robin_cursor.fetch_add(1, std::memory_order_relaxed) % m_workers.size();
Expand Down
3 changes: 3 additions & 0 deletions test/source/tests/coroutine_tests/coroutines_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ result<void> concurrencpp::tests::test_combo_coroutine_impl(std::shared_ptr<thre

assert_equal(string_result, std::to_string(result_factory<int>::get()));

co_await ex->submit([] {
});

auto& int_ref_result = co_await ex->submit([]() -> int& {
return result_factory<int&>::get();
});
Expand Down
1 change: 0 additions & 1 deletion test/source/tests/executor_tests/manual_executor_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ namespace concurrencpp::tests {
assert_equal(execution_map.size(), static_cast<size_t>(1)); // only one thread executed the tasks
assert_equal(execution_map.begin()->first, concurrencpp::details::thread::get_current_virtual_id()); // and it's this thread.
}

} // namespace concurrencpp::tests

using namespace std::chrono;
Expand Down
Loading