Skip to content

Commit

Permalink
v.0.0.6
Browse files Browse the repository at this point in the history
* corotuine-promises refactured
* addition of derivable_executor
* remove result_core_base::reset_consumer
* supprot for timer_queue shutdown
* support await_via, resolve_via, make_exceptional_result null reference exception
* test helpers refactoring
* result::await(_via)/result::resolve(_via) tests refactoring
* result::when_any tests, coroutine-tests improvements
  • Loading branch information
David-Haim committed Sep 26, 2020
1 parent 1dc07c0 commit 3bf27dc
Show file tree
Hide file tree
Showing 61 changed files with 1,780 additions and 3,229 deletions.
57 changes: 45 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

# concurrencpp, the C++ concurrency library

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
Expand Down Expand Up @@ -247,6 +246,8 @@ This executor is good for long running tasks, like objects that run a work loop,

* **manual executor** - an executor that does not execute coroutines by itself. Application code can execute previously enqueued tasks by manually invoking its execution methods.

* **derivable executor** - a base class for user defined executors. Although inheriting directly from `concurrencpp::executor` is possible, `derivable_executor` uses the `CRTP` pattern that provides some optimization opportunities for the compiler.

* **inline executor** - mainly used to override the behavior of other executors. Enqueuing a task is equivalent to invoking it inline.

#### Using executors
Expand Down Expand Up @@ -358,6 +359,7 @@ class result{
In either way, after resuming, if the result is a valid value, it is returned.
Otherwise, operator co_await rethrows the asynchronous exception.
Throws concurrencpp::errors::empty_result if *this is empty.
Throws std::invalid_argument if executor is null.
If this result is ready and force_rescheduling=true, throws any exception that executor::enqueue may throw.
*/
auto await_via(
Expand All @@ -380,6 +382,7 @@ class result{
if force_rescheduling = false, then the current coroutine resumes immediately in the calling thread of execution.
In either way, after resuming, *this is returned in a non-empty form and guaranteed that its status is not result_status::idle.
Throws concurrencpp::errors::empty_result if *this is empty.
Throws std::invalid_argument if executor is null.
If this result is ready and force_rescheduling=true, throws any exception that executor::enqueue may throw.
*/
auto resolve_via(
Expand Down Expand Up @@ -574,7 +577,8 @@ result<void> make_ready_result();

/*
Creates a ready result object from an exception pointer.
The result object will re-throw exception_ptr when calling get, await or await_via
The result object will re-throw exception_ptr when calling get, await or await_via.
Throws std::invalid_argument if exception_ptr is null.
*/
template<class type>
result<type> make_exceptional_result(std::exception_ptr exception_ptr);
Expand Down Expand Up @@ -642,8 +646,7 @@ when_any(iterator_type begin, iterator_type end);
### Timers and Timer queues
concurrencpp also provides timers and timer queues.
Timers are objects that schedule actions to run on an executor within a well-defined interval of time. There are three types of timers - *regular timers*, *onshot-timers* and *delay objects*.
A timer queue is a concurrencpp worker that manages a collection of timers and processes them in just one thread of execution. In order to create timers, one must use the timer queue in conjunction with an executor.
Timers are objects that define actions which run on an executor within a well-defined interval of time. There are three types of timers - *regular timers*, *onshot-timers* and *delay objects*.
Timers have four properties that describe them:
Expand All @@ -652,16 +655,38 @@ Timers have four properties that describe them:
1. Due time - from the time of creation, the interval in milliseconds in which the timer will be scheduled to run for the first time
1. Frequency - from the time the timer callable was scheduled for the first time, the interval in milliseconds the callable will be schedule to run periodically, until the timer is destructed or cancelled.
A timer queue is a concurrencpp worker that manages a collection of timers and processes them in just one thread of execution.
When a timer deadline (whether its due-time or frequency) has reached, the timer queue "fires" the timer by scheduling its callable to run on the timer given executor.
Just like executors, timer queues also adhere to the RAII concpet. When the runtime object gets out of scope, It shuts down the timer queue, cancelling all pending timers.
After a timer queue has been shut down, any subsequent call to `make_timer`, `make_onshot_timer` and `make_delay_object` will throw an `errors::timer_queue_shutdown` exceptions.
Applications must not try to shut down timer queues by themselves.
#### `timer_queue` API:
```cpp
class timer_queue {
/*
Destroyes *this and cancels all associated timers.
*/
~timer_queue() noexcept;
/*
Shuts down this timer_queue.
After this call, invocation of any method besides shutdown
and shutdown_requested will throw an errors::timer_queue_shutdown.
If shutdown had been called before, this method has no effect.
*/
void shutdown() noexcept;
/*
Returns true if shutdown had been called before, false otherwise.
*/
bool shutdown_requested() const noexcept;
/*
Creates a new running timer where *this is the associated timer_queue
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type>
timer make_timer(
Expand All @@ -671,7 +696,9 @@ class timer_queue {
callable_type&& callable);
/*
Creates a new running timer where *this is associated timer_queue
Creates a new running timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type, class ... argumet_types>
timer make_timer(
Expand All @@ -682,7 +709,9 @@ class timer_queue {
argumet_types&& ... arguments);
/*
Creates a new one shot timer where *this is associated timer_queue
Creates a new one shot timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type>
timer make_one_shot_timer(
Expand All @@ -691,7 +720,9 @@ class timer_queue {
callable_type&& callable);
/*
Creates a new one shot timer where *this is associated timer_queue
Creates a new one shot timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type, class ... argumet_types>
timer make_one_shot_timer(
Expand All @@ -701,7 +732,9 @@ class timer_queue {
argumet_types&& ... arguments);
/*
Creates a new delay object where *this is associated timer_queue
Creates a new delay object where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
result<void> make_delay_object(size_t due_time, std::shared_ptr<concurrencpp::executor> executor);
};
Expand Down Expand Up @@ -937,7 +970,7 @@ class runtime {

#### Creating user-defined executors

As mentioned before, Applications can create their own custom executor type by implementing the `executor` interface. There are a few points to consider when implementing user defined executors:
As mentioned before, Applications can create their own custom executor type by inheriting the `derivable_executor` class. There are a few points to consider when implementing user defined executors:
The most important thing is to remember that executors are used from multiple threads, so implemented methods must be thread-safe.
Another important thing is to handle shutdown correctly: `shutdown`, `shutdown_requested` and `enqueue` should all monitor the executor state and behave accordingly when invoked:
* `shutdown` should tell underlying threads to quit and then join them. `shutdown` must also destroy each unexecuted `coroutine_handle` by calling `coroutine_handle::destroy`.
Expand All @@ -956,7 +989,7 @@ Another important thing is to handle shutdown correctly: `shutdown`, `shutdown_r
#include <mutex>
#include <condition_variable>

class logging_executor : public concurrencpp::executor {
class logging_executor : public concurrencpp::derivable_executor<logging_executor> {

private:
mutable std::mutex _lock;
Expand Down Expand Up @@ -990,7 +1023,7 @@ private:

public:
logging_executor(std::string_view prefix) :
executor("logging_executor"),
derivable_executor<logging_executor>("logging_executor"),
_shutdown_requested(false),
_prefix(prefix) {
_thread = std::thread([this] {
Expand Down
4 changes: 3 additions & 1 deletion concurrencpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ project(concurrencpp)

add_library(
concurrencpp
src/executors/executor.cpp

src/runtime/runtime.cpp
src/threads/thread.cpp
src/results/promises.cpp
src/results/result_core.cpp
src/executors/executor.cpp
src/executors/executor.cpp
src/executors/manual_executor.cpp
src/executors/thread_executor.cpp
src/executors/thread_pool_executor.cpp
Expand Down
5 changes: 4 additions & 1 deletion concurrencpp/src/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <stdexcept>

namespace concurrencpp::errors {

struct empty_object : public std::runtime_error {
empty_object(const std::string& message) : runtime_error(message) {}
};
Expand Down Expand Up @@ -37,6 +36,10 @@ namespace concurrencpp::errors {
struct executor_shutdown : public std::runtime_error {
executor_shutdown(const std::string& message) : runtime_error(message) {}
};

struct timer_queue_shutdown : public std::runtime_error {
timer_queue_shutdown(const std::string& message) : runtime_error(message) {}
};
}

#endif //ERRORS_H
40 changes: 40 additions & 0 deletions concurrencpp/src/executors/derivable_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#ifndef CONCURRENCPP_DERIVABLE_EXECUTOR_H
#define CONCURRENCPP_DERIVABLE_EXECUTOR_H

#include "executor.h"

namespace concurrencpp {
template<class concrete_executor_type>
class derivable_executor : public executor {

private:
concrete_executor_type* self() noexcept {
return static_cast<concrete_executor_type*>(this);
}

public:
derivable_executor(std::string_view name) : executor(name) {}

template<class callable_type, class ... argument_types>
void post(callable_type&& callable, argument_types&& ... arguments) {
return do_post(self(), std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
}

template<class callable_type, class ... argument_types>
auto submit(callable_type&& callable, argument_types&& ... arguments) {
return do_submit(self(), std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
}

template<class callable_type>
void bulk_post(std::span<callable_type> callable_list) {
return do_bulk_post(self(), callable_list);
}

template<class callable_type, class return_type = std::invoke_result_t<callable_type>>
std::vector<concurrencpp::result<return_type>> bulk_submit(std::span<callable_type> callable_list) {
return do_bulk_submit(self(), callable_list);
}
};
}

#endif
92 changes: 57 additions & 35 deletions concurrencpp/src/executors/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ namespace concurrencpp {
class executor {

private:
template<class callable_type, class ... argument_types>
template<class executor_type, class callable_type, class ... argument_types>
static null_result post_bridge(
executor_tag,
executor*,
executor_type*,
callable_type callable,
argument_types... arguments) {
callable(arguments...);
Expand All @@ -35,10 +35,10 @@ namespace concurrencpp {
co_return;
}

template<class return_type, class callable_type, class ... argument_types>
template<class return_type, class executor_type, class callable_type, class ... argument_types>
static result<return_type> submit_bridge(
executor_tag,
executor*,
executor_type*,
callable_type callable,
argument_types... arguments) {
co_return callable(arguments...);
Expand All @@ -52,51 +52,37 @@ namespace concurrencpp {
co_return callable();
}

public:
executor(std::string_view name) : name(name) {}

virtual ~executor() noexcept = default;

const std::string name;

virtual void enqueue(std::experimental::coroutine_handle<> task) = 0;
virtual void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) = 0;

virtual int max_concurrency_level() const noexcept = 0;

virtual bool shutdown_requested() const noexcept = 0;
virtual void shutdown() noexcept = 0;

template<class callable_type, class ... argument_types>
void post(callable_type&& callable, argument_types&& ... arguments) {
protected:
template<class executor_type, class callable_type, class ... argument_types>
static void do_post(executor_type* executor_ptr, callable_type&& callable, argument_types&& ... arguments) {
static_assert(
std::is_invocable_v<callable_type, argument_types...>,
"concurrencpp::executor::post - <<callable_type>> is not invocable with <<argument_types...>>");
"concurrencpp::executor::post - <<callable_type>> is not invokable with <<argument_types...>>");

post_bridge(
executor_tag{},
this,
{},
executor_ptr,
std::forward<callable_type>(callable),
std::forward<argument_types>(arguments)...);
}

template<class callable_type, class ... argument_types>
auto submit(callable_type&& callable, argument_types&& ... arguments) {
template<class executor_type, class callable_type, class ... argument_types>
static auto do_submit(executor_type* executor_ptr, callable_type&& callable, argument_types&& ... arguments) {
static_assert(
std::is_invocable_v<callable_type, argument_types...>,
"concurrencpp::executor::submit - <<callable_type>> is not invocable with <<argument_types...>>");
"concurrencpp::executor::submit - <<callable_type>> is not invokable with <<argument_types...>>");

using return_type = typename std::invoke_result_t<callable_type, argument_types...>;

return submit_bridge<return_type>(
executor_tag{},
this,
{},
executor_ptr,
std::forward<callable_type>(callable),
std::forward<argument_types>(arguments)...);
}

template<class callable_type>
void bulk_post(std::span<callable_type> callable_list) {
template<class executor_type, class callable_type>
static void do_bulk_post(executor_type* executor_ptr, std::span<callable_type> callable_list) {
std::vector<std::experimental::coroutine_handle<>> accumulator;
accumulator.reserve(callable_list.size());

Expand All @@ -105,11 +91,12 @@ namespace concurrencpp {
}

assert(!accumulator.empty());
enqueue(accumulator);
executor_ptr->enqueue(accumulator);
}

template<class callable_type, class return_type = std::invoke_result_t<callable_type>>
std::vector<concurrencpp::result<return_type>> bulk_submit(std::span<callable_type> callable_list) {
template<class executor_type, class callable_type, class return_type = std::invoke_result_t<callable_type>>
static std::vector<concurrencpp::result<return_type>>
do_bulk_submit(executor_type* executor_ptr, std::span<callable_type> callable_list) {
std::vector<std::experimental::coroutine_handle<>> accumulator;
accumulator.reserve(callable_list.size());

Expand All @@ -121,9 +108,44 @@ namespace concurrencpp {
}

assert(!accumulator.empty());
enqueue(accumulator);
executor_ptr->enqueue(accumulator);
return results;
}

public:
executor(std::string_view name) : name(name) {}

virtual ~executor() noexcept = default;

const std::string name;

virtual void enqueue(std::experimental::coroutine_handle<> task) = 0;
virtual void enqueue(std::span<std::experimental::coroutine_handle<>> tasks) = 0;

virtual int max_concurrency_level() const noexcept = 0;

virtual bool shutdown_requested() const noexcept = 0;
virtual void shutdown() noexcept = 0;

template<class callable_type, class ... argument_types>
void post(callable_type&& callable, argument_types&& ... arguments) {
return do_post(this, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
}

template<class callable_type, class ... argument_types>
auto submit(callable_type&& callable, argument_types&& ... arguments) {
return do_submit(this, std::forward<callable_type>(callable), std::forward<argument_types>(arguments)...);
}

template<class callable_type>
void bulk_post(std::span<callable_type> callable_list) {
return do_bulk_post(this, callable_list);
}

template<class callable_type, class return_type = std::invoke_result_t<callable_type>>
std::vector<concurrencpp::result<return_type>> bulk_submit(std::span<callable_type> callable_list) {
return do_bulk_submit(this, callable_list);
}
};
}

Expand Down
1 change: 1 addition & 0 deletions concurrencpp/src/executors/executor_all.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CONCURRENCPP_EXECUTORS_ALL_H
#define CONCURRENCPP_EXECUTORS_ALL_H

#include "derivable_executor.h"
#include "inline_executor.h"
#include "thread_pool_executor.h"
#include "thread_executor.h"
Expand Down
Loading

0 comments on commit 3bf27dc

Please sign in to comment.