Skip to content

Commit d9becec

Browse files
author
Mamzi Bayatpour mbayatpour@nvidia.com ()
committed
OSC/UCX: Adding locks to win attach/detach and fixing build warnings
Signed-off-by: Mamzi Bayatpour <mbayatpour@nvidia.com>
1 parent 6233e0d commit d9becec

File tree

3 files changed

+169
-127
lines changed

3 files changed

+169
-127
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,5 +228,10 @@ int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win);
228228
int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_wins,
229229
int min_index, int max_index,
230230
uint64_t base, size_t len, int *insert);
231+
extern inline bool ompi_osc_need_acc_lock(ompi_osc_ucx_module_t *module, int target);
232+
extern inline int ompi_osc_state_lock(ompi_osc_ucx_module_t *module, int target,
233+
bool *lock_acquired);
234+
extern inline int ompi_osc_state_unlock(ompi_osc_ucx_module_t *module, int target,
235+
bool lock_acquired, void *free_ptr);
231236

232237
#endif /* OMPI_OSC_UCX_H */

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 54 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
return OMPI_ERROR; \
2828
}
2929

30-
#define CHECK_DYNAMIC_WIN(_remote_addr, _module, _target, _ret) \
30+
#define CHECK_DYNAMIC_WIN(_remote_addr, _module, _target, _ret, _win) \
3131
if (_module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { \
32-
_ret = get_dynamic_win_info(_remote_addr, _module, _target); \
32+
_ret = get_dynamic_win_info(_remote_addr, _win, _target); \
3333
if (_ret != OMPI_SUCCESS) { \
3434
return _ret; \
3535
} \
@@ -251,89 +251,9 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
251251
return ret;
252252
}
253253

254-
static inline bool need_acc_lock(ompi_osc_ucx_module_t *module, int target)
255-
{
256-
ompi_osc_ucx_lock_t *lock = NULL;
257-
opal_hash_table_get_value_uint32(&module->outstanding_locks,
258-
(uint32_t) target, (void **) &lock);
259-
260-
/* if there is an exclusive lock there is no need to acqurie the accumulate lock */
261-
return !(NULL != lock && LOCK_EXCLUSIVE == lock->type);
262-
}
263-
264-
static inline int start_atomicity(
265-
ompi_osc_ucx_module_t *module,
266-
int target,
267-
bool *lock_acquired) {
268-
uint64_t result_value = -1;
269-
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
270-
int ret = OMPI_SUCCESS;
271-
272-
if (need_acc_lock(module, target)) {
273-
for (;;) {
274-
ret = opal_common_ucx_wpmem_cmpswp(module->state_mem,
275-
TARGET_LOCK_UNLOCKED, TARGET_LOCK_EXCLUSIVE,
276-
target, &result_value, sizeof(result_value),
277-
remote_addr);
278-
if (ret != OMPI_SUCCESS) {
279-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_cmpswp failed: %d", ret);
280-
return OMPI_ERROR;
281-
}
282-
if (result_value == TARGET_LOCK_UNLOCKED) {
283-
break;
284-
}
285-
286-
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
287-
}
288-
289-
*lock_acquired = true;
290-
} else {
291-
*lock_acquired = false;
292-
}
293-
294-
return OMPI_SUCCESS;
295-
}
296-
297-
static inline int end_atomicity(
298-
ompi_osc_ucx_module_t *module,
299-
int target,
300-
bool lock_acquired,
301-
void *free_ptr) {
302-
uint64_t remote_addr = (module->state_addrs)[target] + OSC_UCX_STATE_ACC_LOCK_OFFSET;
303-
int ret = OMPI_SUCCESS;
304-
305-
if (lock_acquired) {
306-
uint64_t result_value = 0;
307-
/* fence any still active operations */
308-
ret = opal_common_ucx_wpmem_fence(module->mem);
309-
if (ret != OMPI_SUCCESS) {
310-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
311-
return OMPI_ERROR;
312-
}
313-
314-
ret = opal_common_ucx_wpmem_fetch(module->state_mem,
315-
UCP_ATOMIC_FETCH_OP_SWAP, TARGET_LOCK_UNLOCKED,
316-
target, &result_value, sizeof(result_value),
317-
remote_addr);
318-
assert(result_value == TARGET_LOCK_EXCLUSIVE);
319-
} else if (NULL != free_ptr){
320-
/* flush before freeing the buffer */
321-
ret = opal_common_ucx_ctx_flush(module->ctx, OPAL_COMMON_UCX_SCOPE_EP, target);
322-
}
323-
/* TODO: encapsulate in a request and make the release non-blocking */
324-
if (NULL != free_ptr) {
325-
free(free_ptr);
326-
}
327-
if (ret != OMPI_SUCCESS) {
328-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fetch failed: %d", ret);
329-
return OMPI_ERROR;
330-
}
331-
332-
return ret;
333-
}
334-
335-
static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module_t *module,
254+
static inline int get_dynamic_win_info(uint64_t remote_addr, struct ompi_win_t *win,
336255
int target) {
256+
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
337257
uint64_t remote_state_addr = (module->state_addrs)[target] + OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET;
338258
size_t remote_state_len = sizeof(uint64_t) + sizeof(ompi_osc_dynamic_win_info_t) * OMPI_OSC_UCX_ATTACH_MAX;
339259
char *temp_buf = calloc(remote_state_len, 1);
@@ -343,6 +263,13 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
343263
int insert = -1;
344264
int ret;
345265

266+
bool lock_acquired = false;
267+
/* Start atomicity by acquiring acc lock */
268+
ret = ompi_osc_state_lock(module, target, &lock_acquired);
269+
if (ret != OMPI_SUCCESS) {
270+
return ret;
271+
}
272+
346273
ret = opal_common_ucx_wpmem_putget(module->state_mem, OPAL_COMMON_UCX_GET, target,
347274
(void *)((intptr_t)temp_buf),
348275
remote_state_len, remote_state_addr);
@@ -360,32 +287,34 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
360287

361288
memcpy(&win_count, temp_buf, sizeof(uint64_t));
362289
if (win_count > OMPI_OSC_UCX_ATTACH_MAX) {
363-
return MPI_ERR_RMA_RANGE;
290+
ret = MPI_ERR_RMA_RANGE;
291+
goto cleanup;
364292
}
365293

366294
temp_dynamic_wins = (ompi_osc_dynamic_win_info_t *)(temp_buf + sizeof(uint64_t));
367295
contain = ompi_osc_find_attached_region_position(temp_dynamic_wins, 0, win_count - 1,
368296
remote_addr, 1, &insert);
369-
if (contain < 0 || contain >= win_count) {
370-
return MPI_ERR_RMA_RANGE;
297+
if (contain < 0 || contain >= (int)win_count) {
298+
ret = MPI_ERR_RMA_RANGE;
299+
goto cleanup;
371300
}
372301

373302
assert(module->mem != NULL);
374303

375304
_mem_record_t *mem_rec = NULL;
376305
ret = opal_tsd_tracked_key_get(&module->mem->tls_key, (void **) &mem_rec);
377306
if (OPAL_SUCCESS != ret) {
378-
return ret;
307+
goto cleanup;
379308
}
380309

381310
if (mem_rec == NULL) {
382311
ret = opal_common_ucx_tlocal_fetch_spath(module->mem, target);
383312
if (OPAL_SUCCESS != ret) {
384-
return ret;
313+
goto cleanup;
385314
}
386315
ret = opal_tsd_tracked_key_get(&module->mem->tls_key, (void **) &mem_rec);
387316
if (OPAL_SUCCESS != ret) {
388-
return ret;
317+
goto cleanup;
389318
}
390319

391320
}
@@ -408,12 +337,15 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
408337

409338
if (ret != UCS_OK) {
410339
MCA_COMMON_UCX_VERBOSE(1, "ucp_ep_rkey_unpack failed: %d", ret);
411-
return OPAL_ERROR;
340+
ret = OPAL_ERROR;
341+
goto cleanup;
412342
}
413343

414344
cleanup:
415345
free(temp_buf);
416346

347+
ompi_osc_state_unlock(module, target, lock_acquired, NULL);
348+
417349
return ret;
418350
}
419351

@@ -477,7 +409,8 @@ static int do_atomic_op_intrinsic(
477409
struct ompi_datatype_t *dt,
478410
ptrdiff_t target_disp,
479411
void *result_addr,
480-
ompi_osc_ucx_request_t *ucx_req)
412+
ompi_osc_ucx_request_t *ucx_req,
413+
ompi_win_t *win)
481414
{
482415
int ret = OMPI_SUCCESS;
483416
size_t origin_dt_bytes;
@@ -486,7 +419,7 @@ static int do_atomic_op_intrinsic(
486419

487420
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
488421

489-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
422+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
490423

491424
ucp_atomic_fetch_op_t opcode;
492425
bool is_no_op = false;
@@ -555,7 +488,7 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
555488
return ret;
556489
}
557490

558-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
491+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
559492

560493
if (!target_count) {
561494
return OMPI_SUCCESS;
@@ -605,7 +538,7 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
605538
return ret;
606539
}
607540

608-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
541+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
609542

610543
if (!target_count) {
611544
return OMPI_SUCCESS;
@@ -670,12 +603,13 @@ int accumulate_req(const void *origin_addr, int origin_count,
670603
if (use_atomic_op(module, op, target_disp, origin_dt, target_dt, origin_count, target_count)) {
671604
return do_atomic_op_intrinsic(module, op, target,
672605
origin_addr, origin_count, origin_dt,
673-
target_disp, NULL, ucx_req);
606+
target_disp, NULL, ucx_req, win);
674607
}
675608

676-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
609+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
677610

678-
ret = start_atomicity(module, target, &lock_acquired);
611+
/* Start atomicity by acquiring acc lock */
612+
ret = ompi_osc_state_lock(module, target, &lock_acquired);
679613
if (ret != OMPI_SUCCESS) {
680614
return ret;
681615
}
@@ -781,7 +715,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
781715
ompi_request_complete(&ucx_req->super, true);
782716
}
783717

784-
return end_atomicity(module, target, lock_acquired, free_ptr);
718+
return ompi_osc_state_unlock(module, target, lock_acquired, free_ptr);
785719
}
786720

787721
int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
@@ -797,20 +731,21 @@ static int
797731
do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
798732
void *result_addr, struct ompi_datatype_t *dt,
799733
int target, uint64_t remote_addr,
800-
ompi_osc_ucx_module_t *module)
734+
ompi_osc_ucx_module_t *module, ompi_win_t *win)
801735
{
802736
int ret;
803737
bool lock_acquired = false;
804738
size_t dt_bytes;
805739
opal_common_ucx_wpmem_t *mem = module->mem;
806740
if (!module->acc_single_intrinsic) {
807-
ret = start_atomicity(module, target, &lock_acquired);
741+
/* Start atomicity by acquiring acc lock */
742+
ret = ompi_osc_state_lock(module, target, &lock_acquired);
808743
if (ret != OMPI_SUCCESS) {
809744
return ret;
810745
}
811746
}
812747

813-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
748+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
814749

815750
ompi_datatype_type_size(dt, &dt_bytes);
816751
uint64_t compare_val = opal_common_ucx_load_uint64(compare_addr, dt_bytes);
@@ -823,7 +758,7 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
823758
return ret;
824759
}
825760

826-
return end_atomicity(module, target, lock_acquired, NULL);
761+
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
827762
}
828763

829764
int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_addr,
@@ -842,19 +777,20 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
842777
return ret;
843778
}
844779

845-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
780+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
846781

847782
ompi_datatype_type_size(dt, &dt_bytes);
848783
if (ompi_osc_base_is_atomic_size_supported(remote_addr, dt_bytes)) {
849784
// fast path using UCX atomic operations
850785
return do_atomic_compare_and_swap(origin_addr, compare_addr,
851786
result_addr, dt, target,
852-
remote_addr, module);
787+
remote_addr, module, win);
853788
}
854789

855790
/* fall back to get-compare-put */
856791

857-
ret = start_atomicity(module, target, &lock_acquired);
792+
/* Start atomicity by acquiring acc lock */
793+
ret = ompi_osc_state_lock(module, target, &lock_acquired);
858794
if (ret != OMPI_SUCCESS) {
859795
return ret;
860796
}
@@ -881,7 +817,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
881817
}
882818
}
883819

884-
return end_atomicity(module, target, lock_acquired, NULL);
820+
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
885821
}
886822

887823
int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
@@ -907,13 +843,14 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
907843
bool lock_acquired = false;
908844

909845
if (!module->acc_single_intrinsic) {
910-
ret = start_atomicity(module, target, &lock_acquired);
846+
/* Start atomicity by acquiring acc lock */
847+
ret = ompi_osc_state_lock(module, target, &lock_acquired);
911848
if (ret != OMPI_SUCCESS) {
912849
return ret;
913850
}
914851
}
915852

916-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
853+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
917854

918855
value = origin_addr ? opal_common_ucx_load_uint64(origin_addr, dt_bytes) : 0;
919856

@@ -934,7 +871,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
934871
return ret;
935872
}
936873

937-
return end_atomicity(module, target, lock_acquired, NULL);
874+
return ompi_osc_state_unlock(module, target, lock_acquired, NULL);
938875
} else {
939876
return ompi_osc_ucx_get_accumulate(origin_addr, 1, dt, result_addr, 1, dt,
940877
target, target_disp, 1, dt, op, win);
@@ -967,12 +904,13 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
967904
if (use_atomic_op(module, op, target_disp, origin_dt, target_dt, origin_count, target_count)) {
968905
return do_atomic_op_intrinsic(module, op, target,
969906
origin_addr, origin_count, origin_dt,
970-
target_disp, result_addr, ucx_req);
907+
target_disp, result_addr, ucx_req, win);
971908
}
972909

973-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
910+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
974911

975-
ret = start_atomicity(module, target, &lock_acquired);
912+
/* Start atomicity by acquiring acc lock */
913+
ret = ompi_osc_state_lock(module, target, &lock_acquired);
976914
if (ret != OMPI_SUCCESS) {
977915
return ret;
978916
}
@@ -1087,7 +1025,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
10871025
}
10881026

10891027

1090-
return end_atomicity(module, target, lock_acquired, free_addr);
1028+
return ompi_osc_state_unlock(module, target, lock_acquired, free_addr);
10911029
}
10921030

10931031
int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
@@ -1119,7 +1057,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11191057
return ret;
11201058
}
11211059

1122-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
1060+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
11231061

11241062
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
11251063
assert(NULL != ucx_req);
@@ -1175,7 +1113,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11751113
return ret;
11761114
}
11771115

1178-
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret);
1116+
CHECK_DYNAMIC_WIN(remote_addr, module, target, ret, win);
11791117

11801118
OMPI_OSC_UCX_REQUEST_ALLOC(win, ucx_req);
11811119
assert(NULL != ucx_req);

0 commit comments

Comments
 (0)