Skip to content

Commit 5a4bcb2

Browse files
authored
Merge pull request #10554 from devreal/osc-ucx-rputget-flushnb-v5.0.x
osc/ucx: implement rput and rget using ucp_worker_flush_nb [v5.0.x]
2 parents 67d1a2a + ca5b99c commit 5a4bcb2

File tree

3 files changed

+89
-27
lines changed

3 files changed

+89
-27
lines changed

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,24 +1136,31 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11361136
ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target, target_disp,
11371137
target_count, target_dt, win);
11381138
if (ret != OMPI_SUCCESS) {
1139+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
11391140
return ret;
11401141
}
11411142

1142-
ret = opal_common_ucx_wpmem_fence(mem);
1143-
if (ret != OMPI_SUCCESS) {
1144-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1145-
return OMPI_ERROR;
1146-
}
1147-
11481143
mca_osc_ucx_component.num_incomplete_req_ops++;
1149-
/* TODO: investigate whether ucp_worker_flush_nb is a better choice here */
1150-
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
1151-
0, target, &(module->req_result),
1152-
sizeof(uint64_t), remote_addr & (~0x7),
1153-
req_completion, ucx_req);
1144+
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req);
1145+
11541146
if (ret != OMPI_SUCCESS) {
1155-
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1156-
return ret;
1147+
1148+
/* fallback to using an atomic op to acquire a request handle */
1149+
ret = opal_common_ucx_wpmem_fence(mem);
1150+
if (ret != OMPI_SUCCESS) {
1151+
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1152+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1153+
return OMPI_ERROR;
1154+
}
1155+
1156+
ret = opal_common_ucx_wpmem_fetch_nb(mem, UCP_ATOMIC_FETCH_OP_FADD,
1157+
0, target, &(module->req_result),
1158+
sizeof(uint64_t), remote_addr & (~0x7),
1159+
req_completion, ucx_req);
1160+
if (ret != OMPI_SUCCESS) {
1161+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1162+
return ret;
1163+
}
11571164
}
11581165

11591166
*request = &ucx_req->super;
@@ -1191,24 +1198,31 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11911198
ret = ompi_osc_ucx_get(origin_addr, origin_count, origin_dt, target, target_disp,
11921199
target_count, target_dt, win);
11931200
if (ret != OMPI_SUCCESS) {
1201+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
11941202
return ret;
11951203
}
11961204

1197-
ret = opal_common_ucx_wpmem_fence(mem);
1198-
if (ret != OMPI_SUCCESS) {
1199-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1200-
return OMPI_ERROR;
1201-
}
1202-
12031205
mca_osc_ucx_component.num_incomplete_req_ops++;
1204-
/* TODO: investigate whether ucp_worker_flush_nb is a better choice here */
1205-
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
1206-
0, target, &(module->req_result),
1207-
sizeof(uint64_t), remote_addr & (~0x7),
1208-
req_completion, ucx_req);
1206+
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req);
1207+
12091208
if (ret != OMPI_SUCCESS) {
1210-
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1211-
return ret;
1209+
1210+
/* fallback to using an atomic op to acquire a request handle */
1211+
ret = opal_common_ucx_wpmem_fence(mem);
1212+
if (ret != OMPI_SUCCESS) {
1213+
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1214+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1215+
return OMPI_ERROR;
1216+
}
1217+
1218+
ret = opal_common_ucx_wpmem_fetch_nb(mem, UCP_ATOMIC_FETCH_OP_FADD,
1219+
0, target, &(module->req_result),
1220+
sizeof(uint64_t), remote_addr & (~0x7),
1221+
req_completion, ucx_req);
1222+
if (ret != OMPI_SUCCESS) {
1223+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1224+
return ret;
1225+
}
12121226
}
12131227

12141228
*request = &ucx_req->super;

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,7 @@ OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, in
763763
((opal_common_ucx_request_t *) req)->winfo = winfo;
764764
}
765765

766-
if (OPAL_COMMON_UCX_FLUSH_B) {
766+
if (OPAL_COMMON_UCX_FLUSH_B == type) {
767767
rc = opal_common_ucx_wait_request_mt(req, "ucp_ep_flush_nb");
768768
} else {
769769
*req_ptr = req;
@@ -822,13 +822,57 @@ OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
822822
if (rc != OPAL_SUCCESS) {
823823
MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d", rc);
824824
rc = OPAL_ERROR;
825+
break;
825826
}
826827
}
827828
opal_mutex_unlock(&ctx->mutex);
828829

829830
return rc;
830831
}
831832

833+
834+
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush_ep_nb(opal_common_ucx_wpmem_t *mem,
835+
int target,
836+
opal_common_ucx_user_req_handler_t user_req_cb,
837+
void *user_req_ptr)
838+
{
839+
#if HAVE_DECL_UCP_EP_FLUSH_NB
840+
int rc = OPAL_SUCCESS;
841+
ucp_ep_h ep = NULL;
842+
ucp_rkey_h rkey = NULL;
843+
opal_common_ucx_winfo_t *winfo = NULL;
844+
845+
if (NULL == mem) {
846+
return OPAL_SUCCESS;
847+
}
848+
849+
rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
850+
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
851+
MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
852+
return rc;
853+
}
854+
855+
opal_mutex_lock(&winfo->mutex);
856+
opal_common_ucx_request_t *req;
857+
req = ucp_worker_flush_nb(winfo->worker, 0, opal_common_ucx_req_completion);
858+
if (UCS_PTR_IS_PTR(req)) {
859+
req->ext_req = user_req_ptr;
860+
req->ext_cb = user_req_cb;
861+
req->winfo = winfo;
862+
} else {
863+
if (user_req_cb != NULL) {
864+
(*user_req_cb)(user_req_ptr);
865+
}
866+
}
867+
opal_mutex_unlock(&winfo->mutex);
868+
return rc;
869+
#else
870+
return OPAL_ERR_NOT_SUPPORTED;
871+
#endif // HAVE_DECL_UCP_EP_FLUSH_NB
872+
873+
}
874+
875+
832876
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem)
833877
{
834878
ucs_status_t status = UCS_OK;

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ OPAL_DECLSPEC void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
247247

248248
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
249249
opal_common_ucx_flush_scope_t scope, int target);
250+
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush_ep_nb(opal_common_ucx_wpmem_t *mem,
251+
int target,
252+
opal_common_ucx_user_req_handler_t user_req_cb,
253+
void *user_req_ptr);
250254
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem);
251255

252256
OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,

0 commit comments

Comments
 (0)