Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions async_simple/Signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,17 @@ class Slot {
"we dont allow emplace an empty signal handler");
logicAssert(std::popcount(static_cast<uint64_t>(type)) == 1,
"It's not allow to emplace for multiple signals");
// trigger-once signal has already been triggered
auto handler = std::make_unique<detail::SignalSlotSharedState::Handler>(
std::forward<Args>(args)...);
auto oldHandlerPtr = loadHandler<true>(type);
// check trigger-once signal has already been triggered
// if signal has already been triggered, return false
if (!detail::SignalSlotSharedState::isMultiTriggerSignal(type) &&
(signal()->state() & type)) {
return false;
}
auto handler = std::make_unique<detail::SignalSlotSharedState::Handler>(
std::forward<Args>(args)...);
auto oldHandlerPtr = loadHandler<true>(type);
// if signal triggered later, we will found it by atomic handler CAS
// failed.
auto oldHandler = oldHandlerPtr->load(std::memory_order_acquire);
if (oldHandler ==
&detail::SignalSlotSharedState::HandlerManager::emittedTag) {
Expand Down
22 changes: 8 additions & 14 deletions async_simple/coro/Collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ struct CollectAnyAwaiter {
_slot, [c = continuation, e = event, size = input.size()](
SignalType type, Signal*) mutable {
auto count = e->downCount();
if (count > size + 1) {
if (count == size + 1) {
c.resume();
}
})) { // has canceled
Expand All @@ -186,7 +186,7 @@ struct CollectAnyAwaiter {
assert(e != nullptr);
auto count = e->downCount();
// n+1: n coro + 1 cancel handler
if (count > size + 1) {
if (count == size + 1) {
_result = std::make_unique<ResultType>();
_result->_idx = i;
_result->_value = std::move(result);
Expand Down Expand Up @@ -268,7 +268,7 @@ struct CollectAnyVariadicAwaiter {
_slot, [c = continuation, e = event](SignalType type,
Signal*) mutable {
auto count = e->downCount();
if (count > std::tuple_size<InputType>() + 1) {
if (count == std::tuple_size<InputType>() + 1) {
c.resume();
}
})) { // has canceled
Expand All @@ -290,7 +290,7 @@ struct CollectAnyVariadicAwaiter {
res) mutable {
auto count = e->downCount();
// n+1: n coro + 1 cancel handler
if (count > std::tuple_size<InputType>() + 1) {
if (count == std::tuple_size<InputType>() + 1) {
_result = std::make_unique<ResultType>(
std::in_place_index_t<index>(), std::move(res));
if (auto ptr = local->getSlot(); ptr) {
Expand Down Expand Up @@ -388,7 +388,10 @@ struct CollectAllAwaiter {
_slot->chainedSignal(_signal.get());

auto executor = promise_type._executor;
for (size_t i = 0; i < _input.size(); ++i) {

_event.setAwaitingCoro(continuation);
auto size = _input.size();
for (size_t i = 0; i < size; ++i) {
auto& exec = _input[i]._coro.promise()._executor;
if (exec == nullptr) {
exec = executor;
Expand Down Expand Up @@ -422,11 +425,6 @@ struct CollectAllAwaiter {
}
func();
}
_event.setAwaitingCoro(continuation);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the coroutine may be suspended forever if the input was empty.

auto awaitingCoro = _event.down();
if (awaitingCoro) {
awaitingCoro.resume();
}
}
inline auto await_resume() { return std::move(_output); }

Expand Down Expand Up @@ -602,10 +600,6 @@ struct CollectAllVariadicAwaiter {
}
}(std::get<index>(_inputs), std::get<index>(_results)),
...);

if (auto awaitingCoro = _event.down(); awaitingCoro) {
awaitingCoro.resume();
}
}

void await_suspend(std::coroutine_handle<> continuation) {
Expand Down
2 changes: 1 addition & 1 deletion async_simple/coro/CountEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace detail {
// The last 'down' will resume the awaiting coroutine on this event.
class CountEvent {
public:
CountEvent(size_t count) : _count(count + 1) {}
CountEvent(size_t count) : _count(count) {}
CountEvent(const CountEvent&) = delete;
CountEvent(CountEvent&& other)
: _count(other._count.exchange(0, std::memory_order_relaxed)),
Expand Down
Loading