Skip to content

A hierarchical, architecture-aware collective communication module #7735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions ompi/communicator/comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,10 @@ int ompi_comm_create ( ompi_communicator_t *comm, ompi_group_t *group,
/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
/*
** Counterpart to MPI_Comm_split. To be used within OMPI (e.g. MPI_Cart_sub).
*/
int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
ompi_communicator_t **newcomm, bool pass_on_topo )

int ompi_comm_split_with_info( ompi_communicator_t* comm, int color, int key,
opal_info_t *info,
ompi_communicator_t **newcomm, bool pass_on_topo )
{
int myinfo[2];
int size, my_size;
Expand Down Expand Up @@ -611,7 +610,11 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
snprintf(newcomp->c_name, MPI_MAX_OBJECT_NAME, "MPI COMMUNICATOR %d SPLIT FROM %d",
newcomp->c_contextid, comm->c_contextid );


/* Copy info if there is one */
if (info) {
newcomp->super.s_info = OBJ_NEW(opal_info_t);
opal_info_dup(info, &(newcomp->super.s_info));
}

/* Activate the communicator and init coll-component */
rc = ompi_comm_activate (&newcomp, comm, NULL, NULL, NULL, false, mode);
Expand All @@ -638,6 +641,15 @@ int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
}


/*
** Counterpart to MPI_Comm_split. To be used within OMPI (e.g. MPI_Cart_sub).
*/
int ompi_comm_split( ompi_communicator_t* comm, int color, int key,
ompi_communicator_t **newcomm, bool pass_on_topo )
{
return ompi_comm_split_with_info(comm, color, key, NULL, newcomm, pass_on_topo);
}

/**********************************************************************/
/**********************************************************************/
/**********************************************************************/
Expand Down
15 changes: 15 additions & 0 deletions ompi/communicator/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,21 @@ int ompi_topo_dist_graph_create_adjacent(ompi_communicator_t *old_comm,
OMPI_DECLSPEC int ompi_comm_split (ompi_communicator_t *comm, int color, int key,
ompi_communicator_t** newcomm, bool pass_on_topo);

/**
* split a communicator based on color and key. Parameters
* are identical to the MPI-counterpart of the function.
* Similar to \see ompi_comm_split with an additional info parameter.
*
* @param comm: input communicator
* @param color
* @param key
*
* @
*/
OMPI_DECLSPEC int ompi_comm_split_with_info( ompi_communicator_t* comm, int color, int key,
opal_info_t *info,
ompi_communicator_t **newcomm, bool pass_on_topo );

/**
* split a communicator based on type and key. Parameters
* are identical to the MPI-counterpart of the function.
Expand Down
28 changes: 28 additions & 0 deletions ompi/group/group.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,3 +578,31 @@ bool ompi_group_have_remote_peers (ompi_group_t *group)

return false;
}

/**
* Count the number of processes on this group that share the same node as
* this process.
*/
int ompi_group_count_local_peers (ompi_group_t *group)
{
int local_peers = 0;
for (int i = 0 ; i < group->grp_proc_count ; ++i) {
ompi_proc_t *proc = NULL;
#if OMPI_GROUP_SPARSE
proc = ompi_group_peer_lookup (group, i);
#else
proc = ompi_group_get_proc_ptr_raw (group, i);
if (ompi_proc_is_sentinel (proc)) {
/* the proc must be stored in the group or cached in the proc
* hash table if the process resides in the local node
* (see ompi_proc_complete_init) */
continue;
}
#endif
if (OPAL_PROC_ON_LOCAL_NODE(proc->super.proc_flags)) {
local_peers++;
}
}

return local_peers;
}
8 changes: 8 additions & 0 deletions ompi/group/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,16 @@ static inline struct ompi_proc_t *ompi_group_peer_lookup_existing (ompi_group_t
return ompi_group_get_proc_ptr (group, peer_id, false);
}

/**
* Return true if all processes in the group are not on the local node.
*/
bool ompi_group_have_remote_peers (ompi_group_t *group);

/**
* Count the number of processes on the local node.
*/
int ompi_group_count_local_peers (ompi_group_t *group);

/**
* Function to print the group info
*/
Expand Down
4 changes: 2 additions & 2 deletions ompi/mca/coll/adapt/coll_adapt_ibcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ static int send_cb(ompi_request_t * req)
|| (context->con->tree->tree_nextsize > 0 && rank != context->con->root
&& num_sent == context->con->tree->tree_nextsize * context->con->num_segs
&& num_recv_fini == context->con->num_segs)) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in send\n",
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Signal in send\n",
ompi_comm_rank(context->con->comm)));
ibcast_request_fini(context);
}
Expand Down Expand Up @@ -306,7 +306,7 @@ static int recv_cb(ompi_request_t * req)
&& num_recv_fini == context->con->num_segs)
|| (context->con->tree->tree_nextsize == 0
&& num_recv_fini == context->con->num_segs)) {
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Singal in recv\n",
OPAL_OUTPUT_VERBOSE((30, mca_coll_adapt_component.adapt_output, "[%d]: Signal in recv\n",
ompi_comm_rank(context->con->comm)));
ibcast_request_fini(context);
}
Expand Down
145 changes: 117 additions & 28 deletions ompi/mca/coll/base/coll_base_comm_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* and Technology (RIST). All rights reserved.
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
* Copyright (c) 2017 FUJITSU LIMITED. All rights reserved.
* Copyright (c) 2020 BULL S.A.S. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand All @@ -37,27 +38,20 @@
#include "mpi.h"
#include "ompi/communicator/communicator.h"
#include "opal/util/output.h"
#include "opal/util/argv.h"
#include "opal/util/show_help.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_object.h"
#include "ompi/mca/mca.h"
#include "opal/mca/base/base.h"
#include "ompi/mca/coll/coll.h"
#include "ompi/mca/coll/base/base.h"

#include "ompi/mca/coll/base/coll_base_util.h"

/*
* Local types
* Stuff for the OBJ interface
*/
struct avail_coll_t {
opal_list_item_t super;

int ac_priority;
mca_coll_base_module_2_3_0_t *ac_module;
const char * ac_component_name;
};
typedef struct avail_coll_t avail_coll_t;

OBJ_CLASS_INSTANCE(mca_coll_base_avail_coll_t, opal_list_item_t, NULL, NULL);

/*
* Local functions
Expand All @@ -77,12 +71,6 @@ static int query_2_0_0(const mca_coll_base_component_2_0_0_t *
int *priority,
mca_coll_base_module_2_3_0_t ** module);

/*
* Stuff for the OBJ interface
*/
static OBJ_CLASS_INSTANCE(avail_coll_t, opal_list_item_t, NULL, NULL);


#define COPY(module, comm, func) \
do { \
if (NULL != module->coll_ ## func) { \
Expand Down Expand Up @@ -138,11 +126,14 @@ int mca_coll_base_comm_select(ompi_communicator_t * comm)
/* FIX ME - Do some kind of collective operation to find a module
that everyone has available */

/* List to store every valid module */
comm->c_coll->module_list = OBJ_NEW(opal_list_t);

/* do the selection loop */
for (item = opal_list_remove_first(selectable);
NULL != item; item = opal_list_remove_first(selectable)) {

avail_coll_t *avail = (avail_coll_t *) item;
mca_coll_base_avail_coll_t *avail = (mca_coll_base_avail_coll_t *) item;

/* initialize the module */
ret = avail->ac_module->coll_module_enable(avail->ac_module, comm);
Expand All @@ -153,6 +144,9 @@ int mca_coll_base_comm_select(ompi_communicator_t * comm)
(OMPI_SUCCESS == ret ? "Enabled": "Disabled") );

if (OMPI_SUCCESS == ret) {
/* Save every component that is initialized,
* queried and enabled successfully */
opal_list_append(comm->c_coll->module_list, &avail->super);

/* copy over any of the pointers */
COPY(avail->ac_module, comm, allgather);
Expand Down Expand Up @@ -230,10 +224,11 @@ int mca_coll_base_comm_select(ompi_communicator_t * comm)
COPY(avail->ac_module, comm, neighbor_alltoallw_init);

COPY(avail->ac_module, comm, reduce_local);
} else {
/* release the original module reference and the list item */
OBJ_RELEASE(avail->ac_module);
OBJ_RELEASE(avail);
}
/* release the original module reference and the list item */
OBJ_RELEASE(avail->ac_module);
OBJ_RELEASE(avail);
}

/* Done with the list from the check_components() call so release it. */
Expand Down Expand Up @@ -306,8 +301,8 @@ int mca_coll_base_comm_select(ompi_communicator_t * comm)

static int avail_coll_compare (opal_list_item_t **a,
opal_list_item_t **b) {
avail_coll_t *acoll = (avail_coll_t *) *a;
avail_coll_t *bcoll = (avail_coll_t *) *b;
mca_coll_base_avail_coll_t *acoll = (mca_coll_base_avail_coll_t *) *a;
mca_coll_base_avail_coll_t *bcoll = (mca_coll_base_avail_coll_t *) *b;

if (acoll->ac_priority > bcoll->ac_priority) {
return 1;
Expand All @@ -318,6 +313,20 @@ static int avail_coll_compare (opal_list_item_t **a,
return 0;
}

static inline int
component_in_argv(char **argv, const char* component_name)
{
if( NULL != argv ) {
while( NULL != *argv ) {
if( 0 == strcmp(component_name, *argv) ) {
return 1;
}
argv++; /* move to the next argument */
}
}
return 0;
}

/*
* For each module in the list, check and see if it wants to run, and
* do the resulting priority comparison. Make a list of modules to be
Expand All @@ -327,25 +336,85 @@ static int avail_coll_compare (opal_list_item_t **a,
static opal_list_t *check_components(opal_list_t * components,
ompi_communicator_t * comm)
{
int priority;
int priority, flag;
const mca_base_component_t *component;
mca_base_component_list_item_t *cli;
mca_coll_base_module_2_3_0_t *module;
opal_list_t *selectable;
avail_coll_t *avail;

mca_coll_base_avail_coll_t *avail;
char info_val[OPAL_MAX_INFO_VAL+1];
char **coll_argv = NULL, **coll_exclude = NULL, **coll_include = NULL;

/* Check if this communicator comes with restrictions on the collective modules
* it wants to use. The restrictions are consistent with the MCA parameter
* to limit the collective components loaded, but it applies for each
* communicator and is provided as an info key during the communicator
* creation. Unlike the MCA param, this info key is used not to select
* components but either to prevent components from being used or to
* force a change in the component priority.
*/
if( NULL != comm->super.s_info) {
opal_info_get(comm->super.s_info, "ompi_comm_coll_preference",
sizeof(info_val), info_val, &flag);
if( !flag ) {
goto proceed_to_select;
}
coll_argv = opal_argv_split(info_val, ',');
if(NULL == coll_argv) {
goto proceed_to_select;
}
int idx2, count_include = opal_argv_count(coll_argv);
/* Allocate the coll_include argv */
coll_include = (char**)malloc((count_include + 1) * sizeof(char*));
coll_include[count_include] = NULL; /* NULL terminated array */
/* Dispatch the include/exclude in the corresponding arrays */
for( int idx = 0; NULL != coll_argv[idx]; idx++ ) {
if( '^' == coll_argv[idx][0] ) {
coll_include[idx] = NULL; /* NULL terminated array */

/* Allocate the coll_exclude argv */
coll_exclude = (char**)malloc((count_include - idx + 1) * sizeof(char*));
/* save the exclude components */
for( idx2 = idx; NULL != coll_argv[idx2]; idx2++ ) {
coll_exclude[idx2 - idx] = coll_argv[idx2];
}
coll_exclude[idx2 - idx] = NULL; /* NULL-terminated array */
coll_exclude[0] = coll_exclude[0] + 1; /* get rid of the ^ */
count_include = idx;
break;
}
coll_include[idx] = coll_argv[idx];
}
/* Reverse the order of the coll_inclide argv to faciliate the ordering of
* the selected components reverse.
*/
for( idx2 = 0; idx2 < (count_include - 1); idx2++ ) {
char* temp = coll_include[idx2];
coll_include[idx2] = coll_include[count_include - 1];
coll_include[count_include - 1] = temp;
count_include--;
}
}
proceed_to_select:
/* Make a list of the components that query successfully */
selectable = OBJ_NEW(opal_list_t);

/* Scan through the list of components */
OPAL_LIST_FOREACH(cli, &ompi_coll_base_framework.framework_components, mca_base_component_list_item_t) {
component = cli->cli_component;

/* dont bother is we have this component in the exclusion list */
if( component_in_argv(coll_exclude, component->mca_component_name) ) {
opal_output_verbose(10, ompi_coll_base_framework.framework_output,
"coll:base:comm_select: component disqualified: %s (due to communicator info key)",
component->mca_component_name );
continue;
}
priority = check_one_component(comm, component, &module);
if (priority >= 0) {
/* We have a component that indicated that it wants to run
by giving us a module */
avail = OBJ_NEW(avail_coll_t);
avail = OBJ_NEW(mca_coll_base_avail_coll_t);
avail->ac_priority = priority;
avail->ac_module = module;
// Point to the string so we don't have to free later
Expand Down Expand Up @@ -376,6 +445,27 @@ static opal_list_t *check_components(opal_list_t * components,
/* Put this list in priority order */
opal_list_sort(selectable, avail_coll_compare);

/* For all valid component reorder them not on their provided priorities but on
* the order requested in the info key. As at this point the coll_include is
* already ordered backward we can simply prepend the components.
*/
mca_coll_base_avail_coll_t *item, *item_next;
OPAL_LIST_FOREACH_SAFE(item, item_next,
selectable, mca_coll_base_avail_coll_t) {
if( component_in_argv(coll_include, item->ac_component_name) ) {
opal_list_remove_item(selectable, &item->super);
opal_list_prepend(selectable, &item->super);
}
}

opal_argv_free(coll_argv);
if( NULL != coll_exclude ) {
free(coll_exclude);
}
if( NULL != coll_include ) {
free(coll_include);
}

/* All done */
return selectable;
}
Expand Down Expand Up @@ -409,7 +499,6 @@ static int check_one_component(ompi_communicator_t * comm,
return priority;
}


/**************************************************************************
* Query functions
**************************************************************************/
Expand Down
Loading