Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions async_simple/Signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,17 @@ class Slot {
"we dont allow emplace an empty signal handler");
logicAssert(std::popcount(static_cast<uint64_t>(type)) == 1,
"It's not allow to emplace for multiple signals");
// trigger-once signal has already been triggered
auto handler = std::make_unique<detail::SignalSlotSharedState::Handler>(
std::forward<Args>(args)...);
auto oldHandlerPtr = loadHandler<true>(type);
// check trigger-once signal has already been triggered
// if signal has already been triggered, return false
if (!detail::SignalSlotSharedState::isMultiTriggerSignal(type) &&
(signal()->state() & type)) {
return false;
}
auto handler = std::make_unique<detail::SignalSlotSharedState::Handler>(
std::forward<Args>(args)...);
auto oldHandlerPtr = loadHandler<true>(type);
// if signal triggered later, we will found it by atomic handler CAS
// failed.
auto oldHandler = oldHandlerPtr->load(std::memory_order_acquire);
if (oldHandler ==
&detail::SignalSlotSharedState::HandlerManager::emittedTag) {
Expand Down
22 changes: 8 additions & 14 deletions async_simple/coro/Collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ struct CollectAnyAwaiter {
_slot, [c = continuation, e = event, size = input.size()](
SignalType type, Signal*) mutable {
auto count = e->downCount();
if (count > size + 1) {
if (count == size + 1) {
c.resume();
}
})) { // has canceled
Expand All @@ -186,7 +186,7 @@ struct CollectAnyAwaiter {
assert(e != nullptr);
auto count = e->downCount();
// n+1: n coro + 1 cancel handler
if (count > size + 1) {
if (count == size + 1) {
_result = std::make_unique<ResultType>();
_result->_idx = i;
_result->_value = std::move(result);
Expand Down Expand Up @@ -268,7 +268,7 @@ struct CollectAnyVariadicAwaiter {
_slot, [c = continuation, e = event](SignalType type,
Signal*) mutable {
auto count = e->downCount();
if (count > std::tuple_size<InputType>() + 1) {
if (count == std::tuple_size<InputType>() + 1) {
c.resume();
}
})) { // has canceled
Expand All @@ -290,7 +290,7 @@ struct CollectAnyVariadicAwaiter {
res) mutable {
auto count = e->downCount();
// n+1: n coro + 1 cancel handler
if (count > std::tuple_size<InputType>() + 1) {
if (count == std::tuple_size<InputType>() + 1) {
_result = std::make_unique<ResultType>(
std::in_place_index_t<index>(), std::move(res));
if (auto ptr = local->getSlot(); ptr) {
Expand Down Expand Up @@ -388,7 +388,10 @@ struct CollectAllAwaiter {
_slot->chainedSignal(_signal.get());

auto executor = promise_type._executor;
for (size_t i = 0; i < _input.size(); ++i) {

_event.setAwaitingCoro(continuation);
auto size = _input.size();
for (size_t i = 0; i < size; ++i) {
auto& exec = _input[i]._coro.promise()._executor;
if (exec == nullptr) {
exec = executor;
Expand Down Expand Up @@ -422,11 +425,6 @@ struct CollectAllAwaiter {
}
func();
}
_event.setAwaitingCoro(continuation);
auto awaitingCoro = _event.down();
if (awaitingCoro) {
awaitingCoro.resume();
}
}
inline auto await_resume() { return std::move(_output); }

Expand Down Expand Up @@ -602,10 +600,6 @@ struct CollectAllVariadicAwaiter {
}
}(std::get<index>(_inputs), std::get<index>(_results)),
...);

if (auto awaitingCoro = _event.down(); awaitingCoro) {
awaitingCoro.resume();
}
}

void await_suspend(std::coroutine_handle<> continuation) {
Expand Down
2 changes: 1 addition & 1 deletion async_simple/coro/CountEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace detail {
// The last 'down' will resume the awaiting coroutine on this event.
class CountEvent {
public:
CountEvent(size_t count) : _count(count + 1) {}
CountEvent(size_t count) : _count(count) {}
CountEvent(const CountEvent&) = delete;
CountEvent(CountEvent&& other)
: _count(other._count.exchange(0, std::memory_order_relaxed)),
Expand Down
1 change: 1 addition & 0 deletions async_simple/coro/Mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#ifndef ASYNC_SIMPLE_USE_MODULES
#include <atomic>
#include <cassert>
#include <mutex>
#include "async_simple/experimental/coroutine.h"

Expand Down
9 changes: 7 additions & 2 deletions async_simple/coro/SpinLock.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#define ASYNC_SIMPLE_CORO_SPIN_LOCK_H

#ifndef ASYNC_SIMPLE_USE_MODULES
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include "async_simple/coro/Lazy.h"
Expand Down Expand Up @@ -60,9 +62,12 @@ class SpinLock {
}
}

void unlock() noexcept { _locked.store(false, std::memory_order_release); }
void unlock() noexcept {
assert(_locked.load(std::memory_order_acquire) == true);
_locked.store(false, std::memory_order_release);
}

Lazy<std::unique_lock<SpinLock>> coScopedLock() {
[[nodiscard]] Lazy<std::unique_lock<SpinLock>> coScopedLock() {
co_await coLock();
co_return std::unique_lock<SpinLock>{*this, std::adopt_lock};
}
Expand Down
6 changes: 3 additions & 3 deletions async_simple/coro/test/SleepTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Lazy<void> cancelSleep() {
auto sleep = [](int i, std::atomic<int>& cnt) -> Lazy<void> {
bool cancel_flag = false;
try {
co_await async_simple::coro::sleep(i * 1s);
co_await async_simple::coro::sleep(i * 2s);
} catch (const async_simple::SignalException& err) {
++cnt;
cancel_flag = true;
Expand All @@ -156,7 +156,7 @@ Lazy<void> cancelSleep() {
auto tp2 = std::chrono::steady_clock::now();
EXPECT_EQ(cnt, 99);
std::cout << "cost time: " << (tp2 - tp1) / 1ms << "ms" << std::endl;
EXPECT_LE((tp2 - tp1) / 1ms, 800);
EXPECT_LE((tp2 - tp1) / 1ms, 1800);
}
{
std::vector<RescheduleLazy<void>> works;
Expand All @@ -169,7 +169,7 @@ Lazy<void> cancelSleep() {
auto tp2 = std::chrono::steady_clock::now();
EXPECT_EQ(cnt, 99);
std::cout << "cost time: " << (tp2 - tp1) / 1ms << "ms" << std::endl;
EXPECT_LE((tp2 - tp1) / 1ms, 800);
EXPECT_LE((tp2 - tp1) / 1ms, 1800);
}
}

Expand Down
Loading