Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ownership in async_scope::detached_spawn #376

Merged
merged 1 commit into from
Nov 14, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions include/unifex/async_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,38 @@ enum class op_state : unsigned int {
};

struct _spawn_op_base {
struct owning_stop_callback final {
explicit owning_stop_callback(_spawn_op_base* self) noexcept
: self_(self) {}

owning_stop_callback(owning_stop_callback&& other) noexcept
: self_(std::exchange(other.self_, nullptr)) {}

~owning_stop_callback() {
if (self_) {
self_->decref();
}
}

void operator()() noexcept {
auto self = std::exchange(self_, nullptr);
self->request_stop();
self->decref();
}

private:
_spawn_op_base* self_;
};

struct stop_callback final {
_spawn_op_base* self;

void operator()() noexcept { self->request_stop(); }
};

using owning_stop_callback_t =
typename inplace_stop_token::template callback_type<owning_stop_callback>;

using stop_callback_t =
typename inplace_stop_token::template callback_type<stop_callback>;

Expand All @@ -73,15 +99,7 @@ struct _spawn_op_base {
: scope_(scope)
, cleanup_(cleanup)
, state_{detached ? op_state::detached : op_state::incomplete}
, refCount_{detached ? 1u : 2u} {}

~_spawn_op_base() {
// if scope_ is nullptr here then the operation failed to start, which means
// we didn't construct the stop callback and thus must not destruct it.
if (scope_) {
stopCallback_.destruct();
}
}
, refCount_{detached ? 2u : 3u} {}

/**
* Invoked by the attached future<> when it goes out of scope without having
Expand Down Expand Up @@ -132,6 +150,8 @@ struct _spawn_op_base {
* Wakes up the attached future<> if it's still there.
*/
void set_done() noexcept {
destroy_stop_callback();

// only bother to notify the waiting future<> if it's still there
if (try_set_state(op_state::done)) {
evt_.set();
Expand All @@ -150,6 +170,10 @@ struct _spawn_op_base {
// mark the operation as not started
scope_ = nullptr;

// drop the reference that *would* have been held by the stop callback we're
// not going to construct
decref();

// ensure an attached future<> is immediately ready
set_done();
}
Expand All @@ -171,7 +195,7 @@ struct _spawn_op_base {
// either the associated scope or the attached future<>
inplace_stop_source stopSource_;
// a stop callback that listens for stop requests from the associated scope
manual_lifetime<stop_callback_t> stopCallback_;
manual_lifetime<owning_stop_callback_t> stopCallback_;

bool detached() const noexcept {
return op_state::detached == state_.load(std::memory_order_relaxed);
Expand All @@ -197,6 +221,14 @@ struct _spawn_op_base {
}
}

void destroy_stop_callback() noexcept {
// if scope_ is nullptr here then the operation failed to start, which means
// we didn't construct the stop callback and thus must not destruct it.
if (scope_) {
stopCallback_.destruct();
}
}

/**
* Invoked when the operation completes with an error.
*
Expand All @@ -210,6 +242,8 @@ struct _spawn_op_base {
void set_error(
manual_lifetime<std::exception_ptr>& exception,
std::exception_ptr e) noexcept {
destroy_stop_callback();

if (try_set_state(op_state::error)) {
unifex::activate_union_member(exception, std::move(e));
evt_.set();
Expand All @@ -222,7 +256,7 @@ struct _spawn_op_base {

void start() noexcept {
stopCallback_.construct(
get_stop_token_from_scope(scope_), stop_callback{this});
get_stop_token_from_scope(scope_), owning_stop_callback{this});
}

bool try_set_state(op_state newState) noexcept {
Expand Down Expand Up @@ -265,6 +299,8 @@ struct _spawn_op_promise final {
template(typename... Values) //
(requires constructible_from<std::tuple<Ts...>, Values&&...>) //
void set_value(Values&&... values) noexcept {
destroy_stop_callback();

if (try_set_state(op_state::value)) {
UNIFEX_TRY {
unifex::activate_union_member(value_, (Values &&) values...);
Expand Down