Skip to content

Commit

Permalink
Merge pull request David-Haim#2 from David-Haim/v.0.0.2
Browse files Browse the repository at this point in the history
v 0.0.2
  • Loading branch information
David-Haim authored Aug 14, 2020
2 parents 633eda0 + f349b27 commit d98c5ef
Show file tree
Hide file tree
Showing 11 changed files with 1,053 additions and 109 deletions.
165 changes: 158 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

## concurrencpp, the C++ concurrency library

(note: documentation is still under development)
Expand All @@ -17,6 +18,93 @@ concurrencpp is task-centric. A task is an asynchronous operation. Tasks provide
from one to another, where the result of one task is used as if it were a parameter or an intermediate value of another ongoing task.
In concurrencpp, the concept of tasks is represented by coroutines. This allows tasks to be suspended, waiting for other tasks to finish, and chained using `co_await`, and thus solving the consumer-producer problem elegantly by giving the concurrent code a synchronous look.

concurrencpp usage is build around the RAII concept. In order to use tasks and executors, applications create a `runtime` instance in the beginning of the `main` function. The runtime is then used to acquire existing executors and register new user defined executors.
Executors are used to schedule new tasks to run, and they might return a `result` object that can be used to marshal the asynchronous result to another task that acts as its consumer.
Results can be awaited and resolved in a non blocking manner, and even switch the underlying executor in the process.
When the runtime is destroyed, it iterates over every stored executor and calls its `shutdown` method. every executor then exits gracefully. Unscheduled tasks are destroyed, and attempts to create new tasks will throw an exception.

**"Hello world" program using concurrencpp**

```cpp
#include "concurrencpp.h"
#include <iostream>

int main() {
concurrencpp::runtime runtime;
auto result = runtime.thread_executor()->submit([] {
std::cout << "hello world" << std::endl;
});

result.get();
return 0;
}
```
In this basic example, we created a runtime object, then we acquired the thread executor from the runtime. We used `submit` to pass a lambda as our given callable. This lambda returns `void`, hence, the executor returns a `result<void>` object that marshals the asynchronous result back to the caller. `main` calls `get` which blocks the main thread until the result becomes ready. If no exception was thrown, `get` returns `void`. If an exception was thrown, `get` re-throws it. Asynchronously, `thread_executor` launches a new thread of execution and runs the given lambda. It implicitly `co_return void` and the task is finished. `main` is then unblocked.

**Concurrent even-number counting**

```cpp
#include "concurrencpp.h"

#include <iostream>
#include <vector>
#include <algorithm>

#include <ctime>

using namespace concurrencpp;

std::vector<int> make_random_vector() {
std::vector<int> vec(64 * 1'024);

std::srand(std::time(nullptr));
for (auto& i : vec) {
i = ::rand();
}

return vec;
}

result<void> count_even(std::shared_ptr<thread_pool_executor> tpe, const std::vector<int>& vector) {
const auto vecor_size = vector.size();
const auto concurrency_level = tpe->max_concurrency_level();
const auto chunk_size = vecor_size / concurrency_level;

std::vector<result<size_t>> chunk_count;

for (auto i = 0; i < concurrency_level; i++) {
const auto chunk_begin = i * chunk_size;
const auto chunk_end = chunk_begin + chunk_size;
auto result = tpe->submit([&vector, chunk_begin, chunk_end]() -> size_t {
return std::count_if(
vector.begin() + chunk_begin,
vector.begin() + chunk_end,
[](auto i) { return i % 2 == 0; });
});

chunk_count.emplace_back(std::move(result));
}

size_t total_count = 0;

for(auto& result : chunk_count) {
total_count += co_await result;
}

std::cout << "there are " << total_count << " even numbers in the vector" << std::endl;
}

int main() {
concurrencpp::runtime runtime;
const auto vector = make_random_vector();
auto result = count_even(runtime.thread_pool_executor(), vector);
result.get();
return 0;
}
```
In this example, we again start the program by creating a runtime object. We create a vector filled with random numbers, then we acquire the `thread_pool_executor` from the runtime and call `count_even`. `count_even` is a coroutine (task) that spawns more coroutines and `co_await`s for them to finish inside. `max_concurrency_level` returns the maximum amount of workers that the executor supports, In the threadpool executor case, the number of worker is calculated from the number of cores. We then partitian the array to match the number of workers and send every chunk to be processed in its own task. Asynchronously, the workers count how many even numbers each chunk contains, and `co_return` the result. `count_even` iterates every result, pulling the asynchronous count by using `co_await`. The final result is then printed and `count_even` finishes. `main` which was blocked by calling `get` is unblocked and the program terminates gracefully.
**Executors**
In the context of concurrencpp, an executor is an object that is able to schedule and run coroutines.
Expand Down Expand Up @@ -120,6 +208,17 @@ This executor is good for long running tasks, like objects that run a work loop,

* **inline executor** - mainly used to override the behavior of other executors. Enqueuing a task is equivalent to invoking it inline.

***Using executors:***

The bare mechanism of an executor is encapsulated in its `enqueue` method. This method enqueues a suspended coroutine for execution and has two flavors: one flavor that receives a single `coroutine_handle<>` as an argument, and another that receives a `span<coroutine_handle<>>`. The second flavor is used to enqueue a batch of suspended coroutines. This allows better scheduling heuristics and decreased contention.

Of course, Applications don't need to use these low-level methods by themselves. `concurrencpp::executor` provides an API for scheduling non-coroutines by converting them to a suspended coroutine first and then scheduling them to run.
Applications can request executors to return a result object that marshals the asynchronous result of the provided callable. This is done by calling `executor::submit` and `execuor::bulk_submit`.
`submit` gets a callable, and returns a result object. `executor::bulk_submit` gets a `span` of callables and returns a `vector`of result objects in a similar way `submit` works.
In many cases, applications are not interested in the asynchronous value or exception. In this case, applications can use `executor:::post` and `executor::bulk_post` to schedule a callable or a `span` of callables to execute, but also tells the task to drop any returned value or thrown exception. Not marshaling the asynchronous result is faster than marshaling, but then we have no way of knowing the status of the ongoing task.

`post`, `bulk_post`, `submit` and `bulk_submit` use `enqueue` behind the scenes for the underlying scheduling mechanism.

**Result objects**

Asynchronous values and exceptions can be consumed using the concurrencpp result objects.
Expand All @@ -129,7 +228,11 @@ In either case, this asynchronous result is marshaled to the consumer of the res
The result state therefore, very from `idle` (the asynchronous result or exception aren't ready yet) to `value` (the coroutine terminated by returning a valid value) to `exception` (the coroutine terminated by throwing an exception).
Result objects can be polled for their state, waited, resolved or awaited.
Awaiting a result object by using `co_await` (and by doing so, turning the current function into a coroutine as well) is the preferred way of consuming result objects.
concurrencpp also provide the `null_result` type. in this case, any returned value or thrown exception will be dropped.

Result objects are a move-only type, and as such, they cannot be used after their content was moved to another result object. In this case, the result object is considered to be "empty" and attempts to call any method other than `operator bool` and `operator = ` will throw.
After the asynchronous result has been pulled out of the result object (by calling `get`, `await` or `await_via`), the result object becomes empty. Emptiness can be tested with `operator bool`.

concurrencpp also provide the `null_result` type. This type can be used as a return type from a coroutine. In this case, any returned value or thrown exception will be dropped. It's logically equivalent of returning `void`.

`concurrencpp::result` API

Expand Down Expand Up @@ -243,7 +346,12 @@ class result{
};
```
**Parallel coroutines**
Using executors with OOP style is great. We can launch new tasks by posting or submitting them to an executor object. In some cases, such as in parallel algorithms, recursive algorithms and concurrent algorithms that use the fork-join model, this style can cause inconvenience.
Instead of returning strange result types like `result<result<type>>` and doing functional style `join`/`unwrap`, we can just use parallel coroutines:
In concurrencpp, a parallel coroutine is a function that:
1. Returns any of `concurrencpp::result<T>` / `concurrencpp::null_result` .
Expand All @@ -252,16 +360,58 @@ In concurrencpp, a parallel coroutine is a function that:
1. Contains any of `co_await` or `co_return` in its body.
In this case, the function is a parallel coroutine:
concurrencpp will start the coroutine suspended and immediately re-schedule it to run in the provided executor.
concurrencpp will start the function suspended and immediately re-schedule it to run in the provided executor.
`concurrencpp::executor_tag` is a dummy placeholder to tell the concurrencpp runtime that this function is not a regular function, it needs to start running inside the given executor.
Applications can then consume the result of the parallel coroutine by using the returned result object.
This technique is especially good for parallel algorithms that require recursion, or applications that use the fork-join concurrency model.
***Parallel coroutine example : parallel Fibonacci:***
```cpp
#include "concurrencpp.h"
#include <iostream>
using namespace concurrencpp;
int fibbonacci_sync(int i) {
if (i == 0) {
return 0;
}
if (i == 1) {
return 1;
}
return fibbonacci_sync(i - 1) + fibbonacci_sync(i - 2);
}
result<int> fibbonacci(executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
if (curr <= 10) {
co_return fibbonacci_sync(curr);
}
auto fib_1 = fibbonacci({}, tpe, curr - 1);
auto fib_2 = fibbonacci({}, tpe, curr - 2);
co_return co_await fib_1 + co_await fib_2;
}
int main() {
concurrencpp::runtime runtime;
auto fibb_30 = fibonacci({}, runtime.thread_pool_executor(), 30).get();
std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
return 0;
}
```
In this example, we calculate the 30-th member of the Fibonacci sequence in a parallel manner.
We start launching each Fibonacci step in it's own parallel coroutine. the first argument is a dummy `executor_tag` and the second argument is a threadpool executor.
Every recursive step invokes a new parallel coroutine that runs in parallel. the result is returned to the parent task and is acquired by using `co_await`.
When we deem the input to be small enough to be calculated synchronously (when `curr <= 10`), we stop executing each recursive step in its own task and just solve the algorithm synchronously.

**Creating and scheduling coroutines**

There are few ways to create and schedule coroutines in concurrencpp

- By calling any of `executor::post`, `executor::submit`, `executor::bulk_post` or `executor::bulk_submit`.
In this case the application provides a callable and depending on the selected method, might return a result object to the caller. The callable is then turned into a coroutine and scheduled to run in the executor.
In this case the application provides a callable and depending Aon the selected method, might return a result object to the caller. The callable is then turned into a coroutine and scheduled to run in the executor.

- By using a parallel coroutine.

Expand Down Expand Up @@ -342,9 +492,9 @@ class timer_queue {
Creates a new delay object where *this is associated timer_queue
*/
result<void> make_delay_object(size_t due_time, std::shared_ptr<concurrencpp::executor> executor);
};
};

class timer {
class timer {
/*
Creates an empty timer that does not run.
*/
Expand Down Expand Up @@ -416,7 +566,8 @@ class timer_queue {
The concurrencpp runtime object is the glue that sticks all the components above to a complete and cohesive mechanism of managing asynchronous actions.
The concurrencpp runtime object is the agent used to acquire, store and create new executors. The runtime must be created as a value type as soon as the main function starts to run.
When the concurrencpp runtime gets out of scope, it iterates over its stored executors and shuts them down one by one by calling `executor::shutdown`. Executors then exit their inner work loop and any subsequent attempt to schedule a new coroutine will throw a `concurrencpp::executor_shutdown` exception. The runtime also contains the global timer queue used to create timers and delay objects.
When the concurrencpp runtime gets out of scope, it iterates over its stored executors and shuts them down one by one by calling `executor::shutdown`. Executors then exit their inner work loop and any subsequent attempt to schedule a new task will throw a `concurrencpp::executor_shutdown` exception. The runtime also contains the global timer queue used to create timers and delay objects. Upon destruction, stored executors will wait for ongoing tasks to finish. If an ongoing task tries to use an executor to spawn new tasks or schedule its continuation - an exception will be thrown. In this case, ongoing tasks needs to quit as soon as possible, allowing their underlying executors to quit. With this RAII style of code, no tasks can be processed before the creation of the runtime object, and while/after the runtime gets out of scope.
This frees concurrent applications from needing to communicate termination message explicitly. Tasks are free use executors as long as the runtime object is alive.
`concurrencpp::runtime` API
Expand Down
1 change: 1 addition & 0 deletions concurrencpp/src/executors/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace concurrencpp::details::consts {
inline const char* k_thread_pool_executor_name = "concurrencpp::thread_pool_executor";
inline const char* k_background_executor_name = "concurrencpp::background_executor";

constexpr int k_worker_thread_max_concurrency_level = 1;
inline const char* k_worker_thread_executor_name = "concurrencpp::worker_thread_executor";

inline const char* k_manual_executor_name = "concurrencpp::manual_executor";
Expand Down
14 changes: 11 additions & 3 deletions concurrencpp/src/executors/manual_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@

using concurrencpp::manual_executor;

void manual_executor::destroy_tasks() noexcept {
void manual_executor::destroy_tasks(std::unique_lock<std::mutex>& lock) noexcept {
assert(lock.owns_lock());

for (auto task : m_tasks) {
task.destroy();
}

m_tasks.clear();
}

void manual_executor::enqueue(std::experimental::coroutine_handle<> task) {
Expand Down Expand Up @@ -146,13 +150,17 @@ bool manual_executor::wait_for_task(std::chrono::milliseconds max_waiting_time)
}

void manual_executor::shutdown() noexcept {
m_atomic_abort.store(true, std::memory_order_relaxed);
const auto abort = m_atomic_abort.exchange(true, std::memory_order_relaxed);
if (abort) {
//shutdown had been called before.
return;
}

{
std::unique_lock<decltype(m_lock)> lock(m_lock);
m_abort = true;

destroy_tasks();
destroy_tasks(lock);
}

m_condition.notify_all();
Expand Down
2 changes: 1 addition & 1 deletion concurrencpp/src/executors/manual_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace concurrencpp {
bool m_abort;
std::atomic_bool m_atomic_abort;

void destroy_tasks() noexcept;
void destroy_tasks(std::unique_lock<std::mutex>& lock) noexcept;

public:
manual_executor() :
Expand Down
22 changes: 10 additions & 12 deletions concurrencpp/src/executors/thread_pool_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,20 +268,18 @@ void thread_pool_worker::work_loop() noexcept {
}

void thread_pool_worker::destroy_tasks() noexcept {
std::unique_lock<std::mutex> lock(m_lock);
for (auto task : m_private_queue) {
task.destroy();
}

m_private_queue.clear();

{
std::unique_lock<std::mutex> lock(m_lock);
m_private_queue = std::move(m_public_queue);
}

for (auto task : m_private_queue) {
for (auto task : m_public_queue) {
task.destroy();
}

m_public_queue.clear();
}

void thread_pool_worker::ensure_worker_active(std::unique_lock<std::mutex>& lock) {
Expand Down Expand Up @@ -353,6 +351,7 @@ void thread_pool_worker::enqueue_local(std::span<std::experimental::coroutine_ha
}

void thread_pool_worker::abort() noexcept {
assert(m_atomic_abort.load(std::memory_order_relaxed) == false);
m_atomic_abort.store(true, std::memory_order_relaxed);

{
Expand Down Expand Up @@ -457,15 +456,14 @@ bool thread_pool_executor::shutdown_requested() const noexcept {
}

void concurrencpp::thread_pool_executor::shutdown() noexcept {
m_abort.store(true, std::memory_order_relaxed);

for (auto& worker : m_workers) {
worker.abort();
const auto abort = m_abort.exchange(true, std::memory_order_relaxed);
if (abort) {
//shutdown had been called before.
return;
}

std::this_thread::yield();

for (auto& worker : m_workers) {
worker.abort();
worker.join();
}
}
20 changes: 12 additions & 8 deletions concurrencpp/src/executors/worker_thread_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,18 @@ void worker_thread_executor::join() {
}

void worker_thread_executor::destroy_tasks() noexcept {
std::unique_lock<std::mutex> lock(m_lock);
for (auto task : m_private_queue) {
task.destroy();
}

m_private_queue.clear();

{
std::unique_lock<std::mutex> lock(m_lock);
m_private_queue = std::move(m_public_queue);
}

for (auto task : m_private_queue) {
for (auto task : m_public_queue) {
task.destroy();
}

m_public_queue.clear();
}

bool worker_thread_executor::drain_queue_impl() {
Expand Down Expand Up @@ -153,16 +151,22 @@ void worker_thread_executor::enqueue(std::span<std::experimental::coroutine_hand
}

int worker_thread_executor::max_concurrency_level() const noexcept {
return 1;
return details::consts::k_worker_thread_max_concurrency_level;
}

bool concurrencpp::worker_thread_executor::shutdown_requested() const noexcept {
return m_atomic_abort.load(std::memory_order_relaxed);
}

void worker_thread_executor::shutdown() noexcept {
const auto abort = m_atomic_abort.exchange(true, std::memory_order_relaxed);
if (abort) {
//shutdown had been called before.
return;
}

assert(m_private_atomic_abort.load(std::memory_order_relaxed) == false);
m_private_atomic_abort.store(true, std::memory_order_relaxed);
m_atomic_abort.store(true, std::memory_order_relaxed);

join();
destroy_tasks();
Expand Down
Loading

0 comments on commit d98c5ef

Please sign in to comment.