|
26 | 26 | #include "ompi/communicator/communicator.h" |
27 | 27 | #include "ompi/mca/coll/base/coll_tags.h" |
28 | 28 | #include "ompi/mca/coll/base/coll_base_functions.h" |
| 29 | +#include "ompi/mca/topo/base/base.h" |
29 | 30 | #include "ompi/mca/pml/pml.h" |
30 | 31 | #include "coll_base_util.h" |
31 | 32 |
|
@@ -149,6 +150,14 @@ int ompi_coll_base_retain_op( ompi_request_t *req, ompi_op_t *op, |
149 | 150 | retain = true; |
150 | 151 | } |
151 | 152 | if (OPAL_UNLIKELY(retain)) { |
| 153 | + /* We need to consider two cases: |
| 154 | + * - non-blocking collectives: |
| 155 | + * the objects can be released when MPI_Wait() completes |
| 156 | + * and we use the req_complete_cb callback |
| 157 | + * - persistent non-blocking collectives: |
| 158 | + * the objects can only be released when the request is freed |
| 159 | + * (e.g. MPI_Request_free() completes) and we use the req_free callback |
| 160 | + */ |
152 | 161 | if (req->req_persistent) { |
153 | 162 | request->cb.req_free = req->req_free; |
154 | 163 | req->req_free = free_objs_callback; |
@@ -192,11 +201,18 @@ int ompi_coll_base_retain_datatypes( ompi_request_t *req, ompi_datatype_t *stype |
192 | 201 |
|
193 | 202 | static void release_vecs_callback(ompi_coll_base_nbc_request_t *request) { |
194 | 203 | ompi_communicator_t *comm = request->super.req_mpi_object.comm; |
195 | | - int count = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm); |
196 | | - for (int i=0; i<count; i++) { |
| 204 | + int scount, rcount; |
| 205 | + if (OMPI_COMM_IS_TOPO(comm)) { |
| 206 | + (void)mca_topo_base_neighbor_count (comm, &rcount, &scount); |
| 207 | + } else { |
| 208 | + scount = rcount = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm); |
| 209 | + } |
| 210 | + for (int i=0; i<scount; i++) { |
197 | 211 | if (NULL != request->data.vecs.stypes && NULL != request->data.vecs.stypes[i]) { |
198 | 212 | OMPI_DATATYPE_RELEASE(request->data.vecs.stypes[i]); |
199 | 213 | } |
| 214 | + } |
| 215 | + for (int i=0; i<rcount; i++) { |
200 | 216 | if (NULL != request->data.vecs.rtypes && NULL != request->data.vecs.rtypes[i]) { |
201 | 217 | OMPI_DATATYPE_RELEASE(request->data.vecs.rtypes[i]); |
202 | 218 | } |
@@ -229,13 +245,20 @@ int ompi_coll_base_retain_datatypes_w( ompi_request_t *req, |
229 | 245 | ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req; |
230 | 246 | bool retain = false; |
231 | 247 | ompi_communicator_t *comm = request->super.req_mpi_object.comm; |
232 | | - int count = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm); |
| 248 | + int scount, rcount; |
| 249 | + if (OMPI_COMM_IS_TOPO(comm)) { |
| 250 | + (void)mca_topo_base_neighbor_count (comm, &rcount, &scount); |
| 251 | + } else { |
| 252 | + scount = rcount = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm); |
| 253 | + } |
233 | 254 |
|
234 | | - for (int i=0; i<count; i++) { |
| 255 | + for (int i=0; i<scount; i++) { |
235 | 256 | if (NULL != stypes && NULL != stypes[i] && !ompi_datatype_is_predefined(stypes[i])) { |
236 | 257 | OBJ_RETAIN(stypes[i]); |
237 | 258 | retain = true; |
238 | 259 | } |
| 260 | + } |
| 261 | + for (int i=0; i<rcount; i++) { |
239 | 262 | if (NULL != rtypes && NULL != rtypes[i] && !ompi_datatype_is_predefined(rtypes[i])) { |
240 | 263 | OBJ_RETAIN(rtypes[i]); |
241 | 264 | retain = true; |
|
0 commit comments