Skip to content

Synchronized function prototype. #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions async_semaphore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ bot(int n, async_semaphore &sem, std::chrono::milliseconds deadline)
errors++; \
}


#define check_le(X, Y) \
if (X > Y) \
{ \
printf(#X " <= " #Y " failed: %d <= %d\n", X, Y); \
errors++; \
}

int test_value()
{
int errors = 0;
Expand Down Expand Up @@ -122,11 +130,53 @@ int test_value()
return errors;
}

int test_sync()
{
asio::io_context ioc;
int errors = 0;
async_semaphore se2{ioc.get_executor(), 3}; //allow at most three in parallel

std::vector<int> order; // isn't 100% defined!

static int concurrent = 0;

auto op =
[&](int id, auto && token)
{
return asio::co_spawn(ioc, [&, id]() -> asio::awaitable<void>
{
check_le(concurrent, 3);
concurrent ++;
printf("Entered %d\n", id);

asio::steady_timer tim{co_await asio::this_coro::executor, std::chrono::milliseconds{10}};
co_await tim.async_wait(asio::use_awaitable);
printf("Exited %d\n", id);
concurrent --;
}, std::move(token));
};

synchronized(se2, std::bind(op, 0, std::placeholders::_1), asio::detached);
synchronized(se2, std::bind(op, 2, std::placeholders::_1), asio::detached);
synchronized(se2, std::bind(op, 4, std::placeholders::_1), asio::detached);
synchronized(se2, std::bind(op, 6, std::placeholders::_1), asio::detached);
synchronized(se2, std::bind(op, 8, std::placeholders::_1), asio::detached);
synchronized(se2, std::bind(op, 10, std::placeholders::_1), asio::detached);
synchronized(se2, std::bind(op, 12, std::placeholders::_1), asio::detached);
synchronized(se2, std::bind(op, 14, std::placeholders::_1), asio::detached);

ioc.run();

return errors;

}

int
main()
{
int res = 0;
res += test_value();
res += test_sync();

auto ioc = asio::io_context(ASIO_CONCURRENCY_HINT_UNSAFE);
auto sem = async_semaphore(ioc.get_executor(), 10);
Expand All @@ -139,6 +189,10 @@ main()
{ return std::chrono::milliseconds(dist(eng)); };
for (int i = 0; i < 100; i += 2)
co_spawn(ioc, bot(i, sem, random_time()), detached);


ioc.run();


return res;
}
72 changes: 72 additions & 0 deletions include/asioex/async_semaphore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <asio/any_io_executor.hpp>
#include <asio/async_result.hpp>
#include <asio/detail/config.hpp>
#include <asio/experimental/prepend.hpp>
#include <asioex/detail/bilist_node.hpp>
#include <asioex/error_code.hpp>

Expand Down Expand Up @@ -128,6 +129,77 @@ struct basic_async_semaphore : async_semaphore_base

using async_semaphore = basic_async_semaphore<>;

template<typename Executor, typename Op, typename Signature>
struct synchronized_op;

template<typename Executor, typename Op, typename Err, typename ... Args>
struct synchronized_op<Executor, Op, void (Err, Args...)>
{
basic_async_semaphore<Executor> & sm;
Op op;

struct semaphore_tag {};
struct op_tag {};

static auto make_error_impl(error_code ec, error_code *)
{
return ec;
}

static auto make_error_impl(error_code ec, std::exception_ptr *)
{
return std::make_exception_ptr(std::system_error(ec));
}

static auto make_error(error_code ec)
{
return make_error_impl(ec, static_cast<Err*>(nullptr));
}

template<typename Self>
void operator()(Self && self) // init
{
if (self.get_cancellation_state().cancelled() != asio::cancellation_type::none)
return std::move(self).complete(make_error(asio::error::operation_aborted), Args{}...);

sm.async_acquire(
asio::experimental::prepend(std::move(self), semaphore_tag{}));
}

template<typename Self>
void operator()(Self && self, semaphore_tag, error_code ec) // semaphore obtained
{
std::move(op)(asio::experimental::prepend(std::move(self), op_tag{}));
}

template<typename Self, typename ... Args_>
void operator()(Self && self, op_tag, Args_ && ... args ) // semaphore obtained
{
sm.release();
std::move(self).complete(std::forward<Args_>(args)...);
}
};



/// Function to run OPs only when the semaphore can be acquired.
/// That way an artificial number of processes can run in parallel.
template<typename Executor, typename Op,
ASIO_COMPLETION_TOKEN_FOR(typename decltype(std::declval<Op>()(asio::experimental::detail::deferred_signature_probe{}))::type)
CompletionToken
ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(Executor)>
auto synchronized(basic_async_semaphore<Executor> & sm,
Op && op,
CompletionToken && completion_token)
{
using sig_t = typename decltype(std::declval<Op>()(asio::experimental::detail::deferred_signature_probe{}))::type;

using cop = synchronized_op<Executor, std::decay_t<Op>, sig_t>;

return asio::async_compose<CompletionToken, sig_t>(cop{sm, std::forward<Op>(op)}, completion_token, sm);

}

} // namespace asioex

#endif
Expand Down