Skip to content

Commit

Permalink
irecv: update wait semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
cwpearson committed May 10, 2024
1 parent fedb6a6 commit 3e7e298
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 16 deletions.
18 changes: 16 additions & 2 deletions docs/api/core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,27 @@ Related Types

.. cpp:class:: KokkosComm::Req

A communication handle representing an asychronous communication. The communication is not complete until ``wait`` is called.
A communication handle representing an asychronous communication and an associated Kokkos execution space instance. The handle is scoped to the space instance used in the communication call that produced it.



.. cpp:function:: MPI_Request &KokkosComm::Req::mpi_req()

Retrieve a reference to the held MPI_Request.

.. cpp:function:: void KokkosComm::Req::wait()

Call MPI_Wait on the held MPI_Request and complete any internal communication-related operations.
Require that the communication be completed before any further work can be exected in the associated execution space instance. May or may not fence. Consider the following example.

.. code-block:: c++
:linenos:

using KC = KokkosComm;
Kokkos::parallel_for(space, ...);
auto req = KC::isend(space, ...); // isend 1
Kokkos::parallel_for(space, ...); // runs concurrently with isend 1, does not touch send view
req.wait(); // blocks space until isend 1 is complete. May or may not fence.
Kokkos::parallel_for(space, ...); // safe to overwrite the send buffer
space.fence(); // wait for all to complete

Here, ``parallel_for`` on line 6 can overwrite the send buffer because ``req.wait()`` means that isend 1 must be done before additional work can be done in ``space``. This MAY be achieved by an internal call to ``space.fence()``, but some other mechanism may be used. If the host thread wants to be sure that the communication is done, it must separately call ``space.fence()``.
3 changes: 3 additions & 0 deletions src/KokkosComm_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class Req {
Req() : impl_(std::make_shared<Impl::Req>()) {}
Req(const std::shared_ptr<Impl::Req> &impl) : impl_(impl) {}
MPI_Request &mpi_req() { return impl_->mpi_req(); }

// The communication must be done before the associated Kokkos execution space
// can do any further work. The host may or may not be fenced.
void wait() { impl_->wait(); }

private:
Expand Down
2 changes: 0 additions & 2 deletions src/impl/KokkosComm_irecv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ std::shared_ptr<Req> irecv(const ExecSpace &space, RecvView &rv, int src,
MPI_Irecv(KCT::data_handle(args.view), args.count, args.datatype, src, tag,
comm, &req->mpi_req());
req->call_and_drop_at_wait(IrecvUnpacker{space, rv, args});
req->fence_at_wait(space);

} else {
using RecvScalar = typename RecvView::value_type;
MPI_Irecv(KCT::data_handle(rv), KCT::span(rv), mpi_type_v<RecvScalar>, src,
Expand Down
22 changes: 10 additions & 12 deletions src/impl/KokkosComm_request_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ class Req {

MPI_Request &mpi_req() { return req_; }

void wait_async() {
// The communication must be done before the held execution space can do any
// further work. For MPI, this is achieved by blocking the host thread and
// fencing the execution space.
void wait() {
MPI_Wait(&req_, MPI_STATUS_IGNORE);
for (auto &c : wait_callbacks_) {
(*c)();
Expand All @@ -80,14 +83,11 @@ class Req {
// drop the references to anything that was kept alive until wait
wait_drops_.clear();
wait_callbacks_.clear();
}

void wait() {
wait_async();
if (wait_fence_) {
wait_fence_->fence();
if (exec_space_) {
exec_space_->fence();
}
wait_fence_ = nullptr;
exec_space_ = nullptr;
}

// Keep a reference to this view around until wait() is called.
Expand All @@ -111,19 +111,17 @@ class Req {

template <typename ExecSpace>
void fence_at_wait(const ExecSpace &space) {
// TODO: only fence once if the same space is provided multiple times
if (wait_fence_) {
if (exec_space_) {
Kokkos::abort("Req is already fencing a space!");
}

wait_fence_ = std::make_shared<SpaceHolder<ExecSpace>>(space);
exec_space_ = std::make_shared<SpaceHolder<ExecSpace>>(space);
}

private:
MPI_Request req_;
std::vector<std::shared_ptr<ViewHolderBase>> wait_drops_;
std::vector<std::shared_ptr<InvokableHolderBase>> wait_callbacks_;
std::shared_ptr<SpaceHolderBase> wait_fence_;
std::shared_ptr<SpaceHolderBase> exec_space_;
};

} // namespace KokkosComm::Impl

0 comments on commit 3e7e298

Please sign in to comment.