Skip to content

Commit b0416f3

Browse files
committed
some pr feedback
use a different method for getting status of sub requests. change callback function names Signed-off-by: Howard Pritchard <howardp@lanl.gov>
1 parent 4cc6307 commit b0416f3

File tree

4 files changed

+53
-55
lines changed

4 files changed

+53
-55
lines changed

ompi/communicator/comm_request.c

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ typedef struct ompi_comm_request_item_t {
3232
opal_list_item_t super;
3333
ompi_comm_request_callback_fn_t callback;
3434
ompi_request_t *subreqs[OMPI_COMM_REQUEST_MAX_SUBREQ];
35-
ompi_status_public_t *statuses[OMPI_COMM_REQUEST_MAX_SUBREQ];
35+
uint32_t flags;
3636
int subreq_count;
3737
} ompi_comm_request_item_t;
3838
OBJ_CLASS_DECLARATION(ompi_comm_request_item_t);
@@ -74,34 +74,11 @@ void ompi_comm_request_fini (void)
7474
int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
7575
ompi_request_t *subreqs[], int subreq_count)
7676
{
77-
ompi_comm_request_item_t *request_item;
78-
int i;
79-
80-
if (subreq_count > OMPI_COMM_REQUEST_MAX_SUBREQ) {
81-
return OMPI_ERR_BAD_PARAM;
82-
}
83-
84-
request_item = OBJ_NEW(ompi_comm_request_item_t);
85-
if (NULL == request_item) {
86-
return OMPI_ERR_OUT_OF_RESOURCE;
87-
}
88-
89-
request_item->callback = callback;
90-
91-
for (i = 0 ; i < subreq_count ; ++i) {
92-
request_item->subreqs[i] = subreqs[i];
93-
request_item->statuses[i] = NULL;
94-
}
95-
96-
request_item->subreq_count = subreq_count;
97-
98-
opal_list_append (&request->schedule, &request_item->super);
99-
100-
return OMPI_SUCCESS;
77+
return ompi_comm_request_schedule_append_w_flags(request, callback, subreqs, subreq_count, 0);
10178
}
10279

103-
int ompi_comm_request_schedule_append_w_statuses(ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
104-
ompi_request_t *subreqs[], ompi_status_public_t *statuses, int subreq_count)
80+
int ompi_comm_request_schedule_append_w_flags(ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
81+
ompi_request_t *subreqs[], int subreq_count, uint32_t flags)
10582
{
10683
ompi_comm_request_item_t *request_item;
10784
int i;
@@ -123,10 +100,10 @@ int ompi_comm_request_schedule_append_w_statuses(ompi_comm_request_t *request, o
123100
}
124101

125102
request_item->callback = callback;
103+
request_item->flags = flags;
126104

127105
for (i = 0 ; i < subreq_count ; ++i) {
128106
request_item->subreqs[i] = subreqs[i];
129-
request_item->statuses[i] = &statuses[i];
130107
}
131108

132109
request_item->subreq_count = subreq_count;
@@ -140,7 +117,6 @@ static int ompi_comm_request_progress (void)
140117
{
141118
ompi_comm_request_t *request, *next;
142119
static opal_atomic_int32_t progressing = 0;
143-
ompi_status_public_t *status;
144120
int completed = 0;
145121

146122
/* don't allow re-entry */
@@ -166,11 +142,9 @@ static int ompi_comm_request_progress (void)
166142
* that it does some subreqs cleanup */
167143
request->super.req_status.MPI_ERROR = subreq->req_status.MPI_ERROR;
168144
}
169-
status = request_item->statuses[request_item->subreq_count-1];
170-
if (NULL != status) {
171-
OMPI_COPY_STATUS(status, subreq->req_status, false);
145+
if (!(request_item->flags & OMPI_COMM_REQ_FLAG_RETAIN_SUBREQ)) {
146+
ompi_request_free (&subreq);
172147
}
173-
ompi_request_free (&subreq);
174148
request_item->subreq_count--;
175149
completed++;
176150
} else {

ompi/communicator/comm_request.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
/* increase this number if more subrequests are needed */
2121
#define OMPI_COMM_REQUEST_MAX_SUBREQ 2
2222

23+
/* indicate that the caller will free subrequests */
24+
#define OMPI_COMM_REQ_FLAG_RETAIN_SUBREQ 0x00000001
25+
2326
typedef struct ompi_comm_request_t {
2427
ompi_request_t super;
2528

@@ -34,8 +37,8 @@ void ompi_comm_request_init (void);
3437
void ompi_comm_request_fini (void);
3538
int ompi_comm_request_schedule_append (ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
3639
ompi_request_t *subreqs[], int subreq_count);
37-
int ompi_comm_request_schedule_append_w_statuses(ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
38-
ompi_request_t *subreqs[], ompi_status_public_t *statuses, int subreq_count);
40+
int ompi_comm_request_schedule_append_w_flags(ompi_comm_request_t *request, ompi_comm_request_callback_fn_t callback,
41+
ompi_request_t *subreqs[], int subreq_count, uint32_t flags);
3942
void ompi_comm_request_start (ompi_comm_request_t *request);
4043
ompi_comm_request_t *ompi_comm_request_get (void);
4144
void ompi_comm_request_return (ompi_comm_request_t *request);

ompi/mpi/c/isendrecv.c

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ static const char FUNC_NAME[] = "MPI_Isendrecv";
4848
struct ompi_isendrecv_context_t {
4949
opal_object_t super;
5050
int nreqs;
51-
ompi_status_public_t statuses[2];
51+
ompi_request_t *subreq[2];
5252
};
5353

5454
typedef struct ompi_isendrecv_context_t ompi_isendrecv_context_t;
5555
OBJ_CLASS_INSTANCE(ompi_isendrecv_context_t, opal_object_t, NULL, NULL);
5656

57-
static int ompi_isendrecv_completer_func (ompi_comm_request_t *request)
57+
static int ompi_isendrecv_complete_func (ompi_comm_request_t *request)
5858
{
5959
ompi_isendrecv_context_t *context =
6060
(ompi_isendrecv_context_t *) request->context;
@@ -67,7 +67,13 @@ static int ompi_isendrecv_completer_func (ompi_comm_request_t *request)
6767
*/
6868

6969
OMPI_COPY_STATUS(&request->super.req_status,
70-
context->statuses[0], false);
70+
context->subreq[0]->req_status, false);
71+
if(NULL != context->subreq[0]) {
72+
ompi_request_free(&context->subreq[0]);
73+
}
74+
if(NULL != context->subreq[1]) {
75+
ompi_request_free(&context->subreq[1]);
76+
}
7177

7278
return OMPI_SUCCESS;
7379
}
@@ -80,9 +86,9 @@ int MPI_Isendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
8086
{
8187
ompi_isendrecv_context_t *context = NULL;
8288
ompi_comm_request_t *crequest;
83-
ompi_request_t *subreq[2];
8489
int rc = MPI_SUCCESS;
8590
int nreqs = 0;
91+
uint32_t flags;
8692

8793
SPC_RECORD(OMPI_SPC_ISENDRECV, 1);
8894

@@ -129,10 +135,12 @@ int MPI_Isendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
129135
}
130136

131137
crequest->context = &context->super;
138+
context->subreq[0] = NULL;
139+
context->subreq[1] = NULL;
132140

133141
if (source != MPI_PROC_NULL) { /* post recv */
134142
rc = MCA_PML_CALL(irecv(recvbuf, recvcount, recvtype,
135-
source, recvtag, comm, &subreq[nreqs++]));
143+
source, recvtag, comm, &context->subreq[nreqs++]));
136144
if (MPI_SUCCESS != rc) {
137145
ompi_comm_request_return (crequest);
138146
}
@@ -141,7 +149,7 @@ int MPI_Isendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
141149

142150
if (dest != MPI_PROC_NULL) { /* send */
143151
rc = MCA_PML_CALL(isend(sendbuf, sendcount, sendtype, dest,
144-
sendtag, MCA_PML_BASE_SEND_STANDARD, comm, &subreq[nreqs++]));
152+
sendtag, MCA_PML_BASE_SEND_STANDARD, comm, &context->subreq[nreqs++]));
145153
if (MPI_SUCCESS != rc) {
146154
ompi_comm_request_return (crequest);
147155
}
@@ -155,8 +163,10 @@ int MPI_Isendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
155163
context->nreqs = nreqs;
156164
assert(nreqs <= 2);
157165

158-
rc = ompi_comm_request_schedule_append_w_statuses(crequest, ompi_isendrecv_completer_func,
159-
subreq, context->statuses, nreqs);
166+
flags = OMPI_COMM_REQ_FLAG_RETAIN_SUBREQ;
167+
168+
rc = ompi_comm_request_schedule_append_w_flags(crequest, ompi_isendrecv_complete_func,
169+
context->subreq, nreqs, flags);
160170
OMPI_ERRHANDLER_CHECK(rc, comm, rc, FUNC_NAME);
161171

162172
/* kick off the request */

ompi/mpi/c/isendrecv_replace.c

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ struct ompi_isendrecv_replace_context_t {
5151
unsigned char packed_data[2048];
5252
struct iovec iov;
5353
int nreqs;
54-
ompi_status_public_t statuses[2];
54+
ompi_request_t *subreq[2];
5555
};
5656

5757
typedef struct ompi_isendrecv_replace_context_t ompi_isendrecv_replace_context_t;
@@ -75,7 +75,7 @@ OBJ_CLASS_INSTANCE(ompi_isendrecv_replace_context_t,
7575
ompi_isendrecv_context_constructor,
7676
ompi_isendrecv_context_destructor);
7777

78-
static int ompi_isendrecv_replace_completer_func (ompi_comm_request_t *request)
78+
static int ompi_isendrecv_replace_complete_func (ompi_comm_request_t *request)
7979
{
8080
ompi_isendrecv_replace_context_t *context =
8181
(ompi_isendrecv_replace_context_t *) request->context;
@@ -87,8 +87,14 @@ static int ompi_isendrecv_replace_completer_func (ompi_comm_request_t *request)
8787
* Probably need to bring up in the MPI forum.
8888
*/
8989

90-
OMPI_COPY_STATUS(&request->super.req_status,
91-
context->statuses[0], false);
90+
OMPI_COPY_STATUS(&request->super.req_status,
91+
context->subreq[0]->req_status, false);
92+
if(NULL != context->subreq[0]) {
93+
ompi_request_free(&context->subreq[0]);
94+
}
95+
if(NULL != context->subreq[1]) {
96+
ompi_request_free(&context->subreq[1]);
97+
}
9298

9399
return OMPI_SUCCESS;
94100
}
@@ -104,8 +110,8 @@ int MPI_Isendrecv_replace(void * buf, int count, MPI_Datatype datatype,
104110
uint32_t iov_count;
105111
ompi_comm_request_t *crequest = NULL;
106112
ompi_isendrecv_replace_context_t *context = NULL;
107-
ompi_request_t *subreq[2];
108113
int nreqs = 0;
114+
uint32_t flags;
109115

110116
SPC_RECORD(OMPI_SPC_SENDRECV_REPLACE, 1);
111117

@@ -173,6 +179,8 @@ int MPI_Isendrecv_replace(void * buf, int count, MPI_Datatype datatype,
173179
context->iov.iov_len = sizeof(context->packed_data);
174180

175181
crequest->context = &context->super;
182+
context->subreq[0] = NULL;
183+
context->subreq[1] = NULL;
176184

177185
/* initialize convertor to unpack recv buffer */
178186
OBJ_CONSTRUCT(&context->convertor, opal_convertor_t);
@@ -200,7 +208,7 @@ int MPI_Isendrecv_replace(void * buf, int count, MPI_Datatype datatype,
200208

201209
if (source != MPI_PROC_NULL) { /* post recv */
202210
rc = MCA_PML_CALL(irecv(buf, count, datatype,
203-
source, recvtag, comm, &subreq[nreqs++]));
211+
source, recvtag, comm, &context->subreq[nreqs++]));
204212
if (MPI_SUCCESS != rc) {
205213
ompi_comm_request_return (crequest);
206214
}
@@ -209,7 +217,8 @@ int MPI_Isendrecv_replace(void * buf, int count, MPI_Datatype datatype,
209217

210218
if (dest != MPI_PROC_NULL) { /* send */
211219
rc = MCA_PML_CALL(isend(context->iov.iov_base, context->packed_size, MPI_PACKED, dest,
212-
sendtag, MCA_PML_BASE_SEND_STANDARD, comm, &subreq[nreqs++]));
220+
sendtag, MCA_PML_BASE_SEND_STANDARD, comm,
221+
&context->subreq[nreqs++]));
213222
if (MPI_SUCCESS != rc) {
214223
ompi_comm_request_return (crequest);
215224
}
@@ -223,11 +232,13 @@ int MPI_Isendrecv_replace(void * buf, int count, MPI_Datatype datatype,
223232
context->nreqs = nreqs;
224233
assert(nreqs <= 2);
225234

226-
rc = ompi_comm_request_schedule_append_w_statuses(crequest,
227-
ompi_isendrecv_replace_completer_func,
228-
subreq,
229-
context->statuses,
230-
nreqs);
235+
flags = OMPI_COMM_REQ_FLAG_RETAIN_SUBREQ;
236+
237+
rc = ompi_comm_request_schedule_append_w_flags(crequest,
238+
ompi_isendrecv_replace_complete_func,
239+
context->subreq,
240+
nreqs,
241+
flags);
231242
OMPI_ERRHANDLER_CHECK(rc, comm, rc, FUNC_NAME);
232243

233244
/* kick off the request */

0 commit comments

Comments
 (0)