Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft kokkos-comm initialization/finalization using MPI_Sessions #68

Draft
wants to merge 20 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5614211
feat: add `Universe` class to handle KokkosComm initialization/finali…
dssgabriel May 15, 2024
45c695f
feat: add support for buffered mode send operations
dssgabriel May 15, 2024
ea667db
Merge branch 'kokkos:develop' into initialize-finalize
dssgabriel May 16, 2024
bc31289
fix: correctly detach buffer and fix size check
dssgabriel May 16, 2024
4852d9c
feat: first draft of `Communicator` class
dssgabriel May 16, 2024
e37173b
feat(session): implement a first draft of session-based init
dssgabriel May 16, 2024
aac626e
Merge branch 'kokkos:develop' into initialize-finalize
dssgabriel May 17, 2024
4bc55e2
feat: initialization and finalization routines for KokkosComm
dssgabriel May 18, 2024
afbb017
test: `send_recv` with KokkosComm init and finalize
dssgabriel May 18, 2024
89e3ee3
Merge branch 'kokkos:develop' into initialize-finalize
dssgabriel May 20, 2024
dcc4584
feat(sessions): move init & finalize w/ `MPI_Session` to its own file
dssgabriel May 23, 2024
de176b6
revert: removed Buffered communication mode
dssgabriel May 23, 2024
bb58138
feat(sessions): removed dead code in `Communicator` class
dssgabriel May 23, 2024
c2812bd
test(sessions): update tests with new `init`/`finalize` API
dssgabriel May 23, 2024
4ceee5f
chore: format
dssgabriel May 23, 2024
40ba7e7
fix(sessions): remove code related to MPI's WPM
dssgabriel May 23, 2024
822722d
refactor(sessions): rename as `Context`, remove `initialize` w/ WPM &…
dssgabriel May 23, 2024
8faf205
test(sessions): update test accordingly
dssgabriel May 23, 2024
d064469
chore: format
dssgabriel May 24, 2024
f0d5285
fix(sessions): only free communicator if it isn't already null
dssgabriel May 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions perf_tests/test_2dhalo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

#include "KokkosComm.hpp"

#include <Kokkos_Core_fwd.hpp>
#include <iostream>

void noop(benchmark::State, MPI_Comm) {}
void noop(benchmark::State, const KokkosComm::Communicator<Kokkos::DefaultExecutionSpace> &comm) {}

template <typename Space, typename View>
void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int nx, int ny, int rx, int ry, int rs,
const View &v) {
void send_recv(benchmark::State &, const KokkosComm::Communicator<Kokkos::DefaultExecutionSpace> &comm,
const Space &space, int nx, int ny, int rx, int ry, int rs, const View &v) {
// 2D index of nbrs in minus and plus direction (periodic)
const int xm1 = (rx + rs - 1) % rs;
const int ym1 = (ry + rs - 1) % rs;
Expand Down Expand Up @@ -73,10 +74,11 @@ void benchmark_2dhalo(benchmark::State &state) {
int ny = 512;
int nprops = 3;

int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
auto universe = KokkosComm::initialize<Kokkos::DefaultExecutionSpace>();
auto comm = universe.comm();

int rank = comm.rank();
int size = comm.size();
const int rs = std::sqrt(size);
const int rx = rank % rs;
const int ry = rank / rs;
Expand All @@ -86,12 +88,11 @@ void benchmark_2dhalo(benchmark::State &state) {
// grid of elements, each with 3 properties, and a radius-1 halo
grid_type grid("", nx + 2, ny + 2, nprops);
while (state.KeepRunning()) {
do_iteration(state, MPI_COMM_WORLD, send_recv<Kokkos::DefaultExecutionSpace, grid_type>, space, nx, ny, rx, ry,
rs, grid);
do_iteration(state, comm, send_recv<Kokkos::DefaultExecutionSpace, grid_type>, space, nx, ny, rx, ry, rs, grid);
}
} else {
while (state.KeepRunning()) {
do_iteration(state, MPI_COMM_WORLD, noop); // do nothing...
do_iteration(state, comm, noop); // do nothing...
}
}

Expand All @@ -113,4 +114,4 @@ void benchmark_2dhalo(benchmark::State &state) {
// clang-format on
}

BENCHMARK(benchmark_2dhalo)->UseManualTime()->Unit(benchmark::kMillisecond);
BENCHMARK(benchmark_2dhalo)->UseManualTime()->Unit(benchmark::kMillisecond);
15 changes: 9 additions & 6 deletions perf_tests/test_sendrecv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
#include "KokkosComm.hpp"

template <typename Space, typename View>
void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int rank, const View &v) {
void send_recv(benchmark::State &, const KokkosComm::Communicator<Space> &comm, const Space &space, int rank,
const View &v) {
if (0 == rank) {
KokkosComm::send(space, v, 1, 0, comm);
KokkosComm::recv(space, v, 1, 0, comm);
Expand All @@ -30,9 +31,11 @@ void send_recv(benchmark::State &, MPI_Comm comm, const Space &space, int rank,
}

void benchmark_sendrecv(benchmark::State &state) {
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
auto universe = KokkosComm::initialize<Kokkos::DefaultExecutionSpace>();
auto comm = universe.comm();

int rank = comm.rank();
int size = comm.size();
if (size < 2) {
state.SkipWithError("benchmark_sendrecv needs at least 2 ranks");
}
Expand All @@ -44,10 +47,10 @@ void benchmark_sendrecv(benchmark::State &state) {
view_type a("", 1000000);

while (state.KeepRunning()) {
do_iteration(state, MPI_COMM_WORLD, send_recv<Kokkos::DefaultExecutionSpace, view_type>, space, rank, a);
do_iteration(state, comm, send_recv<Kokkos::DefaultExecutionSpace, view_type>, space, rank, a);
}

state.SetBytesProcessed(sizeof(Scalar) * state.iterations() * a.size() * 2);
}

BENCHMARK(benchmark_sendrecv)->UseManualTime()->Unit(benchmark::kMillisecond);
BENCHMARK(benchmark_sendrecv)->UseManualTime()->Unit(benchmark::kMillisecond);
9 changes: 6 additions & 3 deletions perf_tests/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

#pragma once

#include <Kokkos_Core_fwd.hpp>
#include <chrono>

#include <benchmark/benchmark.h>

#include "KokkosComm_communicator.hpp"
#include "KokkosComm_include_mpi.hpp"

// F is a function that takes (state, KokkosComm::Communicator, args...)
template <typename F, typename... Args>
void do_iteration(benchmark::State &state, MPI_Comm comm, F &&func, Args... args) {
void do_iteration(benchmark::State &state, KokkosComm::Communicator<Kokkos::DefaultExecutionSpace> &comm, F &&func,
Args... args) {
using Clock = std::chrono::steady_clock;
using Duration = std::chrono::duration<double>;

Expand All @@ -34,6 +37,6 @@ void do_iteration(benchmark::State &state, MPI_Comm comm, F &&func, Args... args

double max_elapsed_second;
double elapsed_seconds = elapsed.count();
MPI_Allreduce(&elapsed_seconds, &max_elapsed_second, 1, MPI_DOUBLE, MPI_MAX, comm);
MPI_Allreduce(&elapsed_seconds, &max_elapsed_second, 1, MPI_DOUBLE, MPI_MAX, comm.as_raw());
state.SetIterationTime(max_elapsed_second);
}
}
15 changes: 9 additions & 6 deletions src/KokkosComm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,34 @@

#pragma once

#include <mpi.h>
#include "KokkosComm_collective.hpp"
#include "KokkosComm_communicator.hpp"
#include "KokkosComm_version.hpp"
#include "KokkosComm_isend.hpp"
#include "KokkosComm_recv.hpp"
#include "KokkosComm_send.hpp"
#include "KokkosComm_concepts.hpp"
#include "KokkosComm_comm_mode.hpp"
#include "KokkosComm_MPI_instance.hpp"

#include <Kokkos_Core.hpp>

namespace KokkosComm {

template <CommMode SendMode = CommMode::Default, KokkosExecutionSpace ExecSpace, KokkosView SendView>
Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) {
return Impl::isend<SendMode>(space, sv, dest, tag, comm);
Req isend(const ExecSpace &space, const SendView &sv, int dest, int tag, const Communicator<ExecSpace> &comm) {
return Impl::isend<SendMode>(space, sv, dest, tag, comm.as_raw());
}

template <CommMode SendMode = CommMode::Default, KokkosExecutionSpace ExecSpace, KokkosView SendView>
void send(const ExecSpace &space, const SendView &sv, int dest, int tag, MPI_Comm comm) {
return Impl::send<SendMode>(space, sv, dest, tag, comm);
void send(const ExecSpace &space, const SendView &sv, int dest, int tag, const Communicator<ExecSpace> &comm) {
return Impl::send<SendMode>(space, sv, dest, tag, comm.as_raw());
}

template <KokkosExecutionSpace ExecSpace, KokkosView RecvView>
void recv(const ExecSpace &space, RecvView &sv, int src, int tag, MPI_Comm comm) {
return Impl::recv(space, sv, src, tag, comm);
void recv(const ExecSpace &space, RecvView &sv, int src, int tag, const Communicator<ExecSpace> &comm) {
return Impl::recv(space, sv, src, tag, comm.as_raw());
}

} // namespace KokkosComm
22 changes: 9 additions & 13 deletions src/KokkosComm_comm_mode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,18 @@ namespace KokkosComm {
// Scoped enumeration to specify the communication mode of a sending operation.
// See section 3.4 of the MPI standard for a complete specification.
enum class CommMode {
// Default mode: lets the user override the send operations behavior at
// compile-time. E.g., this can be set to mode "Synchronous" for debug
// builds by defining KOKKOSCOMM_FORCE_SYNCHRONOUS_MODE.
// Default mode: lets the user override the send operations behavior at compile-time. E.g. this can be set to mode
// "Synchronous" for debug builds by defining KOKKOSCOMM_FORCE_SYNCHRONOUS_MODE.
Default,
// Standard mode: MPI implementation decides whether outgoing messages will
// be buffered. Send operations can be started whether or not a matching
// receive has been started. They may complete before a matching receive is
// started. Standard mode is non-local: successful completion of the send
// operation may depend on the occurrence of a matching receive.
// Standard mode: MPI implementation decides whether outgoing messages will be buffered. Send operations can be
// started whether or not a matching receive has been started. They may complete before a matching receive is started.
// Standard mode is non-local: successful completion of the send operation may depend on the occurrence of a matching
// receive.
Standard,
// Ready mode: Send operations may be started only if the matching receive is
// already started.
// Ready mode: Send operations may be started only if the matching receive is already started.
Ready,
// Synchronous mode: Send operations complete successfully only if a matching
// receive is started, and the receive operation has started to receive the
// message sent.
// Synchronous mode: Send operations complete successfully only if a matching receive is started, and the receive
// operation has started to receive the message sent.
Synchronous,
};

Expand Down
116 changes: 116 additions & 0 deletions src/KokkosComm_communicator.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//@HEADER
// ************************************************************************
//
// Kokkos v. 4.0
// Copyright (2022) National Technology & Engineering
// Solutions of Sandia, LLC (NTESS).
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions.
// See https://kokkos.org/LICENSE for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//@HEADER

#pragma once

#include "KokkosComm_concepts.hpp"

#include <cstdio>
#include <mpi.h>

namespace KokkosComm {

using Color = int;
using Key = int;

template <KokkosExecutionSpace ExecSpace>
class Communicator {
public:
// Destructor. No MPI resource is released at the moment: whatever the communicator kind, nothing is freed.
// FIXME: find out how to properly free a session-associated communicator. Once that is settled, communicators
// of kind `CommunicatorKind::User` are the candidates for `MPI_Comm_free(&_comm)`.
// NOTE(review): the static factories return `Communicator` by value, so freeing here presumably also requires
// copy/move ownership handling -- confirm before enabling.
~Communicator() {
  if (CommunicatorKind::User == _comm_kind) {
    // Intentionally empty, see the FIXME above.
  }
}

static auto from_raw(MPI_Comm raw) -> Communicator {
assert(MPI_COMM_NULL != raw);

CommunicatorKind comm_kind;
if (MPI_COMM_SELF == raw) {
comm_kind = CommunicatorKind::Self;
} else if (MPI_COMM_WORLD == raw) {
comm_kind = CommunicatorKind::World;
} else {
int flag;
MPI_Comm_test_inter(raw, &flag);
if (0 == flag) {
comm_kind = CommunicatorKind::User;
} else {
fprintf(stderr, "[KokkosComm] error: intercommunicators are not supported (yet).\n");
std::terminate();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't just terminate. Either throw an exception or call MPI_Abort (I've seen applications hang where one process aborted without calling MPI_Abort, it's nasty)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific error code you suggest?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, anything non-zero would work. This ties in with #29 so anything you do here is probably temporary anyway :)

}
}

return Communicator(raw, comm_kind);
}

// Wraps `comm` without inspecting it: the handle is neither compared against MPI_COMM_NULL nor against the
// predefined communicators, and it is always tagged as `CommunicatorKind::User`.
inline static auto from_raw_unchecked(MPI_Comm comm) -> Communicator {
  constexpr auto kind = CommunicatorKind::User;
  return Communicator{comm, kind};
}

// Duplicates the raw handle `raw` with MPI_Comm_dup and wraps the resulting handle as a
// `CommunicatorKind::User` communicator. The return code of MPI_Comm_dup is not inspected.
static auto dup_raw(MPI_Comm raw) -> Communicator {
  MPI_Comm duplicate;
  MPI_Comm_dup(raw, &duplicate);
  return Communicator{duplicate, CommunicatorKind::User};
}

// Duplicates an already wrapped communicator by forwarding its raw handle to `dup_raw`.
static auto dup(const Communicator &other) -> Communicator {
  const MPI_Comm handle = other.as_raw();
  return dup_raw(handle);
}

static auto split_raw(MPI_Comm raw, Color color, Key key) -> Communicator {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why this isn't just an overload of split?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to make it explicit that we're splitting from a raw MPI_Comm handle, not a wrapped KokkosComm::Communicator, but I guess we can also simply overload.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that's a design decision that the Kokkos community should make (to be consistent with the other projects)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can drop the Raw. We typically are happy with overloads.

MPI_Comm new_comm;
MPI_Comm_split(raw, color, key, &new_comm);
return Communicator(new_comm, CommunicatorKind::User);
}

// Splits an already wrapped communicator by forwarding its raw handle, together with `color` and `key`,
// to `split_raw`.
static auto split(const Communicator &other, Color color, Key key) -> Communicator {
  const MPI_Comm parent = other.as_raw();
  return split_raw(parent, color, key);
}

// Returns the wrapped MPI_Comm handle by value, e.g. to hand it to an MPI routine directly.
inline auto as_raw() const -> MPI_Comm {
  return this->_comm;
}

// Wraps the predefined MPI_COMM_SELF communicator.
// The wrapper is tagged `CommunicatorKind::Self` (the same classification `from_raw` gives this handle) rather
// than `User`, so that code branching on the kind, such as the destructor, can tell the predefined
// communicator apart from user-created ones.
inline static auto self(void) -> Communicator { return Communicator(MPI_COMM_SELF, CommunicatorKind::Self); }

// Wraps the predefined MPI_COMM_WORLD communicator.
// The wrapper is tagged `CommunicatorKind::World` (the same classification `from_raw` gives this handle) rather
// than `User`, so that code branching on the kind, such as the destructor, can tell the predefined
// communicator apart from user-created ones.
inline static auto world(void) -> Communicator { return Communicator(MPI_COMM_WORLD, CommunicatorKind::World); }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't belong in the communicator. Since we use sessions, there should be no use of the world process model. It could even be that no one has called MPI_Init.


inline auto size(void) const -> int {
int size;
MPI_Comm_size(_comm, &size);
return size;
}

inline auto rank(void) const -> int {
int rank;
MPI_Comm_rank(_comm, &rank);
return rank;
}

private:
// Classification stored alongside the wrapped handle. `from_raw` derives it from the handle it is given;
// `from_raw_unchecked`, `dup_raw` and `split_raw` always use `User`.
enum class CommunicatorKind {
Self, // MPI_COMM_SELF
World, // MPI_COMM_WORLD
User, // User-defined communicator (any other handle accepted by the factories)
};

Communicator(MPI_Comm comm, CommunicatorKind comm_kind) : _comm(comm), _comm_kind(comm_kind) {}

MPI_Comm _comm;
CommunicatorKind _comm_kind;
ExecSpace _exec_space;
};

} // namespace KokkosComm
Loading
Loading