From 6eaab8c1c9f0e2a21531526dfd170ebe3aad507b Mon Sep 17 00:00:00 2001
From: Weiqun Zhang <WeiqunZhang@lbl.gov>
Date: Wed, 3 Aug 2022 13:39:44 -0700
Subject: [PATCH] MPMD Support (#2895)

Add support for multiple programs multiple data (MPMD). For now, we assume
there are only two programs (i.e., executables) in the MPMD mode. During the
initialization, MPI_COMM_WORLD is split into two communicators. The
MPMD::Copier class can be used to copy FabArray/MultiFab data between the
two programs. This new capability can be used by FHDeX to couple FHD with
SPPARKS.
---
 Src/Base/AMReX_BLBackTrace.cpp |  13 +-
 Src/Base/AMReX_BoxList.H       |   4 +-
 Src/Base/AMReX_MPMD.H          | 178 ++++++++++++++++++++++++++
 Src/Base/AMReX_MPMD.cpp        | 225 +++++++++++++++++++++++++++++++++
 Src/Base/CMakeLists.txt        |   8 +-
 Src/Base/Make.package          |   4 +
 6 files changed, 427 insertions(+), 5 deletions(-)
 create mode 100644 Src/Base/AMReX_MPMD.H
 create mode 100644 Src/Base/AMReX_MPMD.cpp

diff --git a/Src/Base/AMReX_BLBackTrace.cpp b/Src/Base/AMReX_BLBackTrace.cpp
index 477e0b6bac2..0c304d30011 100644
--- a/Src/Base/AMReX_BLBackTrace.cpp
+++ b/Src/Base/AMReX_BLBackTrace.cpp
@@ -5,6 +5,9 @@
 #include <AMReX_OpenMP.H>
 #include <AMReX_Utility.H>
 #include <AMReX.H>
+#ifdef AMREX_USE_MPI
+#include <AMReX_MPMD.H>
+#endif
 
 #ifdef AMREX_TINY_PROFILING
 #include <AMReX_TinyProfiler.H>
@@ -71,7 +74,15 @@ BLBackTrace::handler(int s)
     std::string errfilename;
     {
         std::ostringstream ss;
-        ss << "Backtrace." << ParallelDescriptor::MyProc();
+#ifdef AMREX_USE_MPI
+        if (MPMD::Initialized()) {
+            ss << "Backtrace.prog" << MPMD::MyProgId() << ".";
+        } else
+#endif
+        {
+            ss << "Backtrace.";
+        }
+        ss << ParallelDescriptor::MyProc();
 #ifdef AMREX_USE_OMP
         ss << "." << omp_get_thread_num();
 #endif
diff --git a/Src/Base/AMReX_BoxList.H b/Src/Base/AMReX_BoxList.H
index 04e93eab97e..1dc8f15c536 100644
--- a/Src/Base/AMReX_BoxList.H
+++ b/Src/Base/AMReX_BoxList.H
@@ -206,9 +206,9 @@ public:
     BoxList& convert (IndexType typ) noexcept;
 
     //! Returns a reference to the Vector<Box>.
-    Vector<Box>& data() noexcept { return m_lbox; }
+    Vector<Box>& data () noexcept { return m_lbox; }
     //! Returns a constant reference to the Vector<Box>.
-    const Vector<Box>& data() const noexcept { return m_lbox; }
+    const Vector<Box>& data () const noexcept { return m_lbox; }
 
     void swap (BoxList& rhs) {
         std::swap(m_lbox, rhs.m_lbox);
diff --git a/Src/Base/AMReX_MPMD.H b/Src/Base/AMReX_MPMD.H
new file mode 100644
index 00000000000..2b8ef399866
--- /dev/null
+++ b/Src/Base/AMReX_MPMD.H
@@ -0,0 +1,178 @@
+#ifndef AMREX_MPMD_H_
+#define AMREX_MPMD_H_
+#include <AMReX_Config.H>
+
+#ifdef AMREX_USE_MPI
+
+#include <AMReX_FabArray.H>
+
+#include <mpi.h>
+
+namespace amrex { namespace MPMD {
+
+MPI_Comm Initialize (int argc, char* argv[]);
+
+void Finalize ();
+
+bool Initialized ();
+
+int MyProc ();   //! Process ID in MPI_COMM_WORLD
+int NProcs ();   //! Number of processes in MPI_COMM_WORLD
+int MyProgId (); //! Program ID
+
+class Copier
+{
+public:
+    Copier (BoxArray const& ba, DistributionMapping const& dm);
+
+    template <typename FAB>
+    void send (FabArray<FAB> const& fa, int icomp, int ncomp) const;
+
+    template <typename FAB>
+    void recv (FabArray<FAB>& fa, int icomp, int ncomp) const;
+
+private:
+    std::map<int,FabArrayBase::CopyComTagsContainer> m_SndTags;
+    std::map<int,FabArrayBase::CopyComTagsContainer> m_RcvTags;
+};
+
+template <typename FAB>
+void Copier::send (FabArray<FAB> const& mf, int icomp, int ncomp) const
+{
+    const int N_snds = m_SndTags.size();
+
+    if (N_snds == 0) return;
+
+    // Prepare buffer
+
+    Vector<char*> send_data;
+    Vector<std::size_t> send_size;
+    Vector<int> send_rank;
+    Vector<MPI_Request> send_reqs;
+    Vector<FabArrayBase::CopyComTagsContainer const*> send_cctc;
+
+    Vector<std::size_t> offset;
+    std::size_t total_volume = 0;
+    for (auto const& kv : m_SndTags) {
+        auto const& cctc = kv.second;
+
+        std::size_t nbytes = 0;
+        for (auto const& cct : cctc) {
+            nbytes += cct.sbox.numPts() * ncomp * sizeof(typename FAB::value_type);
+        }
+
+        std::size_t acd = ParallelDescriptor::alignof_comm_data(nbytes);
+        nbytes = amrex::aligned_size(acd, nbytes); // so that bytes are aligned
+
+        // Also need to align the offset properly
+        total_volume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
+                                                    acd), total_volume);
+
+        offset.push_back(total_volume);
+        total_volume += nbytes;
+
+        send_data.push_back(nullptr);
+        send_size.push_back(nbytes);
+        send_rank.push_back(kv.first);
+        send_reqs.push_back(MPI_REQUEST_NULL);
+        send_cctc.push_back(&cctc);
+    }
+
+    Gpu::PinnedVector<char> send_buffer(total_volume);
+    char* the_send_data = send_buffer.data();
+    for (int i = 0; i < N_snds; ++i) {
+        send_data[i] = the_send_data + offset[i];
+    }
+
+    // Pack buffer
+#ifdef AMREX_USE_GPU
+    if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
+        mf.pack_send_buffer_gpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
+    } else
+#endif
+    {
+        mf.pack_send_buffer_cpu(mf, icomp, ncomp, send_data, send_size, send_cctc);
+    }
+
+    // Send
+    for (int i = 0; i < N_snds; ++i) {
+        send_reqs[i] = ParallelDescriptor::Asend
+            (send_data[i], send_size[i], send_rank[i], 100, MPI_COMM_WORLD).req();
+    }
+    Vector<MPI_Status> stats(N_snds);
+    ParallelDescriptor::Waitall(send_reqs, stats);
+}
+
+template <typename FAB>
+void Copier::recv (FabArray<FAB>& mf, int icomp, int ncomp) const
+{
+    const int N_rcvs = m_RcvTags.size();
+
+    if (N_rcvs == 0) return;
+
+    // Prepare buffer
+
+    Vector<char*> recv_data;
+    Vector<std::size_t> recv_size;
+    Vector<int> recv_from;
+    Vector<MPI_Request> recv_reqs;
+
+    Vector<std::size_t> offset;
+    std::size_t TotalRcvsVolume = 0;
+    for (auto const& kv : m_RcvTags) {
+        std::size_t nbytes = 0;
+        for (auto const& cct : kv.second) {
+            nbytes += cct.dbox.numPts() * ncomp * sizeof(typename FAB::value_type);
+        }
+
+        std::size_t acd = ParallelDescriptor::alignof_comm_data(nbytes);
+        nbytes = amrex::aligned_size(acd, nbytes); // so that nbytes are aligned
+
+        // Also need to align the offset properly
+        TotalRcvsVolume = amrex::aligned_size(std::max(alignof(typename FAB::value_type),
+                                                       acd), TotalRcvsVolume);
+
+        offset.push_back(TotalRcvsVolume);
+        TotalRcvsVolume += nbytes;
+
+        recv_data.push_back(nullptr);
+        recv_size.push_back(nbytes);
+        recv_from.push_back(kv.first);
+        recv_reqs.push_back(MPI_REQUEST_NULL);
+    }
+
+    Gpu::PinnedVector<char> recv_buffer(TotalRcvsVolume);
+    char* the_recv_data = recv_buffer.data();
+
+    // Recv
+    for (int i = 0; i < N_rcvs; ++i) {
+        recv_data[i] = the_recv_data + offset[i];
+        recv_reqs[i] = ParallelDescriptor::Arecv
+            (recv_data[i], recv_size[i], recv_from[i], 100, MPI_COMM_WORLD).req();
+    }
+
+    Vector<FabArrayBase::CopyComTagsContainer const*> recv_cctc(N_rcvs, nullptr);
+    for (int i = 0; i < N_rcvs; ++i) {
+        recv_cctc[i] = &(m_RcvTags.at(recv_from[i]));
+    }
+
+    Vector<MPI_Status> stats(N_rcvs);
+    ParallelDescriptor::Waitall(recv_reqs, stats);
+
+    // Unpack buffer
+#ifdef AMREX_USE_GPU
+    if (Gpu::inLaunchRegion() && (mf.arena()->isDevice() || mf.arena()->isManaged())) {
+        mf.unpack_recv_buffer_gpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
+                                  FabArrayBase::COPY, true);
+    } else
+#endif
+    {
+        mf.unpack_recv_buffer_cpu(mf, icomp, ncomp, recv_data, recv_size, recv_cctc,
+                                  FabArrayBase::COPY, true);
+    }
+}
+
+}}
+
+#endif
+#endif
diff --git a/Src/Base/AMReX_MPMD.cpp b/Src/Base/AMReX_MPMD.cpp
new file mode 100644
index 00000000000..917c741c2a6
--- /dev/null
+++ b/Src/Base/AMReX_MPMD.cpp
@@ -0,0 +1,225 @@
+#include <AMReX_MPMD.H>
+#include <AMReX_ParallelDescriptor.H>
+
+#include <algorithm>
+#include <functional>
+#include <iostream>
+#include <string>
+#include <vector>
+
+#ifdef AMREX_USE_MPI
+
+namespace amrex { namespace MPMD {
+
+namespace {
+    bool initialized = false;
+    bool mpi_initialized_by_us = false;
+    MPI_Comm app_comm = MPI_COMM_NULL;
+    int myproc;
+    int nprocs;
+}
+
+namespace {
+
+template <typename T>
+int num_unique_elements (std::vector<T>& v)
+{
+    std::sort(v.begin(), v.end());
+    auto last = std::unique(v.begin(), v.end());
+    return last - v.begin();
+}
+
+}
+
+MPI_Comm Initialize (int argc, char* argv[])
+{
+    initialized = true;
+    int flag;
+    MPI_Initialized(&flag);
+    if (!flag) {
+        MPI_Init(&argc, &argv);
+        mpi_initialized_by_us = true;
+    }
+
+    MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
+    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
+
+    int* p;
+    MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_APPNUM, &p, &flag);
+    int appnum = *p;
+
+    std::vector<int> all_appnum(nprocs);
+    MPI_Allgather(&appnum, 1, MPI_INT, all_appnum.data(), 1, MPI_INT, MPI_COMM_WORLD);
+    int napps = num_unique_elements(all_appnum);
+
+    // MPI_APPNUM does not appear to work with slurm on some systems.
+    if (napps != 2) {
+        std::vector<int> all_argc(nprocs);
+        MPI_Allgather(&argc, 1, MPI_INT, all_argc.data(), 1, MPI_INT, MPI_COMM_WORLD);
+        napps = num_unique_elements(all_argc);
+        if (napps == 2) {
+            appnum = static_cast<int>(argc != all_argc[0]);
+        }
+    }
+
+    if (napps != 2) {
+        std::string exename;
+        if (argc > 0) {
+            exename = std::string(argv[0]);
+        }
+        unsigned long long hexe = std::hash<std::string>{}(exename);
+        std::vector<unsigned long long> all_hexe(nprocs);
+        MPI_Allgather(&hexe, 1, MPI_UNSIGNED_LONG_LONG,
+                      all_hexe.data(), 1, MPI_UNSIGNED_LONG_LONG, MPI_COMM_WORLD);
+        napps = num_unique_elements(all_hexe);
+        if (napps == 2) {
+            appnum = static_cast<int>(hexe != all_hexe[0]);
+        }
+    }
+
+    if (napps == 2) {
+        MPI_Comm_split(MPI_COMM_WORLD, appnum, myproc, &app_comm);
+    } else {
+        std::cout << "amrex::MPMD only supports two programs." << std::endl;
+        MPI_Abort(MPI_COMM_WORLD, 1);
+    }
+
+    return app_comm;
+}
+
+void Finalize ()
+{
+    MPI_Comm_free(&app_comm);
+    if (mpi_initialized_by_us) {
+        MPI_Finalize();
+        mpi_initialized_by_us = false;
+    }
+    initialized = false;
+}
+
+bool Initialized () { return initialized; }
+
+int MyProc ()
+{
+    return myproc;
+}
+
+int NProcs ()
+{
+    return nprocs;
+}
+
+int MyProgId ()
+{
+    return (myproc == ParallelDescriptor::MyProc()) ? 0 : 1;
+}
+
+Copier::Copier (BoxArray const& ba, DistributionMapping const& dm)
+{
+    int rank_offset = myproc - ParallelDescriptor::MyProc();
+    int this_root, other_root;
+    if (rank_offset == 0) { // First program
+        this_root = 0;
+        other_root = ParallelDescriptor::NProcs();
+    } else {
+        this_root = rank_offset;
+        other_root = 0;
+    }
+
+    Vector<Box> bv = ba.boxList().data();
+
+    int this_nboxes = ba.size();
+    Vector<int> procs = dm.ProcessorMap();
+    if (rank_offset != 0) {
+        for (int i = 0; i < this_nboxes; ++i) {
+            procs[i] += rank_offset;
+        }
+    }
+
+    Vector<Box> obv;
+    Vector<int> oprocs;
+    int other_nboxes;
+    if (myproc == this_root) {
+        if (rank_offset == 0) // the first program
+        {
+            MPI_Send(&this_nboxes, 1, MPI_INT, other_root, 0, MPI_COMM_WORLD);
+            MPI_Recv(&other_nboxes, 1, MPI_INT, other_root, 1, MPI_COMM_WORLD,
+                     MPI_STATUS_IGNORE);
+            obv.resize(other_nboxes);
+            MPI_Send(bv.data(), this_nboxes,
+                     ParallelDescriptor::Mpi_typemap<Box>::type(),
+                     other_root, 2, MPI_COMM_WORLD);
+            MPI_Recv(obv.data(), other_nboxes,
+                     ParallelDescriptor::Mpi_typemap<Box>::type(),
+                     other_root, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+            oprocs.resize(other_nboxes);
+            MPI_Send(procs.data(), this_nboxes, MPI_INT, other_root, 4, MPI_COMM_WORLD);
+            MPI_Recv(oprocs.data(), other_nboxes, MPI_INT, other_root, 5, MPI_COMM_WORLD,
+                     MPI_STATUS_IGNORE);
+        }
+        else // the second program
+        {
+            MPI_Recv(&other_nboxes, 1, MPI_INT, other_root, 0, MPI_COMM_WORLD,
+                     MPI_STATUS_IGNORE);
+            MPI_Send(&this_nboxes, 1, MPI_INT, other_root, 1, MPI_COMM_WORLD);
+            obv.resize(other_nboxes);
+            MPI_Recv(obv.data(), other_nboxes,
+                     ParallelDescriptor::Mpi_typemap<Box>::type(),
+                     other_root, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+            MPI_Send(bv.data(), this_nboxes,
+                     ParallelDescriptor::Mpi_typemap<Box>::type(),
+                     other_root, 3, MPI_COMM_WORLD);
+            oprocs.resize(other_nboxes);
+            MPI_Recv(oprocs.data(), other_nboxes, MPI_INT, other_root, 4, MPI_COMM_WORLD,
+                     MPI_STATUS_IGNORE);
+            MPI_Send(procs.data(), this_nboxes, MPI_INT, other_root, 5, MPI_COMM_WORLD);
+        }
+    }
+
+    ParallelDescriptor::Bcast(&other_nboxes, 1);
+    if (obv.empty()) {
+        obv.resize(other_nboxes);
+        oprocs.resize(other_nboxes);
+    }
+    ParallelDescriptor::Bcast(obv.data(), obv.size());
+    ParallelDescriptor::Bcast(oprocs.data(), oprocs.size());
+
+    BoxArray oba(BoxList(std::move(obv)));
+
+    // At this point, ba and bv hold our boxes, and oba holds the other
+    // program's boxes. procs holds mpi ranks of our boxes, and oprocs holds
+    // mpi ranks of the other program's boxes. All mpi ranks are in
+    // MPI_COMM_WORLD.
+
+    // Build communication meta-data
+
+    AMREX_ALWAYS_ASSERT(ba.ixType().cellCentered());
+
+    std::vector<std::pair<int,Box> > isects;
+
+    for (int i = 0; i < this_nboxes; ++i) {
+        if (procs[i] == myproc) {
+            oba.intersections(bv[i], isects);
+            for (auto const& isec : isects) {
+                const int oi = isec.first;
+                const Box& bx = isec.second;
+                const int orank = oprocs[oi];
+                m_SndTags[orank].push_back
+                    (FabArrayBase::CopyComTag(bx, bx, oi, i));
+                m_RcvTags[orank].push_back
+                    (FabArrayBase::CopyComTag(bx, bx, i, oi));
+            }
+        }
+    }
+
+    for (auto& kv : m_SndTags) {
+        std::sort(kv.second.begin(), kv.second.end());
+    }
+    for (auto& kv : m_RcvTags) {
+        std::sort(kv.second.begin(), kv.second.end());
+    }
+}
+
+}}
+
+#endif
diff --git a/Src/Base/CMakeLists.txt b/Src/Base/CMakeLists.txt
index 6a2db4526cd..c47fdcae706 100644
--- a/Src/Base/CMakeLists.txt
+++ b/Src/Base/CMakeLists.txt
@@ -71,6 +71,7 @@ target_sources( amrex
    AMReX_DataAllocator.H
    AMReX_BLProfiler.H
    AMReX_BLBackTrace.H
+   AMReX_BLBackTrace.cpp
    AMReX_BLFort.H
    AMReX_NFiles.H
    AMReX_NFiles.cpp
@@ -231,8 +232,6 @@ target_sources( amrex
    # Memory pool -------------------------------------------------------------
    AMReX_MemPool.cpp
    AMReX_MemPool.H
-   # Profiling ---------------------------------------------------------------
-   AMReX_BLBackTrace.cpp
    # Parser ---------------------------------------------------------------
    Parser/AMReX_Parser.cpp
    Parser/AMReX_Parser.H
@@ -305,3 +304,8 @@ endif ()
 if (AMReX_TINY_PROFILE)
    target_sources(amrex PRIVATE AMReX_TinyProfiler.cpp AMReX_TinyProfiler.H )
 endif ()
+
+# MPMD
+if (AMReX_MPI)
+   target_sources(amrex PRIVATE AMReX_MPMD.cpp AMReX_MPMD.H )
+endif ()
diff --git a/Src/Base/Make.package b/Src/Base/Make.package
index d7c4e520e7b..79085ae70a1 100644
--- a/Src/Base/Make.package
+++ b/Src/Base/Make.package
@@ -271,6 +271,10 @@ CEXE_sources += AMReX_Machine.cpp
 # Forward declaration
 CEXE_headers += AMReX_BaseFwd.H
 
+ifeq ($(USE_MPI),TRUE)
+  CEXE_headers += AMReX_MPMD.H
+  CEXE_sources += AMReX_MPMD.cpp
+endif
 
 VPATH_LOCATIONS += $(AMREX_HOME)/Src/Base
 INCLUDE_LOCATIONS += $(AMREX_HOME)/Src/Base
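For context, a minimal sketch of how two coupled executables might drive this
interface. Only MPMD::Initialize/Finalize, MPMD::MyProgId, and
MPMD::Copier::send/recv come from the patch itself; the launch line, the 64^3
domain, the max grid size, the single-component MultiFab, and the transfer
direction are illustrative assumptions.

// A minimal sketch, assuming both executables are launched together, e.g.
//   mpiexec -n 8 ./prog_a : -n 4 ./prog_b
#include <AMReX.H>
#include <AMReX_MPMD.H>
#include <AMReX_MultiFab.H>

int main (int argc, char* argv[])
{
    // Split MPI_COMM_WORLD; each program then runs AMReX on its own communicator.
    MPI_Comm comm = amrex::MPMD::Initialize(argc, argv);
    amrex::Initialize(argc, argv, true, comm);
    {
        // Illustrative setup: both programs describe the same index space.
        amrex::Box domain(amrex::IntVect(0), amrex::IntVect(63));
        amrex::BoxArray ba(domain);
        ba.maxSize(32);
        amrex::DistributionMapping dm(ba);
        amrex::MultiFab mf(ba, dm, 1, 0);

        // The Copier intersects our boxes with the other program's boxes.
        amrex::MPMD::Copier copier(ba, dm);
        if (amrex::MPMD::MyProgId() == 0) {
            copier.send(mf, 0, 1); // program 0 sends component 0
        } else {
            copier.recv(mf, 0, 1); // program 1 receives into component 0
        }
    }
    amrex::Finalize();
    amrex::MPMD::Finalize();
}

The two programs need not share a BoxArray or DistributionMapping: the Copier
matches boxes by intersection in index space (cell-centered only, per the
assertion in the constructor), so it suffices that the regions overlap and
that each send on one side is paired with a recv on the other.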