Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions ompi/mca/osc/ucx/osc_ucx_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1115,21 +1115,28 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
return ret;
}

ret = opal_common_ucx_wpmem_fence(mem);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
return OMPI_ERROR;
}

mca_osc_ucx_component.num_incomplete_req_ops++;
/* TODO: investigate whether ucp_worker_flush_nb is a better choice here */
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr & (~0x7),
req_completion, ucx_req);
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req);

if (ret != OMPI_SUCCESS) {
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
return ret;

/* fallback to using an atomic op to acquire a request handle */
ret = opal_common_ucx_wpmem_fence(mem);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
return OMPI_ERROR;
}

ret = opal_common_ucx_wpmem_fetch_nb(mem, UCP_ATOMIC_FETCH_OP_FADD,
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr & (~0x7),
req_completion, ucx_req);
if (ret != OMPI_SUCCESS) {
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
return ret;
}
}

*request = &ucx_req->super;
Expand Down Expand Up @@ -1170,21 +1177,28 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
return ret;
}

ret = opal_common_ucx_wpmem_fence(mem);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
return OMPI_ERROR;
}

mca_osc_ucx_component.num_incomplete_req_ops++;
/* TODO: investigate whether ucp_worker_flush_nb is a better choice here */
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr & (~0x7),
req_completion, ucx_req);
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req);

if (ret != OMPI_SUCCESS) {
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
return ret;

/* fallback to using an atomic op to acquire a request handle */
ret = opal_common_ucx_wpmem_fence(mem);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
return OMPI_ERROR;
}

ret = opal_common_ucx_wpmem_fetch_nb(mem, UCP_ATOMIC_FETCH_OP_FADD,
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr & (~0x7),
req_completion, ucx_req);
if (ret != OMPI_SUCCESS) {
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
return ret;
}
}

*request = &ucx_req->super;
Expand Down
46 changes: 45 additions & 1 deletion opal/mca/common/ucx/common_ucx_wpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, in
((opal_common_ucx_request_t *) req)->winfo = winfo;
}

if (OPAL_COMMON_UCX_FLUSH_B) {
if (OPAL_COMMON_UCX_FLUSH_B == type) {
rc = opal_common_ucx_wait_request_mt(req, "ucp_ep_flush_nb");
} else {
*req_ptr = req;
Expand Down Expand Up @@ -820,13 +820,57 @@ OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
if (rc != OPAL_SUCCESS) {
MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d", rc);
rc = OPAL_ERROR;
break;
}
}
opal_mutex_unlock(&ctx->mutex);

return rc;
}


OPAL_DECLSPEC int opal_common_ucx_wpmem_flush_ep_nb(opal_common_ucx_wpmem_t *mem,
int target,
opal_common_ucx_user_req_handler_t user_req_cb,
void *user_req_ptr)
{
#if HAVE_DECL_UCP_EP_FLUSH_NB
int rc = OPAL_SUCCESS;
ucp_ep_h ep = NULL;
ucp_rkey_h rkey = NULL;
opal_common_ucx_winfo_t *winfo = NULL;

if (NULL == mem) {
return OPAL_SUCCESS;
}

rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
return rc;
}

opal_mutex_lock(&winfo->mutex);
opal_common_ucx_request_t *req;
req = ucp_worker_flush_nb(winfo->worker, 0, opal_common_ucx_req_completion);
if (UCS_PTR_IS_PTR(req)) {
req->ext_req = user_req_ptr;
req->ext_cb = user_req_cb;
req->winfo = winfo;
} else {
if (user_req_cb != NULL) {
(*user_req_cb)(user_req_ptr);
}
}
opal_mutex_unlock(&winfo->mutex);
return rc;
#else
return OPAL_ERR_NOT_SUPPORTED;
#endif // HAVE_DECL_UCP_EP_FLUSH_NB

}


OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem)
{
ucs_status_t status = UCS_OK;
Expand Down
4 changes: 4 additions & 0 deletions opal/mca/common/ucx/common_ucx_wpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ OPAL_DECLSPEC void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);

OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
opal_common_ucx_flush_scope_t scope, int target);
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush_ep_nb(opal_common_ucx_wpmem_t *mem,
int target,
opal_common_ucx_user_req_handler_t user_req_cb,
void *user_req_ptr);
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem);

OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,
Expand Down