Skip to content

Commit

Permalink
Merge pull request #35 from PROBIC/remove-remaining-mpi-code
Browse files Browse the repository at this point in the history
Remove remaining mpi code
  • Loading branch information
tmaklin authored Sep 11, 2024
2 parents 1216451 + a05d31a commit 34eb9e9
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 132 deletions.
24 changes: 1 addition & 23 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,6 @@ endif()
set (CMAKE_CXX_STANDARD 17)
set (CMAKE_CXX_STANDARD_REQUIRED ON)

### MPI
if (CMAKE_ENABLE_MPI_SUPPORT)
find_package(MPI REQUIRED)
set(MSWEEP_MPI_SUPPORT 1)
include_directories(MPI_C_INCLUDE_DIRS)
if (CMAKE_MPI_MAX_PROCESSES)
set(MSWEEP_MPI_MAX_PROCESSES ${CMAKE_MPI_MAX_PROCESSES})
else()
set(MSWEEP_MPI_MAX_PROCESSES 1024)
endif()
else()
set(MSWEEP_MPI_SUPPORT 0)
endif()

set(LIBRARY_OUTPUT_PATH ${CMAKE_CURRENT_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/bin)

Expand All @@ -103,8 +89,6 @@ string(TIMESTAMP _BUILD_TIMESTAMP)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/include/mSWEEP_version.h.in ${CMAKE_CURRENT_BINARY_DIR}/include/mSWEEP_version.h @ONLY)
## Configure OpenMP if it supported on the system.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/include/mSWEEP_openmp_config.hpp.in ${CMAKE_CURRENT_BINARY_DIR}/include/mSWEEP_openmp_config.hpp @ONLY)
## Configure MPI if it's supported on the system.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/include/mSWEEP_mpi_config.hpp.in ${CMAKE_CURRENT_BINARY_DIR}/include/mSWEEP_mpi_config.hpp @ONLY)

add_executable(mSWEEP ${CMAKE_CURRENT_SOURCE_DIR}/src/mSWEEP.cpp)

Expand Down Expand Up @@ -300,8 +284,7 @@ else()
SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/external/rcgpar"
BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/external/rcgpar"
BUILD_IN_SOURCE 0
CMAKE_ARGS -D CMAKE_ENABLE_MPI_SUPPORT=${MSWEEP_MPI_SUPPORT}
-D CMAKE_SEAMAT_HEADERS=${CMAKE_SEAMAT_HEADERS}
CMAKE_ARGS -D CMAKE_SEAMAT_HEADERS=${CMAKE_SEAMAT_HEADERS}
-D CMAKE_BITMAGIC_HEADERS=${CMAKE_BITMAGIC_HEADERS}
-D CMAKE_LIBTORCH_PATH=${CMAKE_LIBTORCH_PATH}
-D CMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
Expand Down Expand Up @@ -368,11 +351,6 @@ include_directories(
${CMAKE_CURRENT_BINARY_DIR}/include
)

if(MPI_FOUND)
add_dependencies(mSWEEP rcgmpi bigmpi)
target_link_libraries(mSWEEP rcgmpi ${LIBRARY_OUTPUT_PATH}/libbigmpi.a)
endif()

add_library(libmsweep
${CMAKE_CURRENT_SOURCE_DIR}/src/Sample.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/PlainSample.cpp
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ Estimation options:
--no-fit-model Do not estimate the abundances. Useful if only the likelihood matrix is required (default: false).
--max-iters Maximum number of iterations to run the abundance estimation optimizer for (default: 5000).
--tol Optimization terminates when the bound changes by less than the given tolerance (default: 0.000001).
--algorithm Which algorithm to use for abundance estimation (one of rcggpu, emgpu, rcgcpu (original mSWEEP); default: rcggpu).
--algorithm Which algorithm to use for abundance estimation (one of rcggpu, emgpu, rcgcpu (original mSWEEP); default: rcgcpu).
--emprecision Precision to use for the emgpu algorithm (one of float, double; default: double).
Bootstrapping options:
Expand Down
8 changes: 1 addition & 7 deletions include/mSWEEP_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <string>

#include "cxxio.hpp"
#include "mSWEEP_mpi_config.hpp"

namespace mSWEEP {
class Log : public cxxio::Out {
Expand Down Expand Up @@ -43,12 +42,7 @@ class Log : public cxxio::Out {

template <typename T>
Log& operator<<(Log &os, T t) {
int rank = 0;
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
// Only root will log
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
#endif
if (os.verbose && rank == 0) {
if (os.verbose) {
if (os.log_time){
std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
std::time_t now_t = std::chrono::system_clock::to_time_t(now);
Expand Down
36 changes: 0 additions & 36 deletions include/mSWEEP_mpi_config.hpp.in

This file was deleted.

77 changes: 12 additions & 65 deletions src/mSWEEP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

#include "mSWEEP_alignment.hpp"

#include "mSWEEP_mpi_config.hpp"
#include "mSWEEP_openmp_config.hpp"
#include "mSWEEP_version.h"

Expand Down Expand Up @@ -125,7 +124,7 @@ void parse_args(int argc, char* argv[], cxxargs::Arguments &args) {
// Tolerance for abundance estimation convergence
args.add_long_argument<double>("tol", "Optimization terminates when the bound changes by less than the given tolerance (default: 0.000001).", (double)0.000001);
// Algorithm to use for abundance estimation
args.add_long_argument<std::string>("algorithm", "Which algorithm to use for abundance estimation (one of rcggpu, emgpu, rcgcpu (original mSWEEP); default: rcggpu).", "rcggpu");
args.add_long_argument<std::string>("algorithm", "Which algorithm to use for abundance estimation (one of rcggpu, emgpu, rcgcpu (original mSWEEP); default: rcgcpu).", "rcgcpu");
// Precision for abundance estimation with emgpu algorithm
args.add_long_argument<std::string>("emprecision", "Precision to use for the emgpu algorithm (one of float, double; default: double).\n\nBootstrapping options:", "double");

Expand Down Expand Up @@ -162,30 +161,20 @@ void parse_args(int argc, char* argv[], cxxargs::Arguments &args) {

void finalize(const std::string &msg, mSWEEP::Log &log, bool abort = false) {
// Set the state of the program so that it can finish correctly:
// - Finalizes (or aborts) any/all MPI processes.
// - Writes a potential message `msg` to the log.
// - Flushes the log (ensure all messages are displayed).
//
// Input:
// `msg`: message to print.
// `log`: logger (see msweep_log.hpp).
// `abort`: terminate MPI (from any process).
//
if (abort != 0) {
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
MPI_Abort(MPI_COMM_WORLD, 1);
#endif
}
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
MPI_Finalize();
#endif
if (!msg.empty())
std::cerr << msg;
log.flush();
}

seamat::DenseMatrix<double> rcg_optl(const cxxargs::Arguments &args, const seamat::Matrix<double> &ll_mat, const std::vector<double> &log_ec_counts, const std::vector<double> &prior_counts, mSWEEP::Log &log) {
// Wrapper for calling rcgpar with omp or mpi depending on config.
// Wrapper for calling rcgpar with omp or gpu depending on config.
//
// Input:
// `args`: commandl line arguments.
Expand All @@ -198,49 +187,29 @@ seamat::DenseMatrix<double> rcg_optl(const cxxargs::Arguments &args, const seama
// `ec_probs`: read-lineage probability matrix. Use rcgpar::mixture_components to
// transform this into the relative abundances vector.
//
std::ofstream of; // Silence output from ranks > 1 with an empty ofstream
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
// MPI parallelization (hybrid with OpenMP if enabled).
int rank;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
const seamat::DenseMatrix<double> &ec_probs = rcgpar::rcg_optl_mpi(ll_mat, log_ec_counts, prior_counts, args.value<double>("tol"), args.value<size_t>("max-iters"), (rank == 0 && args.value<bool>("verbose") ? log.stream() : of));
return ec_probs;

#else
// Only OpenMP parallelization (if enabled).
std::ofstream of;

if (args.value<std::string>("algorithm") == "rcggpu") {
// Run rcg on CPU or GPU if present
const seamat::DenseMatrix<double> &ec_probs = rcgpar::rcg_optl_torch(ll_mat, log_ec_counts, prior_counts, args.value<double>("tol"), args.value<size_t>("max-iters"), (args.value<bool>("verbose") ? log.stream() : of));
return ec_probs;
} else if (args.value<std::string>("algorithm") == "rcgcpu") {
// Run rcg on CPU, use OpenMP if supported
const seamat::DenseMatrix<double> &ec_probs = rcgpar::rcg_optl_omp(ll_mat, log_ec_counts, prior_counts, args.value<double>("tol"), args.value<size_t>("max-iters"), (args.value<bool>("verbose") ? log.stream() : of));
return ec_probs;
} else {
// Run em on CPU or GPU if present
const seamat::DenseMatrix<double> &ec_probs = rcgpar::em_torch(ll_mat, log_ec_counts, prior_counts, args.value<double>("tol"), args.value<size_t>("max-iters"), (args.value<bool>("verbose") ? log.stream() : of), args.value<std::string>("emprecision"));
return ec_probs;
}
#endif
}

int main (int argc, char *argv[]) {
// mSWEEP executable main

int rank = 0; // If MPI is not supported then we are always on the root process
mSWEEP::Log log(std::cerr, CmdOptionPresent(argv, argv+argc, "--verbose"), false); // logger class from msweep_log.hpp

// Initialize MPI if enabled
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
int rc = MPI_Init(&argc, &argv);
if (rc != MPI_SUCCESS) {
finalize("MPI initialization failed\n\n", log);
return 1;
}
int ntasks;
rc=MPI_Comm_size(MPI_COMM_WORLD,&ntasks);
rc=MPI_Comm_rank(MPI_COMM_WORLD,&rank);
#endif

// Use the cxxargs library to parse the command line arguments.
// Note: if MPI is enabled the arguments are currently parsed to all processes.
cxxargs::Arguments args("mSWEEP", "Usage: mSWEEP --themisto-1 <forwardPseudoalignments> --themisto-2 <reversePseudoalignments> -i <groupIndicatorsFile>");

// Print version when running if `--verbose` is used.
Expand Down Expand Up @@ -290,42 +259,28 @@ int main (int argc, char *argv[]) {
std::unique_ptr<mSWEEP::Reference> reference;
log << "Reading the input files" << '\n';
try {
if (rank == 0) { // Only root reads in data
log << " reading group indicators" << '\n';
cxxio::In indicators(args.value<std::string>('i'));
reference = std::move(mSWEEP::ConstructAdaptiveReference(&indicators.stream(), '\t'));
if (reference->get_n_groupings() > 1) {
log << " read " << reference->get_n_groupings() << " groupings" << '\n';
}
log << " read " << reference->get_n_refs() << " group indicators" << '\n';
}
} catch (std::exception &e) {
finalize("Reading group indicators failed:\n " + std::string(e.what()) + "\nexiting\n", log, true);
return 1;
}

// Estimate abundances with all groupings that were provided
uint16_t n_groupings;
if (rank == 0) { // rank 0
n_groupings = reference->get_n_groupings();
}
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
// Only root process has read in the input.
MPI_Bcast(&n_groupings, 1, MPI_UINT16_T, 0, MPI_COMM_WORLD);
#endif
n_groupings = reference->get_n_groupings();

// Wrapper class for ensuring the outfile names are set consistently and correctly
mSWEEP::OutfileDesignator out(args.value<std::string>('o'), n_groupings, args.value<std::string>("compress"), args.value<int>("compression-level"));

// Estimate abundances with all groupings
for (uint16_t i = 0; i < n_groupings; ++i) {
// Send the number of groups in this grouping from root to all processes
uint16_t n_groups;
if (rank == 0) // rank 0
n_groups = reference->n_groups(i);
#if defined(MSWEEP_MPI_SUPPORT) && (MSWEEP_MPI_SUPPORT) == 1
MPI_Bcast(&n_groups, 1, MPI_UINT16_T, 0, MPI_COMM_WORLD);
#endif
size_t n_groups = reference->n_groups(i);

// `Sample` and its children are classes for storing data from
// the pseudoalignment that are needed or not needed depending on
Expand All @@ -339,9 +294,7 @@ int main (int argc, char *argv[]) {
std::unique_ptr<mSWEEP::Likelihood<double>> log_likelihoods;

// Check if reading likelihood from file.
// In MPI configurations, only root needs to read in the data. Distributing the values
// is handled by the rcgpar::rcg_optl_mpi implementation.
if (rank == 0 && !CmdOptionPresent(argv, argv+argc, "--read-likelihood")) {
if (!CmdOptionPresent(argv, argv+argc, "--read-likelihood")) {
// Start from the pseudoalignments.
// To save memory, the alignment can go out of scope.
// The necessary values are stored in the Sample class.
Expand Down Expand Up @@ -397,7 +350,6 @@ int main (int argc, char *argv[]) {
}

// Initialize Sample depending on how the alignment needs to be processed.
// Note: this is also only used by the root process in MPI configuration.
mSWEEP::ConstructSample(alignment, args.value<size_t>("iters"), args.value<size_t>("bootstrap-count"), args.value<size_t>("seed"), bin_reads, sample);

log.flush();
Expand All @@ -419,7 +371,7 @@ int main (int argc, char *argv[]) {

try {
// Write the likelihood to disk here if it was requested.
if (rank == 0 && (args.value<bool>("write-likelihood") || args.value<bool>("write-likelihood-bitseq")))
if (args.value<bool>("write-likelihood") || args.value<bool>("write-likelihood-bitseq"))
log_likelihoods->write((args.value<bool>("write-likelihood-bitseq") ? "bitseq" : "mSWEEP"), out.likelihoods((args.value<bool>("write-likelihood-bitseq") ? "bitseq" : "mSWEEP")));
} catch (std::exception &e) {
finalize("Writing the likelihood to file failed:\n " + std::string(e.what()) + "\nexiting\n", log, true);
Expand Down Expand Up @@ -462,7 +414,6 @@ int main (int argc, char *argv[]) {
}

// Run binning if requested and write results to files.
if (rank == 0) { // root performs the rest.
// Turn the probs into relative abundances
if (args.value<std::string>("algorithm") == "rcgcpu") {
sample->store_abundances(rcgpar::mixture_components(sample->get_probs(), log_likelihoods->log_counts()));
Expand Down Expand Up @@ -539,7 +490,6 @@ int main (int argc, char *argv[]) {
finalize("Writing the probabilities failed:\n " + std::string(e.what()) + "\nexiting\n", log, true);
return 1;
}
}

// Bootstrap the ec_counts and estimate from the bootstrapped data if required
if (bootstrap_mode) {
Expand All @@ -548,7 +498,6 @@ int main (int argc, char *argv[]) {
// Bootstrap the counts
log << "Bootstrap" << " iter " << k + 1 << "/" << args.value<size_t>("iters") << '\n';
std::vector<double> resampled_counts;
if (rank == 0)
resampled_counts = std::move(static_cast<mSWEEP::BootstrapSample*>(&(*sample))->resample_counts());

// Estimate with the bootstrapped counts
Expand All @@ -559,19 +508,17 @@ int main (int argc, char *argv[]) {
finalize("Bootstrap iteration " + std::to_string(k) + "/" + std::to_string(args.value<size_t>("iters")) + " failed:\n " + std::string(e.what()) + "\nexiting\n", log, true);
return 1;
}
if (rank == 0) {
if (args.value<std::string>("algorithm") == "rcgcpu") {
sample->store_abundances(rcgpar::mixture_components(sample->get_probs(), resampled_counts));
} else {
sample->store_abundances(rcgpar::mixture_components_torch(sample->get_probs(), resampled_counts));
}
}
}
}
}

// Write relative abundances
if (rank == 0 && !args.value<bool>("no-fit-model")) {
if (!args.value<bool>("no-fit-model")) {
try {
if (sample->get_rate_run()) {
const std::vector<double> &log_kld = sample->get_log_klds();
Expand Down

0 comments on commit 34eb9e9

Please sign in to comment.