Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPMD Support #2895

Merged
merged 1 commit into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
MPMD Support
Add support for multiple programs, multiple data (MPMD).  For now, we assume
there are only two programs (i.e., executables) in the MPMD mode.  During
the initialization, MPI_COMM_WORLD is split into two communicators.  The
MPMD::Copier class can be used to copy FabArray/MultiFab data between two
programs.  This new capability can be used by FHDeX to couple FHD with
SPPARKS.
  • Loading branch information
WeiqunZhang committed Aug 2, 2022
commit 4b0415cd6a9b574187d607289b519a8e55357512
13 changes: 12 additions & 1 deletion Src/Base/AMReX_BLBackTrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
#include <AMReX_AsyncOut.H>
#include <AMReX.H>
#include <AMReX_Utility.H>
#ifdef AMREX_USE_MPI
#include <AMReX_MPMD.H>
#endif

#ifdef AMREX_TINY_PROFILING
#include <AMReX_TinyProfiler.H>
Expand Down Expand Up @@ -71,7 +74,15 @@ BLBackTrace::handler(int s)
std::string errfilename;
{
std::ostringstream ss;
ss << "Backtrace." << ParallelDescriptor::MyProc();
#ifdef AMREX_USE_MPI
if (MPMD::Initialized()) {
ss << "Backtrace.prog" << MPMD::MyProgId() << ".";
} else
#endif
{
ss << "Backtrace.";
}
ss << ParallelDescriptor::MyProc();
#ifdef AMREX_USE_OMP
ss << "." << omp_get_thread_num();
#endif
Expand Down
4 changes: 2 additions & 2 deletions Src/Base/AMReX_BoxList.H
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ public:
BoxList& convert (IndexType typ) noexcept;

//! Returns a reference to the Vector<Box>.
Vector<Box>& data() noexcept { return m_lbox; }
Vector<Box>& data () noexcept { return m_lbox; }
//! Returns a constant reference to the Vector<Box>.
const Vector<Box>& data() const noexcept { return m_lbox; }
const Vector<Box>& data () const noexcept { return m_lbox; }

void swap (BoxList& rhs) {
std::swap(m_lbox, rhs.m_lbox);
Expand Down
178 changes: 178 additions & 0 deletions Src/Base/AMReX_MPMD.H
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#ifndef AMREX_MPMD_H_
#define AMREX_MPMD_H_
#include <AMReX_Config.H>

#ifdef AMREX_USE_MPI

#include <AMReX_FabArray.H>

#include <mpi.h>

namespace amrex { namespace MPMD {

MPI_Comm Initialize (int argc, char* argv[]);

void Finalize ();

bool Initialized ();

int MyProc (); //! Process ID in MPI_COMM_WORLD
int NProcs (); //! Number of processes in MPI_COMM_WORLD
int MyProgId (); //! Program ID

class Copier
{
public:
Copier (BoxArray const& ba, DistributionMapping const& dm);

template <typename FAB>
void send (FabArray<FAB> const& fa, int icomp, int ncomp) const;

template <typename FAB>
void recv (FabArray<FAB>& fa, int icomp, int ncomp) const;

private:
std::map<int,FabArrayBase::CopyComTagsContainer> m_SndTags;
std::map<int,FabArrayBase::CopyComTagsContainer> m_RcvTags;
};

template <typename FAB>
void Copier::send (FabArray<FAB> const& mf, int icomp, int ncomp) const
{
const int N_snds = m_SndTags.size();

if (N_snds == 0) return;

// Prepare buffer

Vector<char*> send_data;
Vector<std::size_t> send_size;
Vector<int> send_rank;
Vector<MPI_Request> send_reqs;
Vector<FabArrayBase::CopyComTagsContainer const*> send_cctc;

Vector<std::size_t> offset;
std::size_t total_volume = 0;
for (auto const& kv : m_SndTags) {
auto const& cctc = kv.second;

std::size_t nbytes = 0;
for (auto const& cct : cctc) {
nbytes += cct.sbox.numPts() * ncomp * sizeof(typename FAB::value_type);
}

std::size_t acd = ParallelDescriptor::alignof_comm_data(nbytes);
nbytes = amrex::aligned_size(acd, nbytes); // so that bytes are aligned

// Also need to align the offset properly
total_volume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
acd), total_volume);

offset.push_back(total_volume);
total_volume += nbytes;

send_data.push_back(nullptr);
send_size.push_back(nbytes);
send_rank.push_back(kv.first);
send_reqs.push_back(MPI_REQUEST_NULL);
send_cctc.push_back(&cctc);
}

Gpu::PinnedVector<char> send_buffer(total_volume);
char* the_send_data = send_buffer.data();
for (int i = 0; i < N_snds; ++i) {
send_data[i] = the_send_data + offset[i];
}

// Pack buffer
#ifdef AMREX_USE_GPU
if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
mf.pack_send_buffer_gpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
} else
#endif
{
mf.pack_send_buffer_cpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
}

// Send
for (int i = 0; i < N_snds; ++i) {
send_reqs[i] = ParallelDescriptor::Asend
(send_data[i], send_size[i], send_rank[i], 100, MPI_COMM_WORLD).req();
}
Vector<MPI_Status> stats(N_snds);
ParallelDescriptor::Waitall(send_reqs, stats);
}

template <typename FAB>
void Copier::recv (FabArray<FAB>& mf, int icomp, int ncomp) const
{
const int N_rcvs = m_RcvTags.size();

if (N_rcvs == 0) return;

// Prepare buffer

Vector<char*> recv_data;
Vector<std::size_t> recv_size;
Vector<int> recv_from;
Vector<MPI_Request> recv_reqs;

Vector<std::size_t> offset;
std::size_t TotalRcvsVolume = 0;
for (auto const& kv : m_RcvTags) {
std::size_t nbytes = 0;
for (auto const& cct : kv.second) {
nbytes += cct.dbox.numPts() * ncomp * sizeof(typename FAB::value_type);
}

std::size_t acd = ParallelDescriptor::alignof_comm_data(nbytes);
nbytes = amrex::aligned_size(acd, nbytes); // so that nbytes are aligned

// Also need to align the offset properly
TotalRcvsVolume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
acd), TotalRcvsVolume);

offset.push_back(TotalRcvsVolume);
TotalRcvsVolume += nbytes;

recv_data.push_back(nullptr);
recv_size.push_back(nbytes);
recv_from.push_back(kv.first);
recv_reqs.push_back(MPI_REQUEST_NULL);
}

Gpu::PinnedVector<char> recv_buffer(TotalRcvsVolume);
char* the_recv_data = recv_buffer.data();

// Recv
for (int i = 0; i < N_rcvs; ++i) {
recv_data[i] = the_recv_data + offset[i];
recv_reqs[i] = ParallelDescriptor::Arecv
(recv_data[i], recv_size[i], recv_from[i], 100, MPI_COMM_WORLD).req();
}

Vector<FabArrayBase::CopyComTagsContainer const*> recv_cctc(N_rcvs, nullptr);
for (int i = 0; i < N_rcvs; ++i) {
recv_cctc[i] = &(m_RcvTags.at(recv_from[i]));
}

Vector<MPI_Status> stats(N_rcvs);
ParallelDescriptor::Waitall(recv_reqs, stats);

// Unpack buffer
#ifdef AMREX_USE_GPU
if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
mf.unpack_recv_buffer_gpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
FabArrayBase::COPY, true);
} else
#endif
{
mf.unpack_recv_buffer_cpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
FabArrayBase::COPY, true);
}
}

}}

#endif
#endif
Loading