Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ompi/mca/osc/ucx/osc_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,5 +228,10 @@ int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win);
int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_wins,
int min_index, int max_index,
uint64_t base, size_t len, int *insert);
extern inline bool ompi_osc_need_acc_lock(ompi_osc_ucx_module_t *module, int target);
extern inline int ompi_osc_state_lock(ompi_osc_ucx_module_t *module, int target,
bool *lock_acquired, bool force_lock);
extern inline int ompi_osc_state_unlock(ompi_osc_ucx_module_t *module, int target,
bool lock_acquired, void *free_ptr);

#endif /* OMPI_OSC_UCX_H */
179 changes: 60 additions & 119 deletions ompi/mca/osc/ucx/osc_ucx_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
return OMPI_ERROR; \
}

#define CHECK_DYNAMIC_WIN(_remote_addr, _module, _target, _ret) \
if (_module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { \
_ret = get_dynamic_win_info(_remote_addr, _module, _target); \
if (_ret != OMPI_SUCCESS) { \
return _ret; \
} \
#define CHECK_DYNAMIC_WIN(_remote_addr, _module, _target, _ret, _lock_required) \
if (_module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { \
_ret = get_dynamic_win_info(_remote_addr, _module, _target, _lock_required); \
if (_ret != OMPI_SUCCESS) { \
return _ret; \
} \
}

typedef struct ucx_iovec {
Expand Down Expand Up @@ -251,89 +251,8 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
return ret;
}

static inline bool need_acc_lock(ompi_osc_ucx_module_t *module, int target)
{
ompi_osc_ucx_lock_t *lock = NULL;
opal_hash_table_get_value_uint32(&module->outstanding_locks,
(uint32_t) target, (void **) &lock);

/* if there is an exclusive lock there is no need to acqurie the accumulate lock */
return !(NULL != lock && LOCK_EXCLUSIVE == lock->type);
}

static inline int start_atomicity(
ompi_osc_ucx_module_t *module,
int target,
bool *lock_acquired) {
uint64_t result_value = -1;
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
int ret = OMPI_SUCCESS;

if (need_acc_lock(module, target)) {
for (;;) {
ret = opal_common_ucx_wpmem_cmpswp(module->state_mem,
TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE,
target, &result_value, sizeof(result_value),
remote_addr);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret);
return OMPI_ERROR;
}
if (result_value == TARGET_LOCK_UNLOCKED) {
break;
}

opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
}

*lock_acquired = true;
} else {
*lock_acquired = false;
}

return OMPI_SUCCESS;
}

static inline int end_atomicity(
ompi_osc_ucx_module_t *module,
int target,
bool lock_acquired,
void *free_ptr) {
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
int ret = OMPI_SUCCESS;

if (lock_acquired) {
uint64_t result_value = 0;
/* fence any still active operations */
ret = opal_common_ucx_wpmem_fence(module->mem);
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
return OMPI_ERROR;
}

ret = opal_common_ucx_wpmem_fetch(module->state_mem,
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
target, &result_value, sizeof(result_value),
remote_addr);
assert(result_value == TARGET_LOCK_EXCLUSIVE);
} else if (NULL != free_ptr){
/* flush before freeing the buffer */
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
}
/* TODO: encapsulate in a request and make the release non-blocking */
if (NULL != free_ptr) {
free(free_ptr);
}
if (ret != OMPI_SUCCESS) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fetch failed: %d", ret);
return OMPI_ERROR;
}

return ret;
}

static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module_t *module,
int target) {
int target, bool lock_required) {
uint64_t remote_state_addr = (module->state_addrs)[target] + OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET;
size_t remote_state_len = sizeof(uint64_t) + sizeof(ompi_osc_dynamic_win_info_t) * OMPI_OSC_UCX_ATTACH_MAX;
char *temp_buf = calloc(remote_state_len, 1);
Expand All @@ -343,6 +262,17 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
int insert = -1;
int ret;

bool lock_acquired = false;
if (lock_required) {
/* We need to lock acc-lock even if the process has an exclusive lock.
* Therefore, force lock is needed. Remote process protects its window
* attach/detach operations with an acc-lock */
ret = ompi_osc_state_lock(module, target, &lock_acquired, true);
if (ret != OMPI_SUCCESS) {
return ret;
}
}

ret = opal_common_ucx_wpmem_putget(module->state_mem, OPAL_COMMON_UCX_GET, target,
(void *)((intptr_t)temp_buf),
remote_state_len, remote_state_addr);
Expand All @@ -360,32 +290,36 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module

memcpy(&win_count, temp_buf, sizeof(uint64_t));
if (win_count > OMPI_OSC_UCX_ATTACH_MAX) {
return MPI_ERR_RMA_RANGE;
ret = MPI_ERR_RMA_RANGE;
goto cleanup;
}

temp_dynamic_wins = (ompi_osc_dynamic_win_info_t *)(temp_buf + sizeof(uint64_t));
contain = ompi_osc_find_attached_region_position(temp_dynamic_wins, 0, win_count - 1,
remote_addr, 1, &insert);
if (contain < 0 || contain >= win_count) {
return MPI_ERR_RMA_RANGE;
if (contain < 0 || contain >= (int)win_count) {
OSC_UCX_ERROR("Dynamic window index not found contain: %d win_count: %d\n",
contain, win_count);
ret = MPI_ERR_RMA_RANGE;
goto cleanup;
}

assert(module->mem != NULL);

_mem_record_t *mem_rec = NULL;
ret = opal_tsd_tracked_key_get(&module->mem->tls_key, (void **) &mem_rec);
if (OPAL_SUCCESS != ret) {
return ret;
goto cleanup;
}

if (mem_rec == NULL) {
ret = opal_common_ucx_tlocal_fetch_spath(module->mem, target);
if (OPAL_SUCCESS != ret) {
return ret;
goto cleanup;
}
ret = opal_tsd_tracked_key_get(&module->mem->tls_key, (void **) &mem_rec);
if (OPAL_SUCCESS != ret) {
return ret;
goto cleanup;
}

}
Expand All @@ -408,12 +342,15 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module

if (ret != UCS_OK) {
MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", ret);
return OPAL_ERROR;
ret = OPAL_ERROR;
goto cleanup;
}

cleanup:
free(temp_buf);

ompi_osc_state_unlock(module, target, lock_acquired, NULL);

return ret;
}

Expand Down Expand Up @@ -486,7 +423,7 @@ static int do_atomic_op_intrinsic(

uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);

ucp_atomic_fetch_op_t opcode;
bool is_no_op = false;
Expand Down Expand Up @@ -555,7 +492,7 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
return ret;
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);

if (!target_count) {
return OMPI_SUCCESS;
Expand Down Expand Up @@ -605,13 +542,12 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
return ret;
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);

if (!target_count) {
return OMPI_SUCCESS;
}


ompi_datatype_get_true_extent(origin_dt, &origin_lb, &origin_extent);
ompi_datatype_get_true_extent(target_dt, &target_lb, &target_extent);

Expand Down Expand Up @@ -673,13 +609,14 @@ int accumulate_req(const void *origin_addr, int origin_count,
target_disp, NULL, ucx_req);
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);

ret = start_atomicity(module, target, &lock_acquired);
/* Start atomicity by acquiring acc lock */
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
if (ret != OMPI_SUCCESS) {
return ret;
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);

if (op == &ompi_mpi_op_replace.op) {
ret = ompi_osc_ucx_put(origin_addr, origin_count, origin_dt, target,
target_disp, target_count, target_dt, win);
Expand Down Expand Up @@ -781,7 +718,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
ompi_request_complete(&ucx_req->super, true);
}

return end_atomicity(module, target, lock_acquired, free_ptr);
return ompi_osc_state_unlock(module, target, lock_acquired, free_ptr);
}

int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
Expand All @@ -804,13 +741,14 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
size_t dt_bytes;
opal_common_ucx_wpmem_t *mem = module->mem;
if (!module->acc_single_intrinsic) {
ret = start_atomicity(module, target, &lock_acquired);
/* Start atomicity by acquiring acc lock */
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
if (ret != OMPI_SUCCESS) {
return ret;
}
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);

ompi_datatype_type_size(dt, &dt_bytes);
uint64_t compare_val = opal_common_ucx_load_uint64(compare_addr, dt_bytes);
Expand All @@ -823,7 +761,7 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
return ret;
}

return end_atomicity(module, target, lock_acquired, NULL);
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
}

int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
Expand All @@ -842,8 +780,6 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
return ret;
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);

ompi_datatype_type_size(dt, &dt_bytes);
if (ompi_osc_base_is_atomic_size_supported(remote_addr, dt_bytes)) {
// fast path using UCX atomic operations
Expand All @@ -854,11 +790,14 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a

/* fall back to get-compare-put */

ret = start_atomicity(module, target, &lock_acquired);
/* Start atomicity by acquiring acc lock */
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
if (ret != OMPI_SUCCESS) {
return ret;
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);

ret = opal_common_ucx_wpmem_putget(mem, OPAL_COMMON_UCX_GET, target,
result_addr, dt_bytes, remote_addr);
if (OPAL_SUCCESS != ret) {
Expand All @@ -881,7 +820,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
}
}

return end_atomicity(module, target, lock_acquired, NULL);
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
}

int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
Expand All @@ -907,13 +846,14 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
bool lock_acquired = false;

if (!module->acc_single_intrinsic) {
ret = start_atomicity(module, target, &lock_acquired);
/* Start atomicity by acquiring acc lock */
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
if (ret != OMPI_SUCCESS) {
return ret;
}
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);

value = origin_addr ? opal_common_ucx_load_uint64(origin_addr, dt_bytes) : 0;

Expand All @@ -934,7 +874,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
return ret;
}

return end_atomicity(module, target, lock_acquired, NULL);
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
} else {
return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt,
target, target_disp, 1, dt, op, win);
Expand Down Expand Up @@ -970,13 +910,14 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
target_disp, result_addr, ucx_req);
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);

ret = start_atomicity(module, target, &lock_acquired);
/* Start atomicity by acquiring acc lock */
ret = ompi_osc_state_lock(module, target, &lock_acquired, false);
if (ret != OMPI_SUCCESS) {
return ret;
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, !lock_acquired);

ret = ompi_osc_ucx_get(result_addr, result_count, result_dt, target,
target_disp, target_count, target_dt, win);
if (ret != OMPI_SUCCESS) {
Expand Down Expand Up @@ -1087,7 +1028,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
}


return end_atomicity(module, target, lock_acquired, free_addr);
return ompi_osc_state_unlock(module, target, lock_acquired, free_addr);
}

int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
Expand Down Expand Up @@ -1119,7 +1060,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
return ret;
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);

OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
assert(NULL != ucx_req);
Expand Down Expand Up @@ -1175,7 +1116,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
return ret;
}

CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, true);

OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
assert(NULL != ucx_req);
Expand Down
Loading