Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions category/async/erased_connected_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ enum class operation_type : uint8_t
unknown,
read,
write,
fsync,
timeout,
threadsafeop,
read_scatter
Expand Down Expand Up @@ -544,6 +545,11 @@ class erased_connected_operation
return operation_type_ == operation_type::write;
}

bool is_fsync() const noexcept
{
return operation_type_ == operation_type::fsync;
}

bool is_timeout() const noexcept
{
return operation_type_ == operation_type::timeout;
Expand Down
40 changes: 39 additions & 1 deletion category/async/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,38 @@ void AsyncIO::account_read_()
++records_.nreads;
}

void AsyncIO::submit_sync_file_request(
chunk_offset_t const chunk_and_offset, unsigned const nbytes,
erased_connected_operation *const uring_data)
{
MONAD_DEBUG_ASSERT(uring_data != nullptr);
MONAD_ASSERT(!rwbuf_.is_read_only());
MONAD_ASSERT(
chunk_and_offset.offset + nbytes <=
chunk_capacity(chunk_and_offset.id));

if (capture_io_latencies_) {
uring_data->initiated = std::chrono::steady_clock::now();
}
auto *const wr_ring =
(wr_uring_ != nullptr) ? &wr_uring_->get_ring() : &uring_.get_ring();
struct io_uring_sqe *sqe = io_uring_get_sqe(wr_ring);
MONAD_ASSERT(sqe);

auto const &ci = seq_chunks_[chunk_and_offset.id];
io_uring_prep_fsync(sqe, ci.io_uring_write_fd, IORING_FSYNC_DATASYNC);
// auto const offset = ci.ptr->write_fd(0).second;
sqe->flags |= IOSQE_FIXED_FILE;
sqe->flags |= IOSQE_ASYNC;
// sqe->off = offset;
// sqe->len = nbytes;
io_uring_sqe_set_data(sqe, (void *)uring_data);

MONAD_ASYNC_IO_URING_RETRYABLE(io_uring_submit(wr_ring));
printf("submitted fsync\n");
++records_.inflight_fsync;
}

void AsyncIO::submit_request_sqe_(
std::span<std::byte> buffer, chunk_offset_t chunk_and_offset,
void *uring_data, enum erased_connected_operation::io_priority prio)
Expand Down Expand Up @@ -526,7 +558,8 @@ size_t AsyncIO::poll_uring_(bool blocking, unsigned poll_rings_mask)
auto const inflight_ts =
records_.inflight_ts.load(std::memory_order_acquire);

if (wr_ring != nullptr && records_.inflight_wr > 0 &&
if (wr_ring != nullptr &&
(records_.inflight_wr + records_.inflight_fsync) > 0 &&
(poll_rings_mask & 2) == 0) {
ring = wr_ring;
if (wr_uring_->must_call_uring_submit() ||
Expand Down Expand Up @@ -562,6 +595,7 @@ size_t AsyncIO::poll_uring_(bool blocking, unsigned poll_rings_mask)
MONAD_ASYNC_IO_URING_RETRYABLE(io_uring_submit(other_ring));
}
if (blocking && inflight_ts == 0 && records_.inflight_wr == 0 &&
records_.inflight_fsync == 0 &&
detail::AsyncIO_per_thread_state().empty()) {
MONAD_ASYNC_IO_URING_RETRYABLE(io_uring_wait_cqe(ring, &cqe));
}
Expand Down Expand Up @@ -666,6 +700,10 @@ size_t AsyncIO::poll_uring_(bool blocking, unsigned poll_rings_mask)
--records_.inflight_wr;
is_read_or_write = true;
}
else if (state->is_fsync()) {
printf("completed fsync\n");
--records_.inflight_fsync;
}
else if (state->is_timeout()) {
--records_.inflight_tm;
}
Expand Down
79 changes: 61 additions & 18 deletions category/async/io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct IORecord
unsigned inflight_rd{0};
unsigned inflight_rd_scatter{0};
unsigned inflight_wr{0};
unsigned inflight_fsync{0};
unsigned inflight_tm{0};
std::atomic<unsigned> inflight_ts{0};

Expand Down Expand Up @@ -187,11 +188,18 @@ class AsyncIO final
return records_.inflight_rd +
static_cast<unsigned>(concurrent_read_ios_pending_.size()) +
records_.inflight_rd_scatter + records_.inflight_wr +
records_.inflight_tm +
records_.inflight_fsync + records_.inflight_tm +
records_.inflight_ts.load(std::memory_order_relaxed) +
deferred_initiations_in_flight();
}

unsigned rw_io_inflight() const noexcept
{
return records_.inflight_rd +
static_cast<unsigned>(concurrent_read_ios_pending_.size()) +
records_.inflight_rd_scatter + records_.inflight_wr;
}

unsigned reads_in_flight() const noexcept
{
return records_.inflight_rd +
Expand Down Expand Up @@ -327,11 +335,23 @@ class AsyncIO final
}
}

void wait_until_rw_done()
{
while (rw_io_inflight() > 0) {
poll_blocking(size_t(-1));
}
}

void flush()
{
wait_until_done();
}

void flush_rw()
{
wait_until_rw_done();
}

void reset_records()
{
records_.max_inflight_rd = 0;
Expand Down Expand Up @@ -389,6 +409,10 @@ class AsyncIO final
}
}

void submit_sync_file_request(
chunk_offset_t chunk_and_offset, unsigned nbytes,
erased_connected_operation *uring_data);

/* This isn't the ideal place to put this, but only AsyncIO knows how to
get i/o buffers into which to place connected i/o states.
*/
Expand Down Expand Up @@ -497,7 +521,7 @@ class AsyncIO final
private:
unsigned char *poll_uring_while_no_io_buffers_(bool is_write);

template <bool is_write, class F>
template <class F>
auto make_connected_impl_(F &&connect)
{
using connected_type = decltype(connect());
Expand All @@ -509,14 +533,22 @@ class AsyncIO final
MONAD_ASSERT_PRINTF(
mem != nullptr, "failed due to %s", strerror(errno));
MONAD_DEBUG_ASSERT(((void)mem[0], true));
auto ret = std::unique_ptr<
return std::unique_ptr<
connected_type,
io_connected_operation_unique_ptr_deleter>(
new (mem) connected_type(connect()));
}

template <operation_type op_type, class F>
requires(
op_type == operation_type::read || op_type == operation_type::write)
auto make_connected_impl_rw_(F &&connect)
{
auto ret = make_connected_impl_(std::forward<F>(connect));
// Did you accidentally pass in a foreign buffer to use?
// Can't do that, must use buffer returned.
MONAD_DEBUG_ASSERT(ret->sender().buffer().data() == nullptr);
if constexpr (is_write) {
if constexpr (op_type == operation_type::write) {
MONAD_DEBUG_ASSERT(rwbuf_.get_write_size() >= WRITE_BUFFER_SIZE);
auto buffer = std::move(ret->sender()).buffer();
buffer.set_write_buffer(get_write_buffer());
Expand All @@ -542,11 +574,10 @@ class AsyncIO final
})
auto make_connected(Sender &&sender, Receiver &&receiver)
{
return make_connected_impl_ < Sender::my_operation_type ==
operation_type::write > ([&] {
return connect<Sender, Receiver>(
*this, std::move(sender), std::move(receiver));
});
return make_connected_impl_rw_<Sender::my_operation_type>([&] {
return connect<Sender, Receiver>(
*this, std::move(sender), std::move(receiver));
});
}

//! Construct into internal memory a connected state for an i/o read
Expand All @@ -568,14 +599,26 @@ class AsyncIO final
std::piecewise_construct_t _, std::tuple<SenderArgs...> &&sender_args,
std::tuple<ReceiverArgs...> &&receiver_args)
{
return make_connected_impl_ < Sender::my_operation_type ==
operation_type::write > ([&] {
return connect<Sender, Receiver>(
*this,
_,
std::move(sender_args),
std::move(receiver_args));
});
return make_connected_impl_<Sender::my_operation_type>([&] {
return connect<Sender, Receiver>(
*this, _, std::move(sender_args), std::move(receiver_args));
});
}

template <sender Sender, receiver Receiver>
requires(
Sender::my_operation_type == operation_type::fsync &&
requires(
Receiver r, erased_connected_operation *o,
typename Sender::result_type x) {
r.set_value(o, std::move(x));
})
auto make_connected(Sender &&sender, Receiver &&receiver)
{
return make_connected_impl_([&] {
return connect<Sender, Receiver>(
*this, std::move(sender), std::move(receiver));
});
}

template <class Base, sender Sender, receiver Receiver>
Expand Down Expand Up @@ -658,7 +701,7 @@ class AsyncIO final
using erased_connected_operation_ptr =
AsyncIO::erased_connected_operation_unique_ptr_type;

static_assert(sizeof(AsyncIO) == 272);
static_assert(sizeof(AsyncIO) == 280);
static_assert(alignof(AsyncIO) == 8);

namespace detail
Expand Down
42 changes: 42 additions & 0 deletions category/async/io_senders.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,4 +405,46 @@ static_assert(sizeof(write_single_buffer_sender) == 48);
static_assert(alignof(write_single_buffer_sender) == 8);
static_assert(sender<write_single_buffer_sender>);

struct sync_file_sender
{
private:
chunk_offset_t offset_;
unsigned bytes_;

public:
using result_type = result<void>;

static constexpr operation_type my_operation_type = operation_type::fsync;

constexpr sync_file_sender(
chunk_offset_t const offset, unsigned const bytes_to_sync)
: offset_(offset)
, bytes_(bytes_to_sync)
{
}

result<void> operator()(erased_connected_operation *io_state) noexcept
{
io_state->executor()->submit_sync_file_request(
offset_, bytes_, io_state);
return success();
}

result_type
completed(erased_connected_operation *, result<void> result) noexcept
{
if (result.has_error()) {
fprintf(
stderr,
"ERROR: Sync file of %u bytes to chunk %u offset %llu failed "
"with error '%s'\n",
bytes_,
offset_.id,
file_offset_t(offset_.offset),
result.assume_error().message().c_str());
}
return result;
}
};

MONAD_ASYNC_NAMESPACE_END
10 changes: 6 additions & 4 deletions category/mpt/trie.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ Node::SharedPtr upsert(
std::move(updates));
}
if (sentinel->npending) {
aux.io->flush();
aux.io->flush_rw();
MONAD_ASSERT(sentinel->npending == 0);
}
}
Expand Down Expand Up @@ -1450,7 +1450,8 @@ node_writer_unique_ptr_type replace_node_writer_to_start_at_new_chunk(
auto ret = aux.io->make_connected(
write_single_buffer_sender{
offset_of_new_writer, AsyncIO::WRITE_BUFFER_SIZE},
write_operation_io_receiver{AsyncIO::WRITE_BUFFER_SIZE});
write_operation_io_receiver{
aux.io, offset_of_new_writer, AsyncIO::WRITE_BUFFER_SIZE});
reentrancy_detection.count--;
MONAD_ASSERT(reentrancy_detection.count >= 0);
// The deepest-most reentrancy must succeed, and all less deep reentrancies
Expand Down Expand Up @@ -1505,7 +1506,8 @@ node_writer_unique_ptr_type replace_node_writer(
(size_t)(chunk_capacity - offset_of_next_writer.offset));
auto ret = aux.io->make_connected(
write_single_buffer_sender{offset_of_next_writer, bytes_to_write},
write_operation_io_receiver{bytes_to_write});
write_operation_io_receiver{
aux.io, offset_of_next_writer, bytes_to_write});
if (node_writer.get() != node_writer_ptr) {
// We reentered, please retry
return {};
Expand Down Expand Up @@ -1691,7 +1693,7 @@ void flush_buffered_writes(UpdateAuxImpl &aux)
// replace slow node writer
replace(aux.node_writer_slow);
}
aux.io->flush();
aux.io->flush_rw();
}

// return root physical offset
Expand Down
Loading
Loading