Skip to content

Commit

Permalink
v.0.0.7 (David-Haim#11)
Browse files Browse the repository at this point in the history
* when_all safer implementation (I suspect a bug in Clang codegen for coroutines)
* result_core::publish_result doesn't move the consumer context
* timer queue only accepts callables + args. bind returns the same callable if no arguments were given
* timer uses std::chrono::milliseconds instead of size_t
* error messages style unification
  • Loading branch information
David-Haim authored Sep 28, 2020
1 parent 0fad684 commit fbc8c47
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 332 deletions.
61 changes: 22 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# concurrencpp, the C++ concurrency library

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
Expand Down Expand Up @@ -682,51 +683,27 @@ class timer_queue {
*/
bool shutdown_requested() const noexcept;
/*
Creates a new running timer where *this is the associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type>
timer make_timer(
size_t due_time,
size_t frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable);
/*
Creates a new running timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type, class ... argumet_types>
timer make_timer(
size_t due_time,
size_t frequency,
std::chrono::milliseconds due_time,
std::chrono::milliseconds frequency,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
/*
Creates a new one shot timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type>
timer make_one_shot_timer(
size_t due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable);
/*
Creates a new one shot timer where *this is associated timer_queue.
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
template<class callable_type, class ... argumet_types>
timer make_one_shot_timer(
size_t due_time,
std::chrono::milliseconds due_time,
std::shared_ptr<concurrencpp::executor> executor,
callable_type&& callable,
argumet_types&& ... arguments);
Expand All @@ -736,7 +713,7 @@ class timer_queue {
Throws std::invalid_argument if executor is null.
Throws errors::timer_queue_shutdown if shutdown had been called before.
*/
result<void> make_delay_object(size_t due_time, std::shared_ptr<concurrencpp::executor> executor);
result<void> make_delay_object(std::chrono::milliseconds due_time, std::shared_ptr<concurrencpp::executor> executor);
};
```

Expand Down Expand Up @@ -778,7 +755,7 @@ class timer {
Returns the due time in milliseconds the timer was defined with.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
size_t get_due_time() const;
std::chrono::milliseconds get_due_time() const;

/*
Returns the executor the timer was defined with.
Expand All @@ -796,13 +773,13 @@ class timer {
Returns the frequency in milliseconds the timer was defined with.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
size_t get_frequency() const;
std::chrono::milliseconds get_frequency() const;

/*
Sets new frequency for this timer. Callables that have been scheduled to run at this point are not affected.
Throws concurrencpp::errors::empty_timer is *this is empty.
*/
void set_frequency(size_t new_frequency);
void set_frequency(std::chrono::milliseconds new_frequency);

/*
Returns true is *this is not an empty timer, false otherwise.
Expand All @@ -817,19 +794,21 @@ class timer {
#include <iostream>
using namespace std::chrono_literals;
int main() {
concurrencpp::runtime runtime;
std::atomic_size_t counter = 1;
concurrencpp::timer timer = runtime.timer_queue()->make_timer(
1'500,
2'000,
1'500ms,
2'000ms,
runtime.thread_pool_executor(),
[&] {
const auto c = counter.fetch_add(1);
std::cout << "timer was invoked for the " << c << "th time" << std::endl;
});
std::this_thread::sleep_for(std::chrono::seconds(12));
std::this_thread::sleep_for(12s);
return 0;
}
```
Expand All @@ -845,16 +824,18 @@ A oneshot timer is a one-time timer with only a due time - after it schedules it

#include <iostream>

using namespace std::chrono_literals;

int main() {
concurrencpp::runtime runtime;
concurrencpp::timer timer = runtime.timer_queue()->make_one_shot_timer(
3'000,
3'000ms,
runtime.thread_executor(),
[&] {
std::cout << "hello and goodbye" << std::endl;
});

std::this_thread::sleep_for(std::chrono::seconds(4));
std::this_thread::sleep_for(4s);
return 0;
}
```
Expand All @@ -870,6 +851,8 @@ A delay object is a result object that becomes ready when its due time is reache
#include <iostream>
using namespace std::chrono_literals;
concurrencpp::null_result delayed_task(
std::shared_ptr<concurrencpp::timer_queue> tq,
std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
Expand All @@ -879,15 +862,15 @@ concurrencpp::null_result delayed_task(
std::cout << "task was invoked " << counter << " times." << std::endl;
counter++;
co_await tq->make_delay_object(1'500, ex);
co_await tq->make_delay_object(1'500ms, ex);
}
}
int main() {
concurrencpp::runtime runtime;
delayed_task(runtime.timer_queue(), runtime.thread_pool_executor());
std::this_thread::sleep_for(std::chrono::seconds(10));
std::this_thread::sleep_for(10s);
return 0;
}
```
Expand Down Expand Up @@ -1107,5 +1090,5 @@ In this example, we created an executor which logs actions like enqueuing a task

### Supported platforms
* Linux (requires clang-9 and above).
* machOS (requires clang-9 and above).
* macOS (requires clang-9 and above).
* Windows (requires Windows 10, visual studio 2019 and above).
12 changes: 6 additions & 6 deletions concurrencpp/src/results/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,27 @@ namespace concurrencpp::details::consts {
"result::resolve_via() - given executor is null.";

inline const char* k_result_awaitable_error_msg =
"concurrencpp::awaitable_type<type>::await_suspend - awaitable is empty.";
"concurrencpp::awaitable_type<type>::await_suspend() - awaitable is empty.";

inline const char* k_result_resolver_error_msg =
"result_resolver<type>::await_suspend - awaitable is empty.";
"result_resolver<type>::await_suspend() - awaitable is empty.";

inline const char* k_executor_exception_error_msg =
"concurrencpp::result - an exception was thrown while trying to enqueue result continuation.";


inline const char* k_make_exceptional_result_exception_null_error_msg =
"make_exception_result - given exception_ptr is null.";
"make_exception_result() - given exception_ptr is null.";


inline const char* k_when_all_empty_result_error_msg =
"concurrencpp::when_all - one of the result objects is empty.";
"concurrencpp::when_all() - one of the result objects is empty.";

inline const char* k_when_any_empty_result_error_msg =
"concurrencpp::when_any - one of the result objects is empty.";
"concurrencpp::when_any() - one of the result objects is empty.";

inline const char* k_when_any_empty_range_error_msg =
"concurrencpp::when_any - given range contains no elements.";
"concurrencpp::when_any() - given range contains no elements.";
}

#endif
13 changes: 4 additions & 9 deletions concurrencpp/src/results/result_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,10 @@ bool result_core_base::await_via(
return handle_done_state(await_ctx, force_rescheduling);
}

void result_core_base::when_all(std::weak_ptr<when_all_state_base> when_all_state) noexcept {
void result_core_base::when_all(std::shared_ptr<when_all_state_base> when_all_state) noexcept {
const auto state = m_pc_state.load(std::memory_order_acquire);
if (state == pc_state::producer) {
assert(!when_all_state.expired());
auto state = when_all_state.lock();
return state->on_result_ready();
return when_all_state->on_result_ready();
}

assert_consumer_idle();
Expand All @@ -138,11 +136,8 @@ void result_core_base::when_all(std::weak_ptr<when_all_state_base> when_all_stat
}

assert_done();
auto state_ptr = std::get<4>(m_consumer).lock();

if (static_cast<bool>(state_ptr)) {
state_ptr->on_result_ready();
}
auto& state_ptr = std::get<4>(m_consumer);
state_ptr->on_result_ready();
}

concurrencpp::details::when_any_status result_core_base::when_any(
Expand Down
19 changes: 8 additions & 11 deletions concurrencpp/src/results/result_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ namespace concurrencpp::details {
std::experimental::coroutine_handle<>,
await_context,
std::shared_ptr<wait_context>,
std::weak_ptr<when_all_state_base>,
std::shared_ptr<when_all_state_base>,
when_any_ctx>;

enum class pc_state {
Expand Down Expand Up @@ -185,7 +185,7 @@ namespace concurrencpp::details {
std::experimental::coroutine_handle<> caller_handle,
bool force_rescheduling);

void when_all(std::weak_ptr<when_all_state_base> when_all_state) noexcept;
void when_all(std::shared_ptr<when_all_state_base> when_all_state) noexcept;

when_any_status when_any(std::shared_ptr<when_any_state_base> when_any_state, size_t index) noexcept;

Expand Down Expand Up @@ -369,27 +369,24 @@ namespace concurrencpp::details {
}

case 2: {
auto await_ctx = std::move(std::get<2>(this->m_consumer));
return this->schedule_continuation(await_ctx);
return this->schedule_continuation(std::get<2>(this->m_consumer));
}

case 3: {
const auto wait_ctx = std::move(std::get<3>(this->m_consumer));
auto& wait_ctx = std::get<3>(this->m_consumer);
wait_ctx->notify();
return;
}

case 4: {
auto when_all_state_weak = std::move(std::get<4>(this->m_consumer));
auto when_all_state = when_all_state_weak.lock();
if (static_cast<bool>(when_all_state)) {
when_all_state->on_result_ready();
}
auto& when_all_state = std::get<4>(this->m_consumer);
assert(static_cast<bool>(when_all_state));
when_all_state->on_result_ready();
return;
}

case 5: {
auto when_any_ctx = std::move(std::get<5>(this->m_consumer));
auto& when_any_ctx = std::get<5>(this->m_consumer);
auto& when_any_state = when_any_ctx.first;
assert(static_cast<bool>(when_any_state));
when_any_state->on_result_ready(when_any_ctx.second);
Expand Down
34 changes: 16 additions & 18 deletions concurrencpp/src/results/when_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,18 @@ namespace concurrencpp::details {

private:
tuple_type m_tuple;
result_core<tuple_type> m_core_ptr;
std::shared_ptr<result_core<tuple_type>> m_core_ptr;

template<class type>
void set_state(result<type>& result) noexcept {
auto core_ptr = when_result_helper::get_core(result);
core_ptr->when_all(this->weak_from_this());
core_ptr->when_all(this->shared_from_this());
}

public:
when_all_tuple_state(result_types&& ... results) noexcept :
m_tuple(std::forward<result_types>(results)...) {
m_tuple(std::forward<result_types>(results)...),
m_core_ptr(std::make_shared<result_core<tuple_type>>()) {
m_counter = sizeof ... (result_types);
}

Expand All @@ -88,14 +89,12 @@ namespace concurrencpp::details {
return;
}

m_core_ptr.set_result(std::move(m_tuple));
m_core_ptr.publish_result();
m_core_ptr->set_result(std::move(m_tuple));
m_core_ptr->publish_result();
}

result<tuple_type> get_result() noexcept {
return {
std::shared_ptr<result_core<tuple_type>>(this->shared_from_this(), &m_core_ptr)
};
return { m_core_ptr };
}
};

Expand All @@ -106,23 +105,24 @@ namespace concurrencpp::details {

private:
std::vector<type> m_vector;
result_core<std::vector<type>> m_core_ptr;
std::shared_ptr<result_core<std::vector<type>>> m_core_ptr;

template<class given_type>
void set_state(result<given_type>& result) noexcept {
auto core_ptr = when_result_helper::get_core(result);;
core_ptr->when_all(this->weak_from_this());
core_ptr->when_all(this->shared_from_this());
}

public:
template<class iterator_type>
when_all_vector_state(iterator_type begin, iterator_type end) :
m_vector(std::make_move_iterator(begin), std::make_move_iterator(end)) {
m_vector(std::make_move_iterator(begin), std::make_move_iterator(end)),
m_core_ptr(std::make_shared<result_core<std::vector<type>>>()) {
m_counter = m_vector.size();
}

void set_state() noexcept {
for(auto& result : m_vector) {
for (auto& result : m_vector) {
set_state(result);
}
}
Expand All @@ -132,14 +132,12 @@ namespace concurrencpp::details {
return;
}

m_core_ptr.set_result(std::move(m_vector));
m_core_ptr.publish_result();
m_core_ptr->set_result(std::move(m_vector));
m_core_ptr->publish_result();
}

result<std::vector<type>> get_result() noexcept {
return {
std::shared_ptr<result_core<std::vector<type>>>(this->shared_from_this(), &m_core_ptr)
};
return { m_core_ptr };
}
};
}
Expand Down Expand Up @@ -381,7 +379,7 @@ namespace concurrencpp {

template<class iterator_type>
result<when_any_result<std::vector<typename std::iterator_traits<iterator_type>::value_type>>>
when_any(iterator_type begin, iterator_type end) {
when_any(iterator_type begin, iterator_type end) {
details::when_result_helper::throw_if_empty_range(
details::consts::k_when_any_empty_result_error_msg,
begin,
Expand Down
Loading

0 comments on commit fbc8c47

Please sign in to comment.