Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v.0.0.7 #11

Merged
merged 1 commit into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
v.0.0.7
* when_all safer implementation (I suspect a bug in Clang codegen for coroutines)
* result_core::publish_result doesn't move the consumer context
* timer queue only accepts callables + args. bind returns the same callable if no arguments were given
* timer uses std::chrono::milliseconds instead of size_t
* error messages style unification
  • Loading branch information
David-Haim committed Sep 28, 2020
commit ec109fd5906e820977fd7d4b8b514aaacf35f311
61 changes: 22 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# concurrencpp, the C++ concurrency library

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
Expand Down Expand Up @@ -682,51 +683,27 @@ class timer_queue {
*/
bool shutdown_requested() const noexcept;


/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type>
timer make_timer(
size_t due_time,
size_t frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable);

/*
Creates a new running timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type, class ... argumet_types>
timer make_timer(
size_t due_time,
size_t frequency,
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);

/*
Creates a new one shot timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type>
timer make_one_shot_timer(
size_t due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable);

/*
Creates a new one shot timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type, class ... argumet_types>
timer make_one_shot_timer(
size_t due_time,
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
Expand All @@ -736,7 +713,7 @@ class timer_queue {
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
result<void> make_delay_object(size_t due_time, std::shared_ptr<concurrencpp::executor> executor);
result<void> make_delay_object(std::chrono::milliseconds due_time, std::shared_ptr<concurrencpp::executor> executor);
};
```

Expand Down Expand Up @@ -778,7 +755,7 @@ class timer {
Returns the due time in milliseconds the timer was defined with.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
size_t get_due_time() const;
std::chrono::milliseconds get_due_time() const;

/*
Returns the executor the timer was defined with.
Expand All @@ -796,13 +773,13 @@ class timer {
Returns the frequency in milliseconds the timer was defined with.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
size_t get_frequency() const;
std::chrono::milliseconds get_frequency() const;

/*
Sets new frequency for this timer. Callables that have been scheduled to run at this point are not affected.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
void set_frequency(size_t new_frequency);
void set_frequency(std::chrono::milliseconds new_frequency);

/*
Returns true is *this is not an empty timer, false otherwise.
Expand All @@ -817,19 +794,21 @@ class timer {

#include <iostream>

using namespace std::chrono_literals;

int main() {
concurrencpp::runtime runtime;
std::atomic_size_t counter = 1;
concurrencpp::timer timer = runtime.timer_queue()->make_timer(
1'500,
2'000,
1'500ms,
2'000ms,
runtime.thread_pool_executor(),
[&] {
const auto c = counter.fetch_add(1);
std::cout << "timer was invoked for the " << c << "th time" << std::endl;
});

std::this_thread::sleep_for(std::chrono::seconds(12));
std::this_thread::sleep_for(12s);
return 0;
}
```
Expand All @@ -845,16 +824,18 @@ A oneshot timer is a one-time timer with only a due time - after it schedules it

#include <iostream>

using namespace std::chrono_literals;

int main() {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime.timer_queue()->make_one_shot_timer(
3'000,
3'000ms,
runtime.thread_executor(),
[&] {
std::cout << "hello and goodbye" << std::endl;
});

std::this_thread::sleep_for(std::chrono::seconds(4));
std::this_thread::sleep_for(4s);
return 0;
}
```
Expand All @@ -870,6 +851,8 @@ A delay object is a result object that becomes ready when its due time is reache

#include <iostream>

using namespace std::chrono_literals;

concurrencpp::null_result delayed_task(
std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
Expand All @@ -879,15 +862,15 @@ concurrencpp::null_result delayed_task(
std::cout << "task was invoked " << counter << " times." << std::endl;
counter++;

co_await tq->make_delay_object(1'500, ex);
co_await tq->make_delay_object(1'500ms, ex);
}
}

int main() {
concurrencpp::runtime runtime;
delayed_task(runtime.timer_queue(), runtime.thread_pool_executor());

std::this_thread::sleep_for(std::chrono::seconds(10));
std::this_thread::sleep_for(10s);
return 0;
}
```
Expand Down Expand Up @@ -1107,5 +1090,5 @@ In this example, we created an executor which logs actions like enqueuing a task

### Supported platforms
* Linux (requires clang-9 and above).
* machOS (requires clang-9 and above).
* macOS (requires clang-9 and above).
* Windows (requires Windows 10, visual studio 2019 and above).
12 changes: 6 additions & 6 deletions concurrencpp/src/results/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,27 @@ namespace concurrencpp::details::consts {
"result::resolve_via() - given executor is null.";

inline const char* k_result_awaitable_error_msg =
"concurrencpp::awaitable_type<type>::await_suspend - awaitable is empty.";
"concurrencpp::awaitable_type<type>::await_suspend() - awaitable is empty.";

inline const char* k_result_resolver_error_msg =
"result_resolver<type>::await_suspend - awaitable is empty.";
"result_resolver<type>::await_suspend() - awaitable is empty.";

inline const char* k_executor_exception_error_msg =
"concurrencpp::result - an exception was thrown while trying to enqueue result continuation.";


inline const char* k_make_exceptional_result_exception_null_error_msg =
"make_exception_result - given exception_ptr is null.";
"make_exception_result() - given exception_ptr is null.";


inline const char* k_when_all_empty_result_error_msg =
"concurrencpp::when_all - one of the result objects is empty.";
"concurrencpp::when_all() - one of the result objects is empty.";

inline const char* k_when_any_empty_result_error_msg =
"concurrencpp::when_any - one of the result objects is empty.";
"concurrencpp::when_any() - one of the result objects is empty.";

inline const char* k_when_any_empty_range_error_msg =
"concurrencpp::when_any - given range contains no elements.";
"concurrencpp::when_any() - given range contains no elements.";
}

#endif
13 changes: 4 additions & 9 deletions concurrencpp/src/results/result_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,10 @@ bool result_core_base::await_via(
return handle_done_state(await_ctx, force_rescheduling);
}

void result_core_base::when_all(std::weak_ptr<when_all_state_base> when_all_state) noexcept {
void result_core_base::when_all(std::shared_ptr<when_all_state_base> when_all_state) noexcept {
const auto state = m_pc_state.load(std::memory_order_acquire);
if (state == pc_state::producer) {
assert(!when_all_state.expired());
auto state = when_all_state.lock();
return state->on_result_ready();
return when_all_state->on_result_ready();
}

assert_consumer_idle();
Expand All @@ -138,11 +136,8 @@ void result_core_base::when_all(std::weak_ptr<when_all_state_base> when_all_stat
}

assert_done();
auto state_ptr = std::get<4>(m_consumer).lock();

if (static_cast<bool>(state_ptr)) {
state_ptr->on_result_ready();
}
auto& state_ptr = std::get<4>(m_consumer);
state_ptr->on_result_ready();
}

concurrencpp::details::when_any_status result_core_base::when_any(
Expand Down
19 changes: 8 additions & 11 deletions concurrencpp/src/results/result_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ namespace concurrencpp::details {
std::experimental::coroutine_handle<>,
await_context,
std::shared_ptr<wait_context>,
std::weak_ptr<when_all_state_base>,
std::shared_ptr<when_all_state_base>,
when_any_ctx>;

enum class pc_state {
Expand Down Expand Up @@ -185,7 +185,7 @@ namespace concurrencpp::details {
std::experimental::coroutine_handle<> caller_handle,
bool force_rescheduling);

void when_all(std::weak_ptr<when_all_state_base> when_all_state) noexcept;
void when_all(std::shared_ptr<when_all_state_base> when_all_state) noexcept;

when_any_status when_any(std::shared_ptr<when_any_state_base> when_any_state, size_t index) noexcept;

Expand Down Expand Up @@ -369,27 +369,24 @@ namespace concurrencpp::details {
}

case 2: {
auto await_ctx = std::move(std::get<2>(this->m_consumer));
return this->schedule_continuation(await_ctx);
return this->schedule_continuation(std::get<2>(this->m_consumer));
}

case 3: {
const auto wait_ctx = std::move(std::get<3>(this->m_consumer));
auto& wait_ctx = std::get<3>(this->m_consumer);
wait_ctx->notify();
return;
}

case 4: {
auto when_all_state_weak = std::move(std::get<4>(this->m_consumer));
auto when_all_state = when_all_state_weak.lock();
if (static_cast<bool>(when_all_state)) {
when_all_state->on_result_ready();
}
auto& when_all_state = std::get<4>(this->m_consumer);
assert(static_cast<bool>(when_all_state));
when_all_state->on_result_ready();
return;
}

case 5: {
auto when_any_ctx = std::move(std::get<5>(this->m_consumer));
auto& when_any_ctx = std::get<5>(this->m_consumer);
auto& when_any_state = when_any_ctx.first;
assert(static_cast<bool>(when_any_state));
when_any_state->on_result_ready(when_any_ctx.second);
Expand Down
34 changes: 16 additions & 18 deletions concurrencpp/src/results/when_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,18 @@ namespace concurrencpp::details {

private:
tuple_type m_tuple;
result_core<tuple_type> m_core_ptr;
std::shared_ptr<result_core<tuple_type>> m_core_ptr;

template<class type>
void set_state(result<type>& result) noexcept {
auto core_ptr = when_result_helper::get_core(result);
core_ptr->when_all(this->weak_from_this());
core_ptr->when_all(this->shared_from_this());
}

public:
when_all_tuple_state(result_types&& ... results) noexcept :
m_tuple(std::forward<result_types>(results)...) {
m_tuple(std::forward<result_types>(results)...),
m_core_ptr(std::make_shared<result_core<tuple_type>>()) {
m_counter = sizeof ... (result_types);
}

Expand All @@ -88,14 +89,12 @@ namespace concurrencpp::details {
return;
}

m_core_ptr.set_result(std::move(m_tuple));
m_core_ptr.publish_result();
m_core_ptr->set_result(std::move(m_tuple));
m_core_ptr->publish_result();
}

result<tuple_type> get_result() noexcept {
return {
std::shared_ptr<result_core<tuple_type>>(this->shared_from_this(), &m_core_ptr)
};
return { m_core_ptr };
}
};

Expand All @@ -106,23 +105,24 @@ namespace concurrencpp::details {

private:
std::vector<type> m_vector;
result_core<std::vector<type>> m_core_ptr;
std::shared_ptr<result_core<std::vector<type>>> m_core_ptr;

template<class given_type>
void set_state(result<given_type>& result) noexcept {
auto core_ptr = when_result_helper::get_core(result);;
core_ptr->when_all(this->weak_from_this());
core_ptr->when_all(this->shared_from_this());
}

public:
template<class iterator_type>
when_all_vector_state(iterator_type begin, iterator_type end) :
m_vector(std::make_move_iterator(begin), std::make_move_iterator(end)) {
m_vector(std::make_move_iterator(begin), std::make_move_iterator(end)),
m_core_ptr(std::make_shared<result_core<std::vector<type>>>()) {
m_counter = m_vector.size();
}

void set_state() noexcept {
for(auto& result : m_vector) {
for (auto& result : m_vector) {
set_state(result);
}
}
Expand All @@ -132,14 +132,12 @@ namespace concurrencpp::details {
return;
}

m_core_ptr.set_result(std::move(m_vector));
m_core_ptr.publish_result();
m_core_ptr->set_result(std::move(m_vector));
m_core_ptr->publish_result();
}

result<std::vector<type>> get_result() noexcept {
return {
std::shared_ptr<result_core<std::vector<type>>>(this->shared_from_this(), &m_core_ptr)
};
return { m_core_ptr };
}
};
}
Expand Down Expand Up @@ -381,7 +379,7 @@ namespace concurrencpp {

template<class iterator_type>
result<when_any_result<std::vector<typename std::iterator_traits<iterator_type>::value_type>>>
when_any(iterator_type begin, iterator_type end) {
when_any(iterator_type begin, iterator_type end) {
details::when_result_helper::throw_if_empty_range(
details::consts::k_when_any_empty_result_error_msg,
begin,
Expand Down
Loading