Description
I'm not a massive fan of ASIO, Boost.ASIO, or the Networking TS that builds on top of them, and I'm not a particularly experienced user of them either. My current implementation on the asio-uring-web-server branch doesn't work. I'd love to see someone suggest a better way to use it 🤗
The current draft looks like this:
#include <algorithm> // `std::max`
#include <atomic>    // `std::atomic`, `std::atomic_bool`
#include <chrono>    // `std::chrono::microseconds`, `std::chrono::steady_clock`
#include <cstddef>   // `std::size_t`
#include <cstdint>   // `std::uint16_t`
#include <string>    // `std::string`, `std::to_string`
#include <vector>    // `std::vector`

#include <asio.hpp> // Standalone ASIO; `rpc_buffer_t` and `rpc_batch_result` are defined elsewhere in the file
class rpc_asio_server {
asio::io_context &context_;
asio::ip::udp::socket socket_;
/// @brief Buffers, one per concurrent request
std::vector<rpc_buffer_t> buffers_;
/// @brief Where the packets came from
std::vector<asio::ip::udp::endpoint> clients_;
/// @brief Flag to stop the server without corrupting the state
std::atomic_bool should_stop_ {false}; // Explicit init: default-constructed atomics are indeterminate pre-C++20
/// @brief Maximum time for this entire batch
std::chrono::microseconds max_cycle_duration_;
std::size_t failed_receptions_ = 0;
std::size_t failed_responses_ = 0;
public:
rpc_asio_server( //
asio::io_context &ctx, std::string const &address, std::uint16_t port, //
std::size_t max_concurrency, std::chrono::microseconds max_cycle_duration)
: context_(ctx), socket_(context_, asio::ip::udp::endpoint(asio::ip::make_address(address), port)),
buffers_(max_concurrency), clients_(max_concurrency), max_cycle_duration_(max_cycle_duration) {}
void stop() { should_stop_.store(true, std::memory_order_relaxed); }
void operator()() {
while (!should_stop_.load(std::memory_order_relaxed)) one_batch();
}
void one_batch() {
// For per-operation cancellation we could use `asio::cancellation_signal`,
// but this is the simpler case where we only want to cancel all the outstanding
// transfers at once.
std::atomic<std::size_t> remaining = 0;
for (std::size_t job = 0; job < buffers_.size(); ++job, ++remaining) {
auto finalize = [this, &remaining](std::error_code error, std::size_t) {
remaining--;
if (error) failed_responses_++;
};
auto respond = [this, job, finalize, &remaining](std::error_code error, std::size_t bytes) {
if (error) { failed_receptions_++; remaining--; }
else { socket_.async_send_to(asio::buffer(buffers_[job], bytes), clients_[job], finalize); }
};
socket_.async_receive_from(asio::buffer(buffers_[job]), clients_[job], respond);
}
// A blocking `asio::steady_timer::wait` would park this thread without ever dispatching the
// completion handlers above, so drive the `io_context` for the cycle budget instead.
context_.restart();
context_.run_for(max_cycle_duration_);
if (remaining) socket_.cancel(); // Forcibly abort all ops still pending on this socket
context_.run(); // Drain the aborted handlers before `remaining` goes out of scope
}
};
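The part I'd most like to see improved is the callback chain. A rough sketch with ASIO's C++20 coroutine interface (`asio::awaitable`, `asio::co_spawn`, `asio::as_tuple`) could look like the snippet below. It's untested and only meant as a starting point for discussion; `echo_slot` is a hypothetical helper reusing the same `rpc_buffer_t` slots:
/// @brief Hypothetical coroutine-style alternative: one coroutine per concurrency slot,
/// looping "receive -> echo back" until the socket is canceled or closed.
asio::awaitable<void> echo_slot(asio::ip::udp::socket &socket, rpc_buffer_t &buffer) {
    asio::ip::udp::endpoint client;
    for (;;) {
        auto [receive_error, bytes] = co_await socket.async_receive_from( //
            asio::buffer(buffer), client, asio::as_tuple(asio::use_awaitable));
        if (receive_error) break; // Canceled, closed, or failed reception
        auto [send_error, sent_bytes] = co_await socket.async_send_to( //
            asio::buffer(buffer, bytes), client, asio::as_tuple(asio::use_awaitable));
        if (send_error || sent_bytes != bytes) break; // Canceled, closed, or truncated response
    }
}
Spawning one such coroutine per slot would then replace the explicit callback chain, while keeping the same "run for the cycle budget, then cancel" batching:
// Hypothetical body of a coroutine-based `one_batch`:
for (auto &buffer : buffers_) asio::co_spawn(context_, echo_slot(socket_, buffer), asio::detached);
context_.restart();
context_.run_for(max_cycle_duration_);
socket_.cancel(); // Every pending coroutine resumes with `operation_aborted` and exits
context_.run();   // Drain the canceled coroutines before starting the next batch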
class rpc_asio_client {
asio::io_context &context_;
asio::ip::udp::socket socket_;
asio::ip::udp::endpoint server_;
/// @brief Buffers, one per concurrent request
std::vector<rpc_buffer_t> buffers_;
/// @brief Track the send timestamps for each slot to measure latency
std::vector<std::chrono::steady_clock::time_point> send_times_;
/// @brief Maximum time for this entire batch
std::chrono::microseconds max_cycle_duration_;
public:
rpc_asio_client( //
asio::io_context &ctx, std::string const &server_addr, std::uint16_t port, //
std::size_t concurrency, std::chrono::microseconds max_cycle_duration)
: context_(ctx), socket_(ctx, asio::ip::udp::endpoint(asio::ip::udp::v4(), 0)), buffers_(concurrency),
send_times_(concurrency), max_cycle_duration_(max_cycle_duration) {
// Resolve the server address
asio::ip::udp::resolver resolver(context_);
asio::ip::udp::resolver::results_type endpoints = resolver.resolve(server_addr, std::to_string(port));
server_ = *endpoints.begin(); // Take the first resolved endpoint
// Fill each buffer with some pattern (just 'X's, for example)
for (auto &buf : buffers_) buf.fill('X');
}
rpc_batch_result operator()() { return one_batch(); }
private:
rpc_batch_result one_batch() {
rpc_batch_result result;
// For per-operation cancellation we could use `asio::cancellation_signal`,
// but this is the simpler case where we only want to cancel all the outstanding
// transfers at once.
std::atomic<std::size_t> remaining = 0;
for (std::size_t job = 0; job < buffers_.size(); ++job, ++remaining) {
send_times_[job] = std::chrono::steady_clock::now();
auto finalize = [this, job, &result, &remaining](std::error_code error, std::size_t) {
remaining--;
if (error) return;
// Measure latency
auto response_time = std::chrono::steady_clock::now();
auto diff = response_time - send_times_[job];
result.batch_latency += diff;
result.max_packet_latency = std::max(result.max_packet_latency, diff);
result.received_packets++;
};
auto receive = [this, job, finalize, &remaining](std::error_code error, std::size_t bytes) {
if (error) { remaining--; }
// Note: `async_receive_from` overwrites its endpoint argument with the sender's address,
// so passing `server_` here only works because the reply comes from that same endpoint.
else { socket_.async_receive_from(asio::buffer(buffers_[job], bytes), server_, finalize); }
};
socket_.async_send_to(asio::buffer(buffers_[job]), server_, receive);
result.sent_packets++;
}
// Same as in the server: drive the `io_context` instead of blocking on a timer.
context_.restart();
context_.run_for(max_cycle_duration_);
if (remaining) socket_.cancel(); // Forcibly abort all ops still pending on this socket
context_.run(); // Drain the aborted handlers before `result` and `remaining` go out of scope
return result;
}
};
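For completeness, a minimal loopback wiring of the two classes could look something like this. The address, port, concurrency, and cycle budget are placeholder values, not what the benchmark harness actually uses:
#include <thread> // Only needed for this loopback example

int main() {
    asio::io_context server_context, client_context;
    std::chrono::microseconds cycle(10'000); // Placeholder cycle budget: 10 ms
    rpc_asio_server server(server_context, "127.0.0.1", 9000, /*max_concurrency=*/64, cycle);
    rpc_asio_client client(client_context, "127.0.0.1", 9000, /*concurrency=*/64, cycle);
    std::thread server_thread([&server] { server(); }); // Loops over batches until `stop` is called
    for (std::size_t batch = 0; batch != 100; ++batch) {
        rpc_batch_result result = client(); // One batch of echo round-trips
        (void)result;                       // A real benchmark would aggregate and report these
    }
    server.stop();
    server_thread.join();
    return 0;
}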