Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 19 additions & 27 deletions ompi/mca/coll/acoll/coll_acoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
#include "ompi/mca/mca.h"
#include "ompi/request/request.h"

#ifdef HAVE_XPMEM_H
#include "opal/mca/rcache/base/base.h"
#include "opal/class/opal_hash_table.h"
#include <xpmem.h>
#endif

#include "opal/mca/accelerator/accelerator.h"
#include "opal/mca/shmem/base/base.h"
#include "opal/mca/shmem/shmem.h"

// For smsc
#include "opal/mca/smsc/smsc.h"

BEGIN_C_DECLS

/* Globally exported variables */
Expand Down Expand Up @@ -88,6 +85,7 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base

END_C_DECLS

#define MCA_COLL_ACOLL_MIN_COMM_SIZE 16
#define MCA_COLL_ACOLL_ROOT_CHANGE_THRESH 10
#define MCA_COLL_ACOLL_SPLIT_FACTOR_LIST_LEN 6
#define MCA_COLL_ACOLL_SPLIT_FACTOR_LIST {2, 4, 8, 16, 32, 64}
Expand Down Expand Up @@ -127,20 +125,22 @@ typedef enum MCA_COLL_ACOLL_BASE_LYRS {
MCA_COLL_ACOLL_NUM_BASE_LYRS
} MCA_COLL_ACOLL_BASE_LYRS;

typedef struct coll_acoll_smsc_info {
mca_smsc_endpoint_t **ep;
void **rreg;
void **sreg;
} coll_acoll_smsc_info_t;

typedef struct coll_acoll_data {
#ifdef HAVE_XPMEM_H
xpmem_segid_t *allseg_id;
xpmem_apid_t *all_apid;
void **allshm_sbuf;
void **allshm_rbuf;
void **xpmem_saddr;
void **xpmem_raddr;
mca_rcache_base_module_t **rcache;
void *scratch;
opal_hash_table_t **xpmem_reg_tracker_ht;
#endif
void **smsc_saddr;
void **smsc_raddr;

opal_shmem_ds_t *allshmseg_id;
void **allshmmmap_sbuf;
coll_acoll_smsc_info_t smsc_info;

int comm_size;
int l1_local_rank;
Expand Down Expand Up @@ -204,11 +204,9 @@ typedef struct coll_acoll_subcomms {
bool initialized_data;
bool initialized_shm_data;
int barrier_algo;
#ifdef HAVE_XPMEM_H
uint64_t xpmem_buf_size;
int without_xpmem;
int xpmem_use_sr_buf;
#endif
uint64_t smsc_buf_size;
int without_smsc;
int smsc_use_sr_buf;

} coll_acoll_subcomms_t;

Expand All @@ -222,7 +220,6 @@ typedef struct coll_acoll_reserve_mem {
typedef struct {
int split_factor;
size_t psplit_msg_thresh;
size_t xpmem_msg_thresh;
} coll_acoll_alltoall_attr_t;

struct mca_coll_acoll_module_t {
Expand Down Expand Up @@ -252,15 +249,10 @@ struct mca_coll_acoll_module_t {
coll_acoll_reserve_mem_t reserve_mem_s;
int num_subc;
coll_acoll_alltoall_attr_t alltoall_attr;
// 1 if SMSC, in particular xpmem is available, 0 otherwise
int has_smsc;
};

#ifdef HAVE_XPMEM_H
struct acoll_xpmem_rcache_reg_t {
mca_rcache_base_registration_t base;
void *xpmem_vaddr;
};
#endif

typedef struct mca_coll_acoll_module_t mca_coll_acoll_module_t;
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(mca_coll_acoll_module_t);

Expand Down
6 changes: 6 additions & 0 deletions ompi/mca/coll/acoll/coll_acoll_allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty

/* Return if intra-node communicator */
if ((1 == num_nodes) || (size <= 2)) {
/* Call barrier to ensure that the data is copied properly before returning */
ompi_coll_base_barrier_intra_basic_linear(comm, module);

/* All done */
return err;
}
Expand Down Expand Up @@ -620,6 +623,9 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
}
}

/* Call barrier to ensure that the data is copied properly before returning */
ompi_coll_base_barrier_intra_basic_linear(comm, module);

/* All done */
return err;
}
90 changes: 44 additions & 46 deletions ompi/mca/coll/acoll/coll_acoll_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ static inline int coll_allreduce_decision_fixed(int comm_size, size_t msg_size)
return alg;
}

#ifdef HAVE_XPMEM_H
static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, size_t count,
static inline int mca_coll_acoll_reduce_smsc_h(const void *sbuf, void *rbuf, size_t count,
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module,
Expand Down Expand Up @@ -79,9 +78,9 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si
int l2_local_rank = data->l2_local_rank;
char *tmp_sbuf = NULL;
char *tmp_rbuf = NULL;
if (!subc->xpmem_use_sr_buf) {
if (!subc->smsc_use_sr_buf) {
tmp_rbuf = (char *) data->scratch;
tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2;
tmp_sbuf = (char *) data->scratch + (subc->smsc_buf_size) / 2;
if ((MPI_IN_PLACE == sbuf)) {
memcpy(tmp_sbuf, rbuf, total_dsize);
} else {
Expand Down Expand Up @@ -112,7 +111,10 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si
return err;
}

register_and_cache(size, total_dsize, rank, data);
err = register_mem_with_smsc(rank, size, total_dsize, data, comm);
if (err != MPI_SUCCESS) {
return err;
}

/* reduce to the local group leader */
size_t chunk = count / l1_gp_size;
Expand All @@ -123,21 +125,21 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si
memcpy(tmp_rbuf, sbuf, my_count_size * dsize);

for (int i = 1; i < l1_gp_size; i++) {
ompi_op_reduce(op, (char *) data->xpmem_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize,
ompi_op_reduce(op, (char *) data->smsc_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize,
(char *) tmp_rbuf + chunk * l1_local_rank * dsize, my_count_size, dtype);
}
} else {
ompi_3buff_op_reduce(op,
(char *) data->xpmem_saddr[l1_gp[0]] + chunk * l1_local_rank * dsize,
(char *) data->smsc_saddr[l1_gp[0]] + chunk * l1_local_rank * dsize,
(char *) tmp_sbuf + chunk * l1_local_rank * dsize,
(char *) data->xpmem_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize,
(char *) data->smsc_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize,
my_count_size, dtype);
for (int i = 1; i < l1_gp_size; i++) {
if (i == l1_local_rank) {
continue;
}
ompi_op_reduce(op, (char *) data->xpmem_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize,
(char *) data->xpmem_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize,
ompi_op_reduce(op, (char *) data->smsc_saddr[l1_gp[i]] + chunk * l1_local_rank * dsize,
(char *) data->smsc_raddr[l1_gp[0]] + chunk * l1_local_rank * dsize,
my_count_size, dtype);
}
}
Expand All @@ -155,7 +157,7 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si

if (0 == l2_local_rank) {
for (int i = 1; i < local_size; i++) {
ompi_op_reduce(op, (char *) data->xpmem_raddr[l2_gp[i]], (char *) tmp_rbuf,
ompi_op_reduce(op, (char *) data->smsc_raddr[l2_gp[i]], (char *) tmp_rbuf,
my_count_size, dtype);
}
} else {
Expand All @@ -165,24 +167,26 @@ static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, si
}

ompi_op_reduce(op,
(char *) data->xpmem_raddr[l2_gp[i]] + chunk * l2_local_rank * dsize,
(char *) data->xpmem_raddr[0] + chunk * l2_local_rank * dsize,
(char *) data->smsc_raddr[l2_gp[i]] + chunk * l2_local_rank * dsize,
(char *) data->smsc_raddr[0] + chunk * l2_local_rank * dsize,
my_count_size, dtype);
}
ompi_op_reduce(op, (char *) tmp_rbuf + chunk * l2_local_rank * dsize,
(char *) data->xpmem_raddr[0] + chunk * l2_local_rank * dsize,
(char *) data->smsc_raddr[0] + chunk * l2_local_rank * dsize,
my_count_size, dtype);
}
}

err = ompi_coll_base_barrier_intra_tree(comm, module);
if (!subc->xpmem_use_sr_buf) {
if (!subc->smsc_use_sr_buf) {
memcpy(rbuf, tmp_rbuf, total_dsize);
}
// Note: neither unmap nor deregister will have any effect here, just having it for consistency
unmap_mem_with_smsc(rank, size, data);
return err;
}

static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf, size_t count,
static inline int mca_coll_acoll_allreduce_smsc_f(const void *sbuf, void *rbuf, size_t count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
struct ompi_communicator_t *comm,
Expand All @@ -204,9 +208,9 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,

char *tmp_sbuf = NULL;
char *tmp_rbuf = NULL;
if (!subc->xpmem_use_sr_buf) {
if (!subc->smsc_use_sr_buf) {
tmp_rbuf = (char *) data->scratch;
tmp_sbuf = (char *) data->scratch + (subc->xpmem_buf_size) / 2;
tmp_sbuf = (char *) data->scratch + (subc->smsc_buf_size) / 2;
if ((MPI_IN_PLACE == sbuf)) {
memcpy(tmp_sbuf, rbuf, total_dsize);
} else {
Expand Down Expand Up @@ -238,15 +242,18 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,
return err;
}

register_and_cache(size, total_dsize, rank, data);
err = register_mem_with_smsc(rank, size, total_dsize, data, comm);
if (err != MPI_SUCCESS) {
return err;
}

size_t chunk = count / size;
size_t my_count_size = (rank == (size - 1)) ? (count / size) + count % size : count / size;
if (0 == rank) {
if (sbuf != MPI_IN_PLACE)
memcpy(tmp_rbuf, sbuf, my_count_size * dsize);
} else {
ompi_3buff_op_reduce(op, (char *) data->xpmem_saddr[0] + chunk * rank * dsize,
ompi_3buff_op_reduce(op, (char *) data->smsc_saddr[0] + chunk * rank * dsize,
(char *) tmp_sbuf + chunk * rank * dsize,
(char *) tmp_rbuf + chunk * rank * dsize, my_count_size, dtype);
}
Expand All @@ -260,7 +267,7 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,
if (rank == i) {
continue;
}
ompi_op_reduce(op, (char *) data->xpmem_saddr[i] + chunk * rank * dsize,
ompi_op_reduce(op, (char *) data->smsc_saddr[i] + chunk * rank * dsize,
(char *) tmp_rbuf + chunk * rank * dsize, my_count_size, dtype);
}
err = ompi_coll_base_barrier_intra_tree(comm, module);
Expand All @@ -270,21 +277,23 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,

size_t tmp = chunk * dsize;
for (int i = 0; i < size; i++) {
if (subc->xpmem_use_sr_buf && (rank == i)) {
if (subc->smsc_use_sr_buf && (rank == i)) {
continue;
}
my_count_size = (i == (size - 1)) ? (count / size) + count % size : count / size;
size_t tmp1 = i * tmp;
char *dst = (char *) rbuf + tmp1;
char *src = (char *) data->xpmem_raddr[i] + tmp1;
char *src = (char *) data->smsc_raddr[i] + tmp1;
memcpy(dst, src, my_count_size * dsize);
}

err = ompi_coll_base_barrier_intra_tree(comm, module);

// Note: neither unmap nor deregister will have any effect here, just having it for consistency
unmap_mem_with_smsc(rank, size, data);

return err;
}
#endif

void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp_size, int rank,
int up)
Expand Down Expand Up @@ -450,7 +459,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
ompi_datatype_type_size(dtype, &dsize);
total_dsize = dsize * count;

/* Disable shm/xpmem based optimizations if: */
/* Disable smsc/shm/xpmem based optimizations if: */
/* - datatype is not a predefined type */
/* - it's a gpu buffer */
uint64_t flags = 0;
Expand Down Expand Up @@ -481,6 +490,10 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
coll_acoll_subcomms_t *subc = NULL;
err = check_and_create_subc(comm, acoll_module, &subc);

if (MPI_SUCCESS != err) {
return err;
}

/* Fallback to knomial if subc is not obtained */
if (NULL == subc) {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm,
Expand Down Expand Up @@ -518,42 +531,27 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
comm, module, 0);
}
} else if (total_dsize < 4194304) {
#ifdef HAVE_XPMEM_H
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) {
return mca_coll_acoll_allreduce_smsc_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
} else {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
op, comm, module);
}
#else
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
comm, module);
#endif
} else if (total_dsize <= 16777216) {
#ifdef HAVE_XPMEM_H
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module, subc);
if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) {
mca_coll_acoll_reduce_smsc_h(sbuf, rbuf, count, dtype, op, comm, module, subc);
return mca_coll_acoll_bcast(rbuf, count, dtype, 0, comm, module);
} else {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
op, comm, module);
}
#else
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
comm, module);
#endif
} else {
#ifdef HAVE_XPMEM_H
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1) && is_opt) {
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) {
return mca_coll_acoll_allreduce_smsc_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
} else {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
op, comm, module);
}
#else
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
comm, module);
#endif
}

} else {
Expand Down
Loading