diff --git a/README.md b/README.md index 18e46d65..65227697 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ + ## concurrencpp, the C++ concurrency library (note: documentation is still under development) @@ -17,6 +18,93 @@ concurrencpp is task-centric. A task is an asynchronous operation. Tasks provide from one to another, where the result of one task is used as if it were a parameter or an intermediate value of another ongoing task. In concurrencpp, the concept of tasks is represented by coroutines. This allows tasks to be suspended, waiting for other tasks to finish, and chained using `co_await`, and thus solving the consumer-producer problem elegantly by giving the concurrent code a synchronous look. +concurrencpp usage is build around the RAII concept. In order to use tasks and executors, applications create a `runtime` instance in the beginning of the `main` function. The runtime is then used to acquire existing executors and register new user defined executors. +Executors are used to schedule new tasks to run, and they might return a `result` object that can be used to marshal the asynchronous result to another task that acts as its consumer. +Results can be awaited and resolved in a non blocking manner, and even switch the underlying executor in the process. + When the runtime is destroyed, it iterates over every stored executor and calls its `shutdown` method. every executor then exits gracefully. Unscheduled tasks are destroyed, and attempts to create new tasks will throw an exception. + +**"Hello world" program using concurrencpp** + +```cpp +#include "concurrencpp.h" +#include + +int main() { + concurrencpp::runtime runtime; + auto result = runtime.thread_executor()->submit([] { + std::cout << "hello world" << std::endl; + }); + + result.get(); + return 0; +} +``` +In this basic example, we created a runtime object, then we acquired the thread executor from the runtime. We used `submit` to pass a lambda as our given callable. This lambda returns `void`, hence, the executor returns a `result` object that marshals the asynchronous result back to the caller. `main` calls `get` which blocks the main thread until the result becomes ready. If no exception was thrown, `get` returns `void`. If an exception was thrown, `get` re-throws it. Asynchronously, `thread_executor` launches a new thread of execution and runs the given lambda. It implicitly `co_return void` and the task is finished. `main` is then unblocked. + +**Concurrent even-number counting** + +```cpp +#include "concurrencpp.h" + +#include +#include +#include + +#include + +using namespace concurrencpp; + +std::vector make_random_vector() { + std::vector vec(64 * 1'024); + + std::srand(std::time(nullptr)); + for (auto& i : vec) { + i = ::rand(); + } + + return vec; +} + +result count_even(std::shared_ptr tpe, const std::vector& vector) { + const auto vecor_size = vector.size(); + const auto concurrency_level = tpe->max_concurrency_level(); + const auto chunk_size = vecor_size / concurrency_level; + + std::vector> chunk_count; + + for (auto i = 0; i < concurrency_level; i++) { + const auto chunk_begin = i * chunk_size; + const auto chunk_end = chunk_begin + chunk_size; + auto result = tpe->submit([&vector, chunk_begin, chunk_end]() -> size_t { + return std::count_if( + vector.begin() + chunk_begin, + vector.begin() + chunk_end, + [](auto i) { return i % 2 == 0; }); + }); + + chunk_count.emplace_back(std::move(result)); + } + + size_t total_count = 0; + + for(auto& result : chunk_count) { + total_count += co_await result; + } + + std::cout << "there are " << total_count << " even numbers in the vector" << std::endl; +} + +int main() { + concurrencpp::runtime runtime; + const auto vector = make_random_vector(); + auto result = count_even(runtime.thread_pool_executor(), vector); + result.get(); + return 0; +} +``` + +In this example, we again start the program by creating a runtime object. We create a vector filled with random numbers, then we acquire the `thread_pool_executor` from the runtime and call `count_even`. `count_even` is a coroutine (task) that spawns more coroutines and `co_await`s for them to finish inside. `max_concurrency_level` returns the maximum amount of workers that the executor supports, In the threadpool executor case, the number of worker is calculated from the number of cores. We then partitian the array to match the number of workers and send every chunk to be processed in its own task. Asynchronously, the workers count how many even numbers each chunk contains, and `co_return` the result. `count_even` iterates every result, pulling the asynchronous count by using `co_await`. The final result is then printed and `count_even` finishes. `main` which was blocked by calling `get` is unblocked and the program terminates gracefully. + **Executors** In the context of concurrencpp, an executor is an object that is able to schedule and run coroutines. @@ -120,6 +208,17 @@ This executor is good for long running tasks, like objects that run a work loop, * **inline executor** - mainly used to override the behavior of other executors. Enqueuing a task is equivalent to invoking it inline. +***Using executors:*** + +The bare mechanism of an executor is encapsulated in its `enqueue` method. This method enqueues a suspended coroutine for execution and has two flavors: one flavor that receives a single `coroutine_handle<>` as an argument, and another that receives a `span>`. The second flavor is used to enqueue a batch of suspended coroutines. This allows better scheduling heuristics and decreased contention. + +Of course, Applications don't need to use these low-level methods by themselves. `concurrencpp::executor` provides an API for scheduling non-coroutines by converting them to a suspended coroutine first and then scheduling them to run. +Applications can request executors to return a result object that marshals the asynchronous result of the provided callable. This is done by calling `executor::submit` and `execuor::bulk_submit`. +`submit` gets a callable, and returns a result object. `executor::bulk_submit` gets a `span` of callables and returns a `vector`of result objects in a similar way `submit` works. +In many cases, applications are not interested in the asynchronous value or exception. In this case, applications can use `executor:::post` and `executor::bulk_post` to schedule a callable or a `span` of callables to execute, but also tells the task to drop any returned value or thrown exception. Not marshaling the asynchronous result is faster than marshaling, but then we have no way of knowing the status of the ongoing task. + +`post`, `bulk_post`, `submit` and `bulk_submit` use `enqueue` behind the scenes for the underlying scheduling mechanism. + **Result objects** Asynchronous values and exceptions can be consumed using the concurrencpp result objects. @@ -129,7 +228,11 @@ In either case, this asynchronous result is marshaled to the consumer of the res The result state therefore, very from `idle` (the asynchronous result or exception aren't ready yet) to `value` (the coroutine terminated by returning a valid value) to `exception` (the coroutine terminated by throwing an exception). Result objects can be polled for their state, waited, resolved or awaited. Awaiting a result object by using `co_await` (and by doing so, turning the current function into a coroutine as well) is the preferred way of consuming result objects. -concurrencpp also provide the `null_result` type. in this case, any returned value or thrown exception will be dropped. + +Result objects are a move-only type, and as such, they cannot be used after their content was moved to another result object. In this case, the result object is considered to be "empty" and attempts to call any method other than `operator bool` and `operator = ` will throw. +After the asynchronous result has been pulled out of the result object (by calling `get`, `await` or `await_via`), the result object becomes empty. Emptiness can be tested with `operator bool`. + +concurrencpp also provide the `null_result` type. This type can be used as a return type from a coroutine. In this case, any returned value or thrown exception will be dropped. It's logically equivalent of returning `void`. `concurrencpp::result` API @@ -243,7 +346,12 @@ class result{ }; ``` + **Parallel coroutines** + +Using executors with OOP style is great. We can launch new tasks by posting or submitting them to an executor object. In some cases, such as in parallel algorithms, recursive algorithms and concurrent algorithms that use the fork-join model, this style can cause inconvenience. +Instead of returning strange result types like `result>` and doing functional style `join`/`unwrap`, we can just use parallel coroutines: + In concurrencpp, a parallel coroutine is a function that: 1. Returns any of `concurrencpp::result` / `concurrencpp::null_result` . @@ -252,16 +360,58 @@ In concurrencpp, a parallel coroutine is a function that: 1. Contains any of `co_await` or `co_return` in its body. In this case, the function is a parallel coroutine: -concurrencpp will start the coroutine suspended and immediately re-schedule it to run in the provided executor. +concurrencpp will start the function suspended and immediately re-schedule it to run in the provided executor. +`concurrencpp::executor_tag` is a dummy placeholder to tell the concurrencpp runtime that this function is not a regular function, it needs to start running inside the given executor. Applications can then consume the result of the parallel coroutine by using the returned result object. -This technique is especially good for parallel algorithms that require recursion, or applications that use the fork-join concurrency model. + +***Parallel coroutine example : parallel Fibonacci:*** +```cpp +#include "concurrencpp.h" +#include + +using namespace concurrencpp; + +int fibbonacci_sync(int i) { + if (i == 0) { + return 0; + } + + if (i == 1) { + return 1; + } + + return fibbonacci_sync(i - 1) + fibbonacci_sync(i - 2); +} + +result fibbonacci(executor_tag, std::shared_ptr tpe, const int curr) { + if (curr <= 10) { + co_return fibbonacci_sync(curr); + } + + auto fib_1 = fibbonacci({}, tpe, curr - 1); + auto fib_2 = fibbonacci({}, tpe, curr - 2); + + co_return co_await fib_1 + co_await fib_2; +} + +int main() { + concurrencpp::runtime runtime; + auto fibb_30 = fibonacci({}, runtime.thread_pool_executor(), 30).get(); + std::cout << "fibonacci(30) = " << fibb_30 << std::endl; + return 0; +} +``` +In this example, we calculate the 30-th member of the Fibonacci sequence in a parallel manner. +We start launching each Fibonacci step in it's own parallel coroutine. the first argument is a dummy `executor_tag` and the second argument is a threadpool executor. +Every recursive step invokes a new parallel coroutine that runs in parallel. the result is returned to the parent task and is acquired by using `co_await`. +When we deem the input to be small enough to be calculated synchronously (when `curr <= 10`), we stop executing each recursive step in its own task and just solve the algorithm synchronously. **Creating and scheduling coroutines** There are few ways to create and schedule coroutines in concurrencpp - By calling any of `executor::post`, `executor::submit`, `executor::bulk_post` or `executor::bulk_submit`. -In this case the application provides a callable and depending on the selected method, might return a result object to the caller. The callable is then turned into a coroutine and scheduled to run in the executor. +In this case the application provides a callable and depending Aon the selected method, might return a result object to the caller. The callable is then turned into a coroutine and scheduled to run in the executor. - By using a parallel coroutine. @@ -342,9 +492,9 @@ class timer_queue { Creates a new delay object where *this is associated timer_queue */ result make_delay_object(size_t due_time, std::shared_ptr executor); - }; +}; - class timer { +class timer { /* Creates an empty timer that does not run. */ @@ -416,7 +566,8 @@ class timer_queue { The concurrencpp runtime object is the glue that sticks all the components above to a complete and cohesive mechanism of managing asynchronous actions. The concurrencpp runtime object is the agent used to acquire, store and create new executors. The runtime must be created as a value type as soon as the main function starts to run. -When the concurrencpp runtime gets out of scope, it iterates over its stored executors and shuts them down one by one by calling `executor::shutdown`. Executors then exit their inner work loop and any subsequent attempt to schedule a new coroutine will throw a `concurrencpp::executor_shutdown` exception. The runtime also contains the global timer queue used to create timers and delay objects. +When the concurrencpp runtime gets out of scope, it iterates over its stored executors and shuts them down one by one by calling `executor::shutdown`. Executors then exit their inner work loop and any subsequent attempt to schedule a new task will throw a `concurrencpp::executor_shutdown` exception. The runtime also contains the global timer queue used to create timers and delay objects. Upon destruction, stored executors will wait for ongoing tasks to finish. If an ongoing task tries to use an executor to spawn new tasks or schedule its continuation - an exception will be thrown. In this case, ongoing tasks needs to quit as soon as possible, allowing their underlying executors to quit. With this RAII style of code, no tasks can be processed before the creation of the runtime object, and while/after the runtime gets out of scope. +This frees concurrent applications from needing to communicate termination message explicitly. Tasks are free use executors as long as the runtime object is alive. `concurrencpp::runtime` API diff --git a/concurrencpp/src/executors/constants.h b/concurrencpp/src/executors/constants.h index 4e754b01..e033efde 100644 --- a/concurrencpp/src/executors/constants.h +++ b/concurrencpp/src/executors/constants.h @@ -12,6 +12,7 @@ namespace concurrencpp::details::consts { inline const char* k_thread_pool_executor_name = "concurrencpp::thread_pool_executor"; inline const char* k_background_executor_name = "concurrencpp::background_executor"; + constexpr int k_worker_thread_max_concurrency_level = 1; inline const char* k_worker_thread_executor_name = "concurrencpp::worker_thread_executor"; inline const char* k_manual_executor_name = "concurrencpp::manual_executor"; diff --git a/concurrencpp/src/executors/manual_executor.cpp b/concurrencpp/src/executors/manual_executor.cpp index 3dfdbfc6..6f242b6b 100644 --- a/concurrencpp/src/executors/manual_executor.cpp +++ b/concurrencpp/src/executors/manual_executor.cpp @@ -3,10 +3,14 @@ using concurrencpp::manual_executor; -void manual_executor::destroy_tasks() noexcept { +void manual_executor::destroy_tasks(std::unique_lock& lock) noexcept { + assert(lock.owns_lock()); + for (auto task : m_tasks) { task.destroy(); } + + m_tasks.clear(); } void manual_executor::enqueue(std::experimental::coroutine_handle<> task) { @@ -146,13 +150,17 @@ bool manual_executor::wait_for_task(std::chrono::milliseconds max_waiting_time) } void manual_executor::shutdown() noexcept { - m_atomic_abort.store(true, std::memory_order_relaxed); + const auto abort = m_atomic_abort.exchange(true, std::memory_order_relaxed); + if (abort) { + //shutdown had been called before. + return; + } { std::unique_lock lock(m_lock); m_abort = true; - destroy_tasks(); + destroy_tasks(lock); } m_condition.notify_all(); diff --git a/concurrencpp/src/executors/manual_executor.h b/concurrencpp/src/executors/manual_executor.h index f2542b40..7a1dc9d0 100644 --- a/concurrencpp/src/executors/manual_executor.h +++ b/concurrencpp/src/executors/manual_executor.h @@ -16,7 +16,7 @@ namespace concurrencpp { bool m_abort; std::atomic_bool m_atomic_abort; - void destroy_tasks() noexcept; + void destroy_tasks(std::unique_lock& lock) noexcept; public: manual_executor() : diff --git a/concurrencpp/src/executors/thread_pool_executor.cpp b/concurrencpp/src/executors/thread_pool_executor.cpp index 0ed6ae1c..57bd2d96 100644 --- a/concurrencpp/src/executors/thread_pool_executor.cpp +++ b/concurrencpp/src/executors/thread_pool_executor.cpp @@ -268,20 +268,18 @@ void thread_pool_worker::work_loop() noexcept { } void thread_pool_worker::destroy_tasks() noexcept { + std::unique_lock lock(m_lock); for (auto task : m_private_queue) { task.destroy(); } m_private_queue.clear(); - { - std::unique_lock lock(m_lock); - m_private_queue = std::move(m_public_queue); - } - - for (auto task : m_private_queue) { + for (auto task : m_public_queue) { task.destroy(); } + + m_public_queue.clear(); } void thread_pool_worker::ensure_worker_active(std::unique_lock& lock) { @@ -353,6 +351,7 @@ void thread_pool_worker::enqueue_local(std::span lock(m_lock); for (auto task : m_private_queue) { task.destroy(); } m_private_queue.clear(); - { - std::unique_lock lock(m_lock); - m_private_queue = std::move(m_public_queue); - } - - for (auto task : m_private_queue) { + for (auto task : m_public_queue) { task.destroy(); } + + m_public_queue.clear(); } bool worker_thread_executor::drain_queue_impl() { @@ -153,7 +151,7 @@ void worker_thread_executor::enqueue(std::spanshutdown(); assert_true(executor->shutdown_requested()); + //it's ok to shut down an executor more than once + executor->shutdown(); + assert_throws([executor] { executor->enqueue(std::experimental::coroutine_handle{}); }); @@ -58,9 +71,9 @@ void concurrencpp::tests::test_inline_executor_max_concurrency_level() { concurrencpp::details::consts::k_inline_executor_max_concurrency_level); } -void concurrencpp::tests::test_inline_executor_post() { +void concurrencpp::tests::test_inline_executor_post_foreign() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); @@ -71,15 +84,41 @@ void concurrencpp::tests::test_inline_executor_post() { assert_equal(observer.get_execution_count(), task_count); assert_equal(observer.get_destruction_count(), task_count); - const auto execution_map = observer.get_execution_map(); + const auto& execution_map = observer.get_execution_map(); assert_equal(execution_map.size(), size_t(1)); assert_equal(execution_map.begin()->first , thread::get_current_virtual_id()); } -void concurrencpp::tests::test_inline_executor_submit() { +void concurrencpp::tests::test_inline_executor_post_inline() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + executor->post([executor, &observer, task_count] { + for (size_t i = 0; i < task_count; i++) { + executor->post(observer.get_testing_stub()); + } + }); + + assert_equal(observer.get_execution_count(), task_count); + assert_equal(observer.get_destruction_count(), task_count); + + const auto& execution_map = observer.get_execution_map(); + + assert_equal(execution_map.size(), size_t(1)); + assert_equal(execution_map.begin()->first, thread::get_current_virtual_id()); +} + +void concurrencpp::tests::test_inline_executor_post() { + test_inline_executor_post_inline(); + test_inline_executor_post_foreign(); +} + +void concurrencpp::tests::test_inline_executor_submit_foreign() { + object_observer observer; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); @@ -93,7 +132,7 @@ void concurrencpp::tests::test_inline_executor_submit() { assert_equal(observer.get_execution_count(), task_count); assert_equal(observer.get_destruction_count(), task_count); - const auto execution_map = observer.get_execution_map(); + const auto& execution_map = observer.get_execution_map(); assert_equal(execution_map.size(), size_t(1)); assert_equal(execution_map.begin()->first, thread::get_current_virtual_id()); @@ -103,9 +142,44 @@ void concurrencpp::tests::test_inline_executor_submit() { } } -void concurrencpp::tests::test_inline_executor_bulk_post(){ +void concurrencpp::tests::test_inline_executor_submit_inline() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector> results; + results.resize(task_count); + for (size_t i = 0; i < task_count; i++) { + results[i] = executor->submit(observer.get_testing_stub(i)); + } + + return results; + }); + + assert_equal(observer.get_execution_count(), task_count); + assert_equal(observer.get_destruction_count(), task_count); + + const auto& execution_map = observer.get_execution_map(); + + assert_equal(execution_map.size(), size_t(1)); + assert_equal(execution_map.begin()->first, thread::get_current_virtual_id()); + + auto results = results_res.get(); + for (size_t i = 0; i < task_count; i++) { + assert_equal(results[i].get(), size_t(i)); + } +} + +void concurrencpp::tests::test_inline_executor_submit() { + test_inline_executor_submit_inline(); + test_inline_executor_submit_foreign(); +} + +void concurrencpp::tests::test_inline_executor_bulk_post_foreign(){ + object_observer observer; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); @@ -116,20 +190,51 @@ void concurrencpp::tests::test_inline_executor_bulk_post(){ stubs.emplace_back(observer.get_testing_stub()); } - executor->template bulk_post(stubs); + executor->bulk_post(stubs); assert_equal(observer.get_execution_count(), task_count); assert_equal(observer.get_destruction_count(), task_count); - const auto execution_map = observer.get_execution_map(); + const auto& execution_map = observer.get_execution_map(); assert_equal(execution_map.size(), size_t(1)); assert_equal(execution_map.begin()->first, thread::get_current_virtual_id()); } -void concurrencpp::tests::test_inline_executor_bulk_submit(){ +void concurrencpp::tests::test_inline_executor_bulk_post_inline() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + executor->post([executor, task_count, &observer] () mutable { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub()); + } + + executor->bulk_post(stubs); + }); + + assert_equal(observer.get_execution_count(), task_count); + assert_equal(observer.get_destruction_count(), task_count); + + const auto& execution_map = observer.get_execution_map(); + + assert_equal(execution_map.size(), size_t(1)); + assert_equal(execution_map.begin()->first, thread::get_current_virtual_id()); +} + +void concurrencpp::tests::test_inline_executor_bulk_post() { + test_inline_executor_bulk_post_foreign(); + test_inline_executor_bulk_post_inline(); +} + +void concurrencpp::tests::test_inline_executor_bulk_submit_foreign(){ + object_observer observer; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); @@ -140,21 +245,57 @@ void concurrencpp::tests::test_inline_executor_bulk_submit(){ stubs.emplace_back(observer.get_testing_stub(i)); } - auto results = executor->template bulk_submit(stubs); + auto results = executor->bulk_submit(stubs); + + assert_equal(observer.get_execution_count(), task_count); + assert_equal(observer.get_destruction_count(), task_count); + + const auto& execution_map = observer.get_execution_map(); + + assert_equal(execution_map.size(), size_t(1)); + assert_equal(execution_map.begin()->first, thread::get_current_virtual_id()); + + for (size_t i = 0; i < task_count; i++) { + assert_equal(results[i].get(), i); + } +} + +void concurrencpp::tests::test_inline_executor_bulk_submit_inline() { + object_observer observer; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub(i)); + } + + return executor->bulk_submit(stubs); + }); assert_equal(observer.get_execution_count(), task_count); assert_equal(observer.get_destruction_count(), task_count); - const auto execution_map = observer.get_execution_map(); + const auto& execution_map = observer.get_execution_map(); assert_equal(execution_map.size(), size_t(1)); assert_equal(execution_map.begin()->first, thread::get_current_virtual_id()); + auto results = results_res.get(); for (size_t i = 0; i < task_count; i++) { assert_equal(results[i].get(), i); } } +void concurrencpp::tests::test_inline_executor_bulk_submit() { + test_inline_executor_bulk_submit_foreign(); + test_inline_executor_bulk_submit_inline(); +} + void concurrencpp::tests::test_inline_executor() { tester tester("inline_executor test"); diff --git a/tests/tests/executor_tests/manual_executor_tests.cpp b/tests/tests/executor_tests/manual_executor_tests.cpp index 11223528..bfe16e23 100644 --- a/tests/tests/executor_tests/manual_executor_tests.cpp +++ b/tests/tests/executor_tests/manual_executor_tests.cpp @@ -11,16 +11,27 @@ namespace concurrencpp::tests { void test_manual_executor_name(); - void test_manual_executor_shutdown_access_methods(); + void test_manual_executor_shutdown_method_access(); void test_manual_executor_shutdown_coro_raii(); + void test_manual_executor_shutdown_more_than_once(); void test_manual_executor_shutdown(); void test_manual_executor_max_concurrency_level(); + void test_manual_executor_post_foreign(); + void test_manual_executor_post_inline(); void test_manual_executor_post(); + + void test_manual_executor_submit_foreign(); + void test_manual_executor_submit_inline(); void test_manual_executor_submit(); + void test_manual_executor_bulk_post_foreign(); + void test_manual_executor_bulk_post_inline(); void test_manual_executor_bulk_post(); + + void test_manual_executor_bulk_submit_foreign(); + void test_manual_executor_bulk_submit_inline(); void test_manual_executor_bulk_submit(); void test_manual_executor_loop_once(); @@ -39,7 +50,7 @@ void concurrencpp::tests::test_manual_executor_name() { assert_equal(executor->name, concurrencpp::details::consts::k_manual_executor_name); } -void concurrencpp::tests::test_manual_executor_shutdown_access_methods() { +void concurrencpp::tests::test_manual_executor_shutdown_method_access() { auto executor = std::make_shared(); assert_false(executor->shutdown_requested()); @@ -85,7 +96,7 @@ void concurrencpp::tests::test_manual_executor_shutdown_coro_raii() { stubs.emplace_back(observer.get_testing_stub(i)); } - auto results = executor->template bulk_submit(stubs); + auto results = executor->bulk_submit(stubs); executor->shutdown(); assert_true(executor->shutdown_requested()); @@ -100,9 +111,23 @@ void concurrencpp::tests::test_manual_executor_shutdown_coro_raii() { } } +void concurrencpp::tests::test_manual_executor_shutdown_more_than_once() { + const size_t task_count = 64; + auto executor = std::make_shared(); + + for (size_t i = 0 ; i < task_count ; i++) { + executor->post([] {}); + } + + for (size_t i = 0; i < 4; i++) { + executor->shutdown(); + } +} + void concurrencpp::tests::test_manual_executor_shutdown() { - test_manual_executor_shutdown_access_methods(); + test_manual_executor_shutdown_method_access(); test_manual_executor_shutdown_coro_raii(); + test_manual_executor_shutdown_more_than_once(); } void concurrencpp::tests::test_manual_executor_max_concurrency_level() { @@ -113,9 +138,9 @@ void concurrencpp::tests::test_manual_executor_max_concurrency_level() { concurrencpp::details::consts::k_manual_executor_max_concurrency_level); } -void concurrencpp::tests::test_manual_executor_post() { +void concurrencpp::tests::test_manual_executor_post_foreign() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; auto executor = std::make_shared(); assert_equal(executor->size(), size_t(0)); @@ -133,7 +158,37 @@ void concurrencpp::tests::test_manual_executor_post() { for (size_t i = 0; i < task_count / 2; i++) { assert_true(executor->loop_once()); - assert_equal(observer.get_execution_count(), i + 1); + assert_equal(observer.get_execution_count(), i + 1); + } + + executor->shutdown(); + assert_equal(observer.get_destruction_count(), task_count); +} + +void concurrencpp::tests::test_manual_executor_post_inline() { + object_observer observer; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + + assert_equal(executor->size(), size_t(0)); + assert_true(executor->empty()); + + executor->post([executor, &observer, task_count] { + for (size_t i = 0; i < task_count; i++) { + executor->post(observer.get_testing_stub()); + assert_equal(executor->size(), 1 + i); + assert_false(executor->empty()); + } + }); + + assert_true(executor->loop_once()); + + assert_equal(observer.get_execution_count(), size_t(0)); + assert_equal(observer.get_destruction_count(), size_t(0)); + + for (size_t i = 0; i < task_count / 2; i++) { + assert_true(executor->loop_once()); + assert_equal(observer.get_execution_count(), i + 1); } executor->shutdown(); @@ -141,10 +196,15 @@ void concurrencpp::tests::test_manual_executor_post() { assert_equal(observer.get_destruction_count(), task_count); } -void concurrencpp::tests::test_manual_executor_submit() { +void concurrencpp::tests::test_manual_executor_post() { + test_manual_executor_post_foreign(); + test_manual_executor_post_inline(); +} + +void concurrencpp::tests::test_manual_executor_submit_foreign() { object_observer observer; - const size_t task_count = 1'000; - auto executor = std::make_shared(); + const size_t task_count = 1'024; + auto executor = std::make_shared(); assert_equal(executor->size(), size_t(0)); assert_true(executor->empty()); @@ -158,13 +218,60 @@ void concurrencpp::tests::test_manual_executor_submit() { assert_equal(observer.get_execution_count(), size_t(0)); assert_equal(observer.get_destruction_count(), size_t(0)); + + for (size_t i = 0 ; i < task_count / 2 ;i++) { + assert_true(executor->loop_once()); + assert_equal(observer.get_execution_count(), i + 1); + assert_equal(results[i].get(), i); + } + + executor->shutdown(); + assert_equal(observer.get_destruction_count(), task_count); } -void concurrencpp::tests::test_manual_executor_bulk_post() { +void concurrencpp::tests::test_manual_executor_submit_inline() { + object_observer observer; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + + assert_equal(executor->size(), size_t(0)); + assert_true(executor->empty()); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector> results; + results.resize(task_count); + for (size_t i = 0; i < task_count; i++) { + results[i] = executor->submit(observer.get_testing_stub(i)); + } + + return results; + }); + + assert_equal(observer.get_execution_count(), size_t(0)); + assert_equal(observer.get_destruction_count(), size_t(0)); + + assert_true(executor->loop_once()); + auto results = results_res.get(); + + for (size_t i = 0; i < task_count / 2; i++) { + assert_true(executor->loop_once()); + assert_equal(observer.get_execution_count(), i + 1); + assert_equal(results[i].get(), i); + } + + executor->shutdown(); + assert_equal(observer.get_destruction_count(), task_count); +} + +void concurrencpp::tests::test_manual_executor_submit() { + test_manual_executor_submit_foreign(); + test_manual_executor_submit_inline(); +} + +void concurrencpp::tests::test_manual_executor_bulk_post_foreign() { object_observer observer; const size_t task_count = 1'000; auto executor = std::make_shared(); - executor_shutdowner shutdown(executor); std::vector stubs; stubs.reserve(task_count); @@ -173,21 +280,64 @@ void concurrencpp::tests::test_manual_executor_bulk_post() { stubs.emplace_back(observer.get_testing_stub()); } - executor->template bulk_post(stubs); + executor->bulk_post(stubs); assert_false(executor->empty()); assert_equal(executor->size(), task_count); assert_equal(observer.get_execution_count(), size_t(0)); assert_equal(observer.get_destruction_count(), size_t(0)); + + for (size_t i = 0; i < task_count / 2; i++) { + assert_true(executor->loop_once()); + assert_equal(observer.get_execution_count(), i + 1); + } + + executor->shutdown(); + assert_equal(observer.get_destruction_count(), task_count); } -void concurrencpp::tests::test_manual_executor_bulk_submit() { +void concurrencpp::tests::test_manual_executor_bulk_post_inline() { object_observer observer; const size_t task_count = 1'000; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); + executor->post([executor, task_count, &observer]() mutable { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub()); + } + executor->bulk_post(stubs); + }); + + assert_true(executor->loop_once()); + + assert_equal(observer.get_execution_count(), size_t(0)); + assert_equal(observer.get_destruction_count(), size_t(0)); + + for (size_t i = 0; i < task_count / 2; i++) { + assert_true(executor->loop_once()); + assert_equal(observer.get_execution_count(), i + 1); + } + + executor->shutdown(); + assert_equal(observer.get_destruction_count(), task_count); +} + +void concurrencpp::tests::test_manual_executor_bulk_post() { + test_manual_executor_bulk_post_foreign(); + test_manual_executor_bulk_post_inline(); +} + +void concurrencpp::tests::test_manual_executor_bulk_submit_foreign() { + object_observer observer; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + std::vector stubs; stubs.reserve(task_count); @@ -195,18 +345,64 @@ void concurrencpp::tests::test_manual_executor_bulk_submit() { stubs.emplace_back(observer.get_testing_stub(i)); } - auto results = executor->template bulk_submit(stubs); + auto results = executor->bulk_submit(stubs); assert_false(executor->empty()); assert_equal(executor->size(), task_count); assert_equal(observer.get_execution_count(), size_t(0)); assert_equal(observer.get_destruction_count(), size_t(0)); + + for (size_t i = 0; i < task_count / 2; i++) { + assert_true(executor->loop_once()); + assert_equal(observer.get_execution_count(), i + 1); + } + + executor->shutdown(); + assert_equal(observer.get_destruction_count(), task_count); +} + +void concurrencpp::tests::test_manual_executor_bulk_submit_inline() { + object_observer observer; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub(i)); + } + + return executor->bulk_submit(stubs); + }); + + assert_true(executor->loop_once()); + + assert_equal(observer.get_execution_count(), size_t(0)); + assert_equal(observer.get_destruction_count(), size_t(0)); + + auto results = results_res.get(); + for (size_t i = 0; i < task_count / 2; i++) { + assert_true(executor->loop_once()); + assert_equal(observer.get_execution_count(), i + 1); + assert_equal(results[i].get(), i); + } + + executor->shutdown(); + assert_equal(observer.get_destruction_count(), task_count); +} + +void concurrencpp::tests::test_manual_executor_bulk_submit() { + test_manual_executor_bulk_submit_foreign(); + test_manual_executor_bulk_submit_inline(); } void concurrencpp::tests::test_manual_executor_loop_once() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); diff --git a/tests/tests/executor_tests/thread_executor_tests.cpp b/tests/tests/executor_tests/thread_executor_tests.cpp index c04ed6d8..9375c422 100644 --- a/tests/tests/executor_tests/thread_executor_tests.cpp +++ b/tests/tests/executor_tests/thread_executor_tests.cpp @@ -12,16 +12,27 @@ namespace concurrencpp::tests { void test_thread_executor_name(); - void test_thread_executor_shutdown_enqueue(); void test_thread_executor_shutdown_join(); + void test_thread_executor_shutdown_method_access(); + void test_thread_executor_shutdown_more_than_once(); void test_thread_executor_shutdown(); void test_thread_executor_max_concurrency_level(); + void test_thread_executor_post_foreign(); + void test_thread_executor_post_inline(); void test_thread_executor_post(); + + void test_thread_executor_submit_foreign(); + void test_thread_executor_submit_inline(); void test_thread_executor_submit(); + void test_thread_executor_bulk_post_foreign(); + void test_thread_executor_bulk_post_inline(); void test_thread_executor_bulk_post(); + + void test_thread_executor_bulk_submit_foreign(); + void test_thread_executor_bulk_submit_inline(); void test_thread_executor_bulk_submit(); void assert_execution_threads( @@ -43,7 +54,7 @@ void concurrencpp::tests::test_thread_executor_name() { assert_equal(executor->name, concurrencpp::details::consts::k_thread_executor_name); } -void concurrencpp::tests::test_thread_executor_shutdown_enqueue() { +void concurrencpp::tests::test_thread_executor_shutdown_method_access() { auto executor = std::make_shared(); assert_false(executor->shutdown_requested()); @@ -78,9 +89,25 @@ void concurrencpp::tests::test_thread_executor_shutdown_join() { assert_equal(observer.get_destruction_count(), task_count); } +void concurrencpp::tests::test_thread_executor_shutdown_more_than_once() { + const size_t task_count = 64; + auto executor = std::make_shared(); + + for (size_t i = 0; i < task_count; i++) { + executor->post([] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }); + } + + for (size_t i = 0; i < 4; i++) { + executor->shutdown(); + } +} + void concurrencpp::tests::test_thread_executor_shutdown() { - test_thread_executor_shutdown_enqueue(); + test_thread_executor_shutdown_method_access(); test_thread_executor_shutdown_join(); + test_thread_executor_shutdown_more_than_once(); } void concurrencpp::tests::test_thread_executor_max_concurrency_level() { @@ -91,14 +118,14 @@ void concurrencpp::tests::test_thread_executor_max_concurrency_level() { concurrencpp::details::consts::k_thread_executor_max_concurrency_level); } -void concurrencpp::tests::test_thread_executor_post() { +void concurrencpp::tests::test_thread_executor_post_foreign() { object_observer observer; const size_t task_count = 128; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); for (size_t i = 0; i < task_count; i++) { - executor->post(observer.get_testing_stub(std::chrono::milliseconds(200))); + executor->post(observer.get_testing_stub()); } assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); @@ -107,7 +134,30 @@ void concurrencpp::tests::test_thread_executor_post() { assert_execution_threads(observer.get_execution_map(), task_count); } -void concurrencpp::tests::test_thread_executor_submit() { +void concurrencpp::tests::test_thread_executor_post_inline() { + object_observer observer; + const size_t task_count = 128; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + executor->post([executor, &observer, task_count] { + for (size_t i = 0; i < task_count; i++) { + executor->post(observer.get_testing_stub()); + } + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); + + assert_execution_threads(observer.get_execution_map(), task_count); +} + +void concurrencpp::tests::test_thread_executor_post() { + test_thread_executor_post_foreign(); + test_thread_executor_post_inline(); +} + +void concurrencpp::tests::test_thread_executor_submit_foreign() { object_observer observer; const size_t task_count = 128; auto executor = std::make_shared(); @@ -117,7 +167,7 @@ void concurrencpp::tests::test_thread_executor_submit() { results.resize(task_count); for (size_t i = 0; i < task_count; i++) { - results[i] = executor->submit(observer.get_testing_stub(i, std::chrono::milliseconds(200))); + results[i] = executor->submit(observer.get_testing_stub(i)); } assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); @@ -130,7 +180,39 @@ void concurrencpp::tests::test_thread_executor_submit() { } } -void concurrencpp::tests::test_thread_executor_bulk_post() { +void concurrencpp::tests::test_thread_executor_submit_inline() { + object_observer observer; + const size_t task_count = 128; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector> results; + results.resize(task_count); + for (size_t i = 0; i < task_count; i++) { + results[i] = executor->submit(observer.get_testing_stub(i)); + } + + return results; + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); + + assert_execution_threads(observer.get_execution_map(), task_count); + + auto results = results_res.get(); + for (size_t i = 0; i < task_count; i++) { + assert_equal(results[i].get(), size_t(i)); + } +} + +void concurrencpp::tests::test_thread_executor_submit() { + test_thread_executor_submit_foreign(); + test_thread_executor_submit_inline(); +} + +void concurrencpp::tests::test_thread_executor_bulk_post_foreign() { object_observer observer; const size_t task_count = 128; auto executor = std::make_shared(); @@ -140,10 +222,10 @@ void concurrencpp::tests::test_thread_executor_bulk_post() { stubs.reserve(task_count); for (size_t i = 0; i < task_count; i++) { - stubs.emplace_back(observer.get_testing_stub(std::chrono::milliseconds(200))); + stubs.emplace_back(observer.get_testing_stub()); } - executor->template bulk_post(stubs); + executor->bulk_post(stubs); assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); @@ -151,7 +233,35 @@ void concurrencpp::tests::test_thread_executor_bulk_post() { assert_execution_threads(observer.get_execution_map(), task_count); } -void concurrencpp::tests::test_thread_executor_bulk_submit() { +void concurrencpp::tests::test_thread_executor_bulk_post_inline() { + object_observer observer; + const size_t task_count = 128; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + executor->post([executor, &observer, task_count]() mutable { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub()); + } + + executor->bulk_post(stubs); + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); + + assert_execution_threads(observer.get_execution_map(), task_count); +} + +void concurrencpp::tests::test_thread_executor_bulk_post() { + test_thread_executor_bulk_post_foreign(); + test_thread_executor_bulk_post_inline(); +} + +void concurrencpp::tests::test_thread_executor_bulk_submit_foreign() { object_observer observer; const size_t task_count = 128; auto executor = std::make_shared(); @@ -161,10 +271,10 @@ void concurrencpp::tests::test_thread_executor_bulk_submit() { stubs.reserve(task_count); for (size_t i = 0; i < task_count; i++) { - stubs.emplace_back(observer.get_testing_stub(i, std::chrono::milliseconds(200))); + stubs.emplace_back(observer.get_testing_stub(i)); } - auto results = executor->template bulk_submit(stubs); + auto results = executor->bulk_submit(stubs); assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); @@ -176,6 +286,39 @@ void concurrencpp::tests::test_thread_executor_bulk_submit() { } } +void concurrencpp::tests::test_thread_executor_bulk_submit_inline() { + object_observer observer; + const size_t task_count = 128; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub(i)); + } + + return executor->bulk_submit(stubs); + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); + + assert_execution_threads(observer.get_execution_map(), task_count); + + auto results = results_res.get(); + for (size_t i = 0; i < task_count; i++) { + assert_equal(results[i].get(), i); + } +} + +void concurrencpp::tests::test_thread_executor_bulk_submit() { + test_thread_executor_bulk_post_foreign(); + test_thread_executor_bulk_post_inline(); +} + void concurrencpp::tests::test_thread_executor() { tester tester("thread_executor test"); diff --git a/tests/tests/executor_tests/thread_pool_executor_tests.cpp b/tests/tests/executor_tests/thread_pool_executor_tests.cpp index 463668f4..5d91a207 100644 --- a/tests/tests/executor_tests/thread_pool_executor_tests.cpp +++ b/tests/tests/executor_tests/thread_pool_executor_tests.cpp @@ -14,13 +14,24 @@ namespace concurrencpp::tests { void test_thread_pool_executor_shutdown_coro_raii(); void test_thread_pool_executor_shutdown_thread_join(); - void test_thread_pool_executor_shutdown_enqueue(); + void test_thread_pool_executor_shutdown_method_access(); + void test_thread_pool_executor_shutdown_method_more_than_once(); void test_thread_pool_executor_shutdown(); + void test_thread_pool_executor_post_foreign(); + void test_thread_pool_executor_post_inline(); void test_thread_pool_executor_post(); + + void test_thread_pool_executor_submit_foreign(); + void test_thread_pool_executor_submit_inline(); void test_thread_pool_executor_submit(); + void test_thread_pool_executor_bulk_post_foreign(); + void test_thread_pool_executor_bulk_post_inline(); void test_thread_pool_executor_bulk_post(); + + void test_thread_pool_executor_bulk_submit_foreign(); + void test_thread_pool_executor_bulk_submit_inline(); void test_thread_pool_executor_bulk_submit(); void test_thread_pool_executor_enqueue_algorithm(); @@ -47,10 +58,10 @@ void concurrencpp::tests::test_thread_pool_executor_shutdown_coro_raii() { } executor->post([] { - std::this_thread::sleep_for(std::chrono::seconds(2)); + std::this_thread::sleep_for(std::chrono::seconds(1)); }); - auto results = executor->template bulk_submit(stubs); + auto results = executor->bulk_submit(stubs); executor->shutdown(); assert_true(executor->shutdown_requested()); @@ -74,19 +85,19 @@ void concurrencpp::tests::test_thread_pool_executor_shutdown_thread_join() { for (size_t i = 0; i < 3; i++) { executor->post([] { - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); }); } //allow threads time to start working - std::this_thread::sleep_for(std::chrono::milliseconds(150)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 1/3 of the threads are waiting, 1/3 are working, 1/3 are idle. all should be joined when tp is shut-down. executor->shutdown(); assert_true(executor->shutdown_requested()); } -void concurrencpp::tests::test_thread_pool_executor_shutdown_enqueue() { +void concurrencpp::tests::test_thread_pool_executor_shutdown_method_access() { auto executor = std::make_shared("threadpool", 4, std::chrono::seconds(10)); assert_false(executor->shutdown_requested()); @@ -104,15 +115,31 @@ void concurrencpp::tests::test_thread_pool_executor_shutdown_enqueue() { }); } +void concurrencpp::tests::test_thread_pool_executor_shutdown_method_more_than_once() { + const size_t task_count = 64; + auto executor = std::make_shared("threadpool", 4, std::chrono::seconds(10)); + + for (size_t i = 0; i < task_count; i++) { + executor->post([] { + std::this_thread::sleep_for(std::chrono::milliseconds(18)); + }); + } + + for (size_t i = 0; i < 4; i++) { + executor->shutdown(); + } +} + void concurrencpp::tests::test_thread_pool_executor_shutdown() { test_thread_pool_executor_shutdown_coro_raii(); test_thread_pool_executor_shutdown_thread_join(); - test_thread_pool_executor_shutdown_enqueue(); + test_thread_pool_executor_shutdown_method_access(); + test_thread_pool_executor_shutdown_method_more_than_once(); } -void concurrencpp::tests::test_thread_pool_executor_post() { +void concurrencpp::tests::test_thread_pool_executor_post_foreign() { object_observer observer; - const size_t task_count = 100'000; + const size_t task_count = 50'000; const size_t worker_count = 6; auto executor = std::make_shared("threadpool", worker_count, std::chrono::seconds(10)); executor_shutdowner shutdown(executor); @@ -127,9 +154,33 @@ void concurrencpp::tests::test_thread_pool_executor_post() { assert_equal(observer.get_execution_map().size(), worker_count); } -void concurrencpp::tests::test_thread_pool_executor_submit() { +void concurrencpp::tests::test_thread_pool_executor_post_inline() { object_observer observer; - const size_t task_count = 100'000; + const size_t task_count = 50'000; + const size_t worker_count = 6; + auto executor = std::make_shared("threadpool", worker_count, std::chrono::seconds(10)); + executor_shutdowner shutdown(executor); + + executor->post([executor, &observer, task_count] { + for (size_t i = 0; i < task_count; i++) { + executor->post(observer.get_testing_stub()); + } + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(2))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(2))); + + assert_equal(observer.get_execution_map().size(), worker_count); +} + +void concurrencpp::tests::test_thread_pool_executor_post() { + test_thread_pool_executor_post_foreign(); + test_thread_pool_executor_post_inline(); +} + +void concurrencpp::tests::test_thread_pool_executor_submit_foreign() { + object_observer observer; + const size_t task_count = 50'000; const size_t worker_count = 6; auto executor = std::make_shared("threadpool", worker_count, std::chrono::seconds(10)); executor_shutdowner shutdown(executor); @@ -151,9 +202,42 @@ void concurrencpp::tests::test_thread_pool_executor_submit() { } } -void concurrencpp::tests::test_thread_pool_executor_bulk_post() { +void concurrencpp::tests::test_thread_pool_executor_submit_inline(){ object_observer observer; - const size_t task_count = 40'000; + const size_t task_count = 50'000; + const size_t worker_count = 6; + auto executor = std::make_shared("threadpool", worker_count, std::chrono::seconds(10)); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector> results; + results.resize(task_count); + for (size_t i = 0; i < task_count; i++) { + results[i] = executor->submit(observer.get_testing_stub(i)); + } + + return results; + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(2))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(2))); + + assert_equal(observer.get_execution_map().size(), worker_count); + + auto results = results_res.get(); + for (size_t i = 0; i < task_count; i++) { + assert_equal(results[i].get(), size_t(i)); + } +} + +void concurrencpp::tests::test_thread_pool_executor_submit() { + test_thread_pool_executor_submit_foreign(); + test_thread_pool_executor_submit_inline(); +} + +void concurrencpp::tests::test_thread_pool_executor_bulk_post_foreign() { + object_observer observer; + const size_t task_count = 50'000; const size_t worker_count = 6; auto executor = std::make_shared("threadpool", worker_count, std::chrono::seconds(10)); executor_shutdowner shutdown(executor); @@ -165,7 +249,7 @@ void concurrencpp::tests::test_thread_pool_executor_bulk_post() { stubs.emplace_back(observer.get_testing_stub()); } - executor->template bulk_post(stubs); + executor->bulk_post(stubs); assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(2))); assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(2))); @@ -173,9 +257,37 @@ void concurrencpp::tests::test_thread_pool_executor_bulk_post() { assert_equal(observer.get_execution_map().size(), worker_count); } -void concurrencpp::tests::test_thread_pool_executor_bulk_submit() { +void concurrencpp::tests::test_thread_pool_executor_bulk_post_inline() { + object_observer observer; + const size_t task_count = 1'024; + const size_t worker_count = 6; + auto executor = std::make_shared("threadpool", worker_count, std::chrono::seconds(10)); + executor_shutdowner shutdown(executor); + + executor->post([executor, &observer, task_count]() mutable { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub()); + } + executor->bulk_post(stubs); + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(2))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(2))); + + assert_equal(observer.get_execution_map().size(), worker_count); +} + +void concurrencpp::tests::test_thread_pool_executor_bulk_post() { + test_thread_pool_executor_bulk_post_foreign(); + test_thread_pool_executor_bulk_post_inline(); +} + +void concurrencpp::tests::test_thread_pool_executor_bulk_submit_foreign() { object_observer observer; - const size_t task_count = 40'000; + const size_t task_count = 50'000; const size_t worker_count = 6; auto executor = std::make_shared("threadpool", worker_count, std::chrono::seconds(10)); executor_shutdowner shutdown(executor); @@ -187,7 +299,7 @@ void concurrencpp::tests::test_thread_pool_executor_bulk_submit() { stubs.emplace_back(observer.get_testing_stub(i)); } - auto results = executor->template bulk_submit(stubs); + auto results = executor->bulk_submit(stubs); assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(2))); assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(2))); @@ -199,6 +311,40 @@ void concurrencpp::tests::test_thread_pool_executor_bulk_submit() { } } +void concurrencpp::tests::test_thread_pool_executor_bulk_submit_inline() { + object_observer observer; + const size_t task_count = 50'000; + const size_t worker_count = 6; + auto executor = std::make_shared("threadpool", worker_count, std::chrono::seconds(10)); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub(i)); + } + + return executor->bulk_submit(stubs); + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(2))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(2))); + + assert_equal(observer.get_execution_map().size(), worker_count); + + auto results = results_res.get(); + for (size_t i = 0; i < task_count; i++) { + assert_equal(results[i].get(), i); + } +} + +void concurrencpp::tests::test_thread_pool_executor_bulk_submit() { + test_thread_pool_executor_bulk_submit_foreign(); + test_thread_pool_executor_bulk_submit_inline(); +} + void concurrencpp::tests::test_thread_pool_executor_enqueue_algorithm() { //case 1 : if an idle thread exists, enqueue it to the idle thread { diff --git a/tests/tests/executor_tests/worker_thread_executor_tests.cpp b/tests/tests/executor_tests/worker_thread_executor_tests.cpp index bd5300a0..7bc6ef10 100644 --- a/tests/tests/executor_tests/worker_thread_executor_tests.cpp +++ b/tests/tests/executor_tests/worker_thread_executor_tests.cpp @@ -12,17 +12,28 @@ namespace concurrencpp::tests { void test_worker_thread_executor_name(); - void test_worker_thread_executor_shutdown_enqueue(); + void test_worker_thread_executor_shutdown_method_access(); void test_worker_thread_executor_shutdown_thread_join(); void test_worker_thread_executor_shutdown_coro_raii(); + void test_worker_thread_executor_shutdown_coro_more_than_once(); void test_worker_thread_executor_shutdown(); void test_worker_thread_executor_max_concurrency_level(); + void test_worker_thread_executor_post_foreign(); + void test_worker_thread_executor_post_inline(); void test_worker_thread_executor_post(); + + void test_worker_thread_executor_submit_foreign(); + void test_worker_thread_executor_submit_inline(); void test_worker_thread_executor_submit(); + void test_worker_thread_executor_bulk_post_foreign(); + void test_worker_thread_executor_bulk_post_inline(); void test_worker_thread_executor_bulk_post(); + + void test_worker_thread_executor_bulk_submit_foreign(); + void test_worker_thread_executor_bulk_submit_inline(); void test_worker_thread_executor_bulk_submit(); } @@ -35,7 +46,7 @@ void concurrencpp::tests::test_worker_thread_executor_name() { assert_equal(executor->name, concurrencpp::details::consts::k_worker_thread_executor_name); } -void concurrencpp::tests::test_worker_thread_executor_shutdown_enqueue() { +void concurrencpp::tests::test_worker_thread_executor_shutdown_method_access() { auto executor = std::make_shared(); assert_false(executor->shutdown_requested()); @@ -58,7 +69,7 @@ void concurrencpp::tests::test_worker_thread_executor_shutdown_thread_join() { auto executor = std::make_shared(); executor->post([] { - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); }); std::this_thread::sleep_for(std::chrono::milliseconds(150)); @@ -75,17 +86,18 @@ void concurrencpp::tests::test_worker_thread_executor_shutdown_coro_raii() { std::vector stubs; stubs.reserve(task_count); - for (size_t i = 0; i < 1'024; i++) { + for (size_t i = 0; i < task_count; i++) { stubs.emplace_back(observer.get_testing_stub(i)); } executor->post([] { - std::this_thread::sleep_for(std::chrono::seconds(2)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); }); - auto results = executor->template bulk_submit(stubs); + auto results = executor->bulk_submit(stubs); executor->shutdown(); + assert_true(executor->shutdown_requested()); assert_equal(observer.get_execution_count(), size_t(0)); @@ -98,10 +110,26 @@ void concurrencpp::tests::test_worker_thread_executor_shutdown_coro_raii() { } } +void concurrencpp::tests::test_worker_thread_executor_shutdown_coro_more_than_once() { + const size_t task_count = 64; + auto executor = std::make_shared(); + + for (size_t i = 0; i < task_count; i++) { + executor->post([] { + std::this_thread::sleep_for(std::chrono::milliseconds(18)); + }); + } + + for (size_t i = 0; i < 4; i++) { + executor->shutdown(); + } +} + void concurrencpp::tests::test_worker_thread_executor_shutdown() { - test_worker_thread_executor_shutdown_enqueue(); + test_worker_thread_executor_shutdown_method_access(); test_worker_thread_executor_shutdown_coro_raii(); test_worker_thread_executor_shutdown_thread_join(); + test_worker_thread_executor_shutdown_coro_more_than_once(); } void concurrencpp::tests::test_worker_thread_executor_max_concurrency_level() { @@ -111,9 +139,9 @@ void concurrencpp::tests::test_worker_thread_executor_max_concurrency_level() { assert_equal(executor->max_concurrency_level(), 1); } -void concurrencpp::tests::test_worker_thread_executor_post() { +void concurrencpp::tests::test_worker_thread_executor_post_foreign() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); @@ -130,9 +158,35 @@ void concurrencpp::tests::test_worker_thread_executor_post() { assert_not_equal(execution_map.begin()->first, thread::get_current_virtual_id()); } -void concurrencpp::tests::test_worker_thread_executor_submit() { +void concurrencpp::tests::test_worker_thread_executor_post_inline() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + executor->post([executor, &observer, task_count] { + for (size_t i = 0; i < task_count; i++) { + executor->post(observer.get_testing_stub()); + } + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); + + const auto& execution_map = observer.get_execution_map(); + + assert_equal(execution_map.size(), size_t(1)); + assert_not_equal(execution_map.begin()->first, thread::get_current_virtual_id()); +} + +void concurrencpp::tests::test_worker_thread_executor_post() { + test_worker_thread_executor_post_foreign(); + test_worker_thread_executor_post_inline(); +} + +void concurrencpp::tests::test_worker_thread_executor_submit_foreign() { + object_observer observer; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); @@ -156,9 +210,44 @@ void concurrencpp::tests::test_worker_thread_executor_submit() { } } -void concurrencpp::tests::test_worker_thread_executor_bulk_post() { +void concurrencpp::tests::test_worker_thread_executor_submit_inline() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector> results; + results.resize(task_count); + for (size_t i = 0; i < task_count; i++) { + results[i] = executor->submit(observer.get_testing_stub(i)); + } + + return results; + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); + + const auto& execution_map = observer.get_execution_map(); + + assert_equal(execution_map.size(), size_t(1)); + assert_not_equal(execution_map.begin()->first, thread::get_current_virtual_id()); + + auto results = results_res.get(); + for (size_t i = 0; i < task_count; i++) { + assert_equal(results[i].get(), size_t(i)); + } +} + +void concurrencpp::tests::test_worker_thread_executor_submit() { + test_worker_thread_executor_submit_foreign(); + test_worker_thread_executor_submit_inline(); +} + +void concurrencpp::tests::test_worker_thread_executor_bulk_post_foreign() { + object_observer observer; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); @@ -169,7 +258,7 @@ void concurrencpp::tests::test_worker_thread_executor_bulk_post() { stubs.emplace_back(observer.get_testing_stub()); } - executor->template bulk_post(stubs); + executor->bulk_post(stubs); assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); @@ -180,9 +269,40 @@ void concurrencpp::tests::test_worker_thread_executor_bulk_post() { assert_not_equal(execution_map.begin()->first, thread::get_current_virtual_id()); } -void concurrencpp::tests::test_worker_thread_executor_bulk_submit() { +void concurrencpp::tests::test_worker_thread_executor_bulk_post_inline() { + object_observer observer; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + executor->post([executor, &observer, task_count]() mutable { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub()); + } + + executor->bulk_post(stubs); + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); + + const auto& execution_map = observer.get_execution_map(); + + assert_equal(execution_map.size(), size_t(1)); + assert_not_equal(execution_map.begin()->first, thread::get_current_virtual_id()); +} + +void concurrencpp::tests::test_worker_thread_executor_bulk_post() { + test_worker_thread_executor_bulk_post_foreign(); + test_worker_thread_executor_bulk_post_inline(); +} + +void concurrencpp::tests::test_worker_thread_executor_bulk_submit_foreign() { object_observer observer; - const size_t task_count = 1'000; + const size_t task_count = 1'024; auto executor = std::make_shared(); executor_shutdowner shutdown(executor); @@ -193,7 +313,7 @@ void concurrencpp::tests::test_worker_thread_executor_bulk_submit() { stubs.emplace_back(observer.get_testing_stub(i)); } - auto results = executor->template bulk_submit(stubs); + auto results = executor->bulk_submit(stubs); assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); @@ -208,12 +328,48 @@ void concurrencpp::tests::test_worker_thread_executor_bulk_submit() { } } +void concurrencpp::tests::test_worker_thread_executor_bulk_submit_inline() { + object_observer observer; + const size_t task_count = 1'024; + auto executor = std::make_shared(); + executor_shutdowner shutdown(executor); + + auto results_res = executor->submit([executor, &observer, task_count] { + std::vector stubs; + stubs.reserve(task_count); + + for (size_t i = 0; i < task_count; i++) { + stubs.emplace_back(observer.get_testing_stub(i)); + } + + return executor->bulk_submit(stubs); + }); + + assert_true(observer.wait_execution_count(task_count, std::chrono::minutes(1))); + assert_true(observer.wait_destruction_count(task_count, std::chrono::minutes(1))); + + const auto& execution_map = observer.get_execution_map(); + + assert_equal(execution_map.size(), size_t(1)); + assert_not_equal(execution_map.begin()->first, thread::get_current_virtual_id()); + + auto results = results_res.get(); + for (size_t i = 0; i < task_count; i++) { + assert_equal(results[i].get(), i); + } +} + +void concurrencpp::tests::test_worker_thread_executor_bulk_submit() { + test_worker_thread_executor_bulk_submit_foreign(); + test_worker_thread_executor_bulk_submit_inline(); +} + void concurrencpp::tests::test_worker_thread_executor() { tester tester("worker_thread_executor test"); tester.add_step("name", test_worker_thread_executor_name); tester.add_step("shutdown", test_worker_thread_executor_shutdown); - tester.add_step("max_concurreny_level", test_worker_thread_executor_max_concurrency_level); + tester.add_step("max_concurrency_level", test_worker_thread_executor_max_concurrency_level); tester.add_step("post", test_worker_thread_executor_post); tester.add_step("submit", test_worker_thread_executor_submit); tester.add_step("bulk_post", test_worker_thread_executor_bulk_post);