Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions category/async/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ namespace detail
explicit within_completions_holder(AsyncIO_per_thread_state_t *parent_)
: parent(parent_)
{
MONAD_ASSERT(parent->within_completions_count < 20);
parent->within_completions_count++;
}

Expand Down Expand Up @@ -300,14 +301,29 @@ void AsyncIO::submit_request_(
std::span<std::byte> buffer, chunk_offset_t chunk_and_offset,
void *uring_data, enum erased_connected_operation::io_priority prio)
{
poll_uring_while_submission_queue_full_();

submit_request_sqe_(buffer, chunk_and_offset, uring_data, prio);
}

void AsyncIO::submit_request_sqe_(
std::span<std::byte> buffer, chunk_offset_t chunk_and_offset,
void *uring_data, enum erased_connected_operation::io_priority prio)
{
if (filter_fn_ != nullptr) {
if (!filter_fn_(buffer, chunk_and_offset, uring_data, prio)) {
// don't submit this i/o
return;
}
}

MONAD_DEBUG_ASSERT(uring_data != nullptr);
MONAD_DEBUG_ASSERT((chunk_and_offset.offset & (DISK_PAGE_SIZE - 1)) == 0);
MONAD_DEBUG_ASSERT(buffer.size() <= READ_BUFFER_SIZE);
#ifndef NDEBUG
memset(buffer.data(), 0xff, buffer.size());
#endif

poll_uring_while_submission_queue_full_();
struct io_uring_sqe *sqe = io_uring_get_sqe(&uring_.get_ring());
MONAD_ASSERT(sqe);

Expand Down Expand Up @@ -500,6 +516,10 @@ size_t AsyncIO::poll_uring_(bool blocking, unsigned poll_rings_mask)
io_uring_cq_ready(other_ring) > max_cq_entries) {
break;
}
std::cout << " dequeue_concurrent_read_ios_pending: inflight "
<< records_.inflight_rd << " pending "
<< concurrent_read_ios_pending_.count << " limit "
<< concurrent_read_io_limit_ << std::endl;
auto *next =
erased_connected_operation::rbtree_node_traits::get_right(
state);
Expand All @@ -522,6 +542,10 @@ size_t AsyncIO::poll_uring_(bool blocking, unsigned poll_rings_mask)
erased_connected_operation *state = nullptr;
result<size_t> res(success(0));
auto get_cqe = [&] {
if (MONAD_UNLIKELY(paused_)) {
return false;
}

auto const inflight_ts =
records_.inflight_ts.load(std::memory_order_acquire);

Expand Down Expand Up @@ -614,8 +638,23 @@ size_t AsyncIO::poll_uring_(bool blocking, unsigned poll_rings_mask)
}
else {
state = reinterpret_cast<erased_connected_operation *>(data);
res = (cqe->res < 0) ? result<size_t>(posix_code(-cqe->res))
: result<size_t>(cqe->res);
if (cqe_filter_fn_ != nullptr) {
int32_t cres = cqe->res;
if (!cqe_filter_fn_(cqe, cres)) {
// don't process this cqe
if (cqe != nullptr) {
io_uring_cqe_seen(ring, cqe);
cqe = nullptr;
}
return false;
}
res = (cres < 0) ? result<size_t>(posix_code(-cres))
: result<size_t>(cres);
}
else {
res = (cqe->res < 0) ? result<size_t>(posix_code(-cqe->res))
: result<size_t>(cqe->res);
}
}
if (cqe != nullptr) {
io_uring_cqe_seen(ring, cqe);
Expand Down
25 changes: 24 additions & 1 deletion category/async/io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@

MONAD_ASYNC_NAMESPACE_BEGIN

namespace test
{
class AsyncTest;
};

class read_single_buffer_sender;

// helper struct that records IO stats
Expand Down Expand Up @@ -84,6 +89,14 @@ class AsyncIO final
}
};

std::function<bool(
std::span<std::byte> buffer, chunk_offset_t chunk_and_offset,
void *uring_data, enum erased_connected_operation::io_priority prio)>
filter_fn_{};

std::function<bool(struct io_uring_cqe *cqe, int32_t &res)>
cqe_filter_fn_{};

pid_t const owning_tid_;
class storage_pool *storage_pool_{nullptr};
chunk_ptr_<cnv_chunk> cnv_chunk_;
Expand All @@ -100,6 +113,7 @@ class AsyncIO final
monad::io::BufferPool wr_pool_;
bool eager_completions_{false};
bool capture_io_latencies_{false};
bool paused_{false};

// IO records
IORecord records_;
Expand All @@ -114,6 +128,9 @@ class AsyncIO final
void submit_request_(
std::span<std::byte> buffer, chunk_offset_t chunk_and_offset,
void *uring_data, enum erased_connected_operation::io_priority prio);
void submit_request_sqe_(
std::span<std::byte> buffer, chunk_offset_t chunk_and_offset,
void *uring_data, enum erased_connected_operation::io_priority prio);
void submit_request_(
std::span<const struct iovec> buffers, chunk_offset_t chunk_and_offset,
void *uring_data, enum erased_connected_operation::io_priority prio);
Expand All @@ -125,6 +142,8 @@ class AsyncIO final
void poll_uring_while_submission_queue_full_();
size_t poll_uring_(bool blocking, unsigned poll_rings_mask);

friend class test::AsyncTest;

public:
AsyncIO(class storage_pool &pool, monad::io::Buffers &rwbuf);

Expand Down Expand Up @@ -335,6 +354,10 @@ class AsyncIO final
{
if (concurrent_read_io_limit_ > 0) {
if (records_.inflight_rd >= concurrent_read_io_limit_) {
std::cout << " submit_read_request: inflight "
<< records_.inflight_rd << " pending "
<< concurrent_read_ios_pending_.count << " limit "
<< concurrent_read_io_limit_ << std::endl;
auto *state = (erased_connected_operation *)uring_data;
erased_connected_operation::rbtree_node_traits::set_right(
state, nullptr);
Expand Down Expand Up @@ -668,7 +691,7 @@ class AsyncIO final
using erased_connected_operation_ptr =
AsyncIO::erased_connected_operation_unique_ptr_type;

static_assert(sizeof(AsyncIO) == 224);
// static_assert(sizeof(AsyncIO) == 280);
static_assert(alignof(AsyncIO) == 8);

namespace detail
Expand Down
141 changes: 141 additions & 0 deletions category/async/test/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,74 @@

#include <unistd.h>

MONAD_ASYNC_NAMESPACE_BEGIN

namespace test
{
struct DelayedIO
{
std::span<std::byte> buffer;
chunk_offset_t chunk_and_offset;
void *uring_data;
enum erased_connected_operation::io_priority prio;
};

class AsyncTest
{
public:
AsyncTest(AsyncIO &io)
: io_(io)
{
}

void pause()
{
io_.paused_ = true;
}

void unpause()
{
io_.paused_ = false;
}

void delay_ios()
{
io_.filter_fn_ =
[this](
std::span<std::byte> buffer,
chunk_offset_t chunk_and_offset,
void *uring_data,
enum erased_connected_operation::io_priority prio) {
delayed_ios_.emplace_back(
DelayedIO{buffer, chunk_and_offset, uring_data, prio});
return false;
};
}

void release_ios()
{
io_.filter_fn_ = nullptr;
for (auto &d : delayed_ios_) {
io_.submit_request_sqe_(
d.buffer, d.chunk_and_offset, d.uring_data, d.prio);
}
delayed_ios_.clear();
}

void set_cqe_filter(
std::function<bool(struct io_uring_cqe *cqe, int32_t &res)> fn)
{
io_.cqe_filter_fn_ = std::move(fn);
}

private:
std::vector<DelayedIO> delayed_ios_;
AsyncIO &io_;
};
} // namespace test

MONAD_ASYNC_NAMESPACE_END

namespace
{
TEST(AsyncIO, hardlink_fd_to)
Expand Down Expand Up @@ -252,4 +320,77 @@ namespace
}
EXPECT_EQ(seq.back(), offset - monad::async::DISK_PAGE_SIZE);
}

struct sqe_read_exhaustion_receiver
{
uint64_t *completions;

static constexpr bool lifetime_managed_internally = false;

void set_value(
monad::async::erased_connected_operation * /*io_state*/,
monad::async::read_single_buffer_sender::result_type /*buf*/)
{
++(*completions);
}
};

TEST(AsyncIO, read_eagain)
{
monad::io::RingConfig rc_rd{};
rc_rd.entries = 4000;
monad::io::RingConfig rc_wr{};
rc_wr.entries = 4;
monad::io::Ring rd_ring(rc_rd);
monad::io::Ring wr_ring(rc_wr);
auto const read_buffers_count = 20000;

auto bufs = monad::io::make_buffers_for_segregated_read_write(
rd_ring,
wr_ring,
read_buffers_count,
4, /* write count SQ=4 */
monad::async::AsyncIO::MONAD_IO_BUFFERS_READ_SIZE,
monad::async::AsyncIO::MONAD_IO_BUFFERS_WRITE_SIZE);

monad::async::storage_pool pool(
monad::async::use_anonymous_inode_tag{});
monad::async::AsyncIO aio(pool, bufs);

monad::async::test::AsyncTest async_test(aio);

aio.set_concurrent_read_io_limit(4096 + 2000);
aio.set_eager_completions(false);

int num_failures = 2000;

async_test.pause();
async_test.delay_ios();
async_test.set_cqe_filter([&](struct io_uring_cqe *, int32_t &res) {
// Simulate read i/o failures
if (num_failures > 0) {
--num_failures;
res = -EAGAIN;
}
// let this completion through
return true;
});

uint64_t completions = 0;
uint64_t const num_ios = 20000;
for (auto i = 0ul; i < num_ios; ++i) {
auto op = aio.make_connected(
monad::async::read_single_buffer_sender{
{0, 0}, monad::async::AsyncIO::MONAD_IO_BUFFERS_READ_SIZE},
sqe_read_exhaustion_receiver{.completions = &completions});
op->initiate();
op.release();
}

async_test.release_ios();
async_test.unpause();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
aio.wait_until_done();
EXPECT_EQ(completions, num_ios);
}
}
Loading