Skip to content

Commit b5ab80c

Browse files
samuelkgutierrezrhc54
authored and committed
gds/shmem: Support multiple local clients.
Tested on a single node, multiple clients now appear to share static job-level information via shared-memory. We still have plenty of work ahead, but at least it doesn't crash now. We now cache a pointer to a full-featured gds module and pass certain operations to it. Signed-off-by: Samuel K. Gutierrez <samuel@lanl.gov>
1 parent 43d3345 commit b5ab80c

File tree

4 files changed

+69
-19
lines changed

4 files changed

+69
-19
lines changed

src/mca/gds/shmem/gds_shmem.c

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@
5959
// TODO(skg) Address FT case at some point. Need to have a broader conversation
6060
// about how we go about doing this. Ralph has some ideas.
6161

62+
// TODO(skg) Consider using mprotect.
63+
// TODO(skg) We may need to implement a different hash table.
64+
6265
/**
6366
* Key names used to find shared-memory segment info.
6467
*/
@@ -270,6 +273,7 @@ job_construct(
270273
) {
271274
job->nspace_id = NULL;
272275
job->nspace = NULL;
276+
job->ffgds = NULL;
273277
job->shmem = PMIX_NEW(pmix_shmem_t);
274278
job->smdata = NULL;
275279
}
@@ -284,6 +288,7 @@ job_destruct(
284288
if (job->nspace) {
285289
PMIX_RELEASE(job->nspace);
286290
}
291+
job->ffgds = NULL;
287292
// This will release the memory for the structures located in the
288293
// shared-memory segment.
289294
if (job->shmem) {
@@ -959,6 +964,13 @@ store_job_info(
959964
// Update the TMA to point to its local function pointers.
960965
tma_init_function_pointers(&job->smdata->tma);
961966
pmix_gds_shmem_vout_smdata(job);
967+
// Protect memory: clients can only read from here.
968+
#if 0
969+
mprotect(
970+
job->shmem->base_address,
971+
job->shmem->size, PROT_READ
972+
);
973+
#endif
962974
// Done. Before this point the server should have populated the
963975
// shared-memory segment with the relevant data.
964976
return rc;
@@ -970,9 +982,18 @@ store(
970982
pmix_scope_t scope,
971983
pmix_kval_t *kval
972984
) {
973-
PMIX_HIDE_UNUSED_PARAMS(proc, scope, kval);
974985
PMIX_GDS_SHMEM_VOUT_HERE();
975-
return PMIX_ERR_NOT_SUPPORTED;
986+
987+
pmix_status_t rc = PMIX_SUCCESS;
988+
// Setup a job tracker for this peer's nspace.
989+
pmix_gds_shmem_job_t *job;
990+
rc = pmix_gds_shmem_get_job_tracker(proc->nspace, false, &job);
991+
if (PMIX_SUCCESS != rc) {
992+
PMIX_ERROR_LOG(rc);
993+
return rc;
994+
}
995+
// Let a full-featured gds module handle this.
996+
return job->ffgds->store(proc, scope, kval);
976997
}
977998

978999
/**
@@ -982,13 +1003,23 @@ store(
9821003
*/
9831004
static pmix_status_t
9841005
store_modex(
985-
struct pmix_namespace_t *nspace,
1006+
struct pmix_namespace_t *nspace_struct,
9861007
pmix_buffer_t *buff,
9871008
void *cbdata
9881009
) {
989-
PMIX_HIDE_UNUSED_PARAMS(nspace, buff, cbdata);
9901010
PMIX_GDS_SHMEM_VOUT_HERE();
991-
return PMIX_ERR_NOT_SUPPORTED;
1011+
1012+
pmix_status_t rc = PMIX_SUCCESS;
1013+
pmix_namespace_t *nspace = (pmix_namespace_t *)nspace_struct;
1014+
// Setup a job tracker for this peer's nspace.
1015+
pmix_gds_shmem_job_t *job;
1016+
rc = pmix_gds_shmem_get_job_tracker(nspace->nspace, false, &job);
1017+
if (PMIX_SUCCESS != rc) {
1018+
PMIX_ERROR_LOG(rc);
1019+
return rc;
1020+
}
1021+
// Let a full-featured gds module handle this.
1022+
return job->ffgds->store_modex(nspace_struct, buff, cbdata);
9921023
}
9931024

9941025
static pmix_status_t
@@ -998,6 +1029,7 @@ server_setup_fork(
9981029
) {
9991030
PMIX_HIDE_UNUSED_PARAMS(peer, env);
10001031
PMIX_GDS_SHMEM_VOUT_HERE();
1032+
// Nothing to do here.
10011033
return PMIX_SUCCESS;
10021034
}
10031035

@@ -1010,9 +1042,11 @@ server_add_nspace(
10101042
) {
10111043
PMIX_HIDE_UNUSED_PARAMS(nspace, nlocalprocs, info, ninfo);
10121044
PMIX_GDS_SHMEM_VOUT_HERE();
1045+
// Nothing to do here.
10131046
return PMIX_SUCCESS;
10141047
}
10151048

1049+
// TODO(skg) Implement.
10161050
static pmix_status_t
10171051
del_nspace(
10181052
const char *nspace
@@ -1029,18 +1063,37 @@ assemb_kvs_req(
10291063
pmix_buffer_t *buff,
10301064
void *cbdata
10311065
) {
1032-
PMIX_HIDE_UNUSED_PARAMS(proc, kvs, buff, cbdata);
10331066
PMIX_GDS_SHMEM_VOUT_HERE();
1034-
return PMIX_ERR_NOT_SUPPORTED;
1067+
1068+
pmix_status_t rc = PMIX_SUCCESS;
1069+
// Setup a job tracker for this peer's nspace.
1070+
pmix_gds_shmem_job_t *job;
1071+
rc = pmix_gds_shmem_get_job_tracker(proc->nspace, false, &job);
1072+
if (PMIX_SUCCESS != rc) {
1073+
PMIX_ERROR_LOG(rc);
1074+
return rc;
1075+
}
1076+
// Let a full-featured gds module handle this.
1077+
return job->ffgds->assemb_kvs_req(proc, kvs, buff, cbdata);
10351078
}
10361079

10371080
static pmix_status_t
10381081
accept_kvs_resp(
10391082
pmix_buffer_t *buff
10401083
) {
1041-
PMIX_HIDE_UNUSED_PARAMS(buff);
10421084
PMIX_GDS_SHMEM_VOUT_HERE();
1043-
return PMIX_SUCCESS;
1085+
1086+
pmix_status_t rc = PMIX_SUCCESS;
1087+
const char *nspace = pmix_globals.mypeer->nptr->nspace;
1088+
// Setup a job tracker for this peer's nspace.
1089+
pmix_gds_shmem_job_t *job;
1090+
rc = pmix_gds_shmem_get_job_tracker(nspace, false, &job);
1091+
if (PMIX_SUCCESS != rc) {
1092+
PMIX_ERROR_LOG(rc);
1093+
return rc;
1094+
}
1095+
// Let a full-featured gds module handle this.
1096+
return job->ffgds->accept_kvs_resp(buff);
10441097
}
10451098

10461099
pmix_gds_base_module_t pmix_shmem_module = {

src/mca/gds/shmem/gds_shmem.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ typedef struct {
124124
char *nspace_id;
125125
/** Pointer to the namespace. */
126126
pmix_namespace_t *nspace;
127+
/** Pointer to a full-featured gds module. */
128+
pmix_gds_base_module_t *ffgds;
127129
/** Shared-memory object. */
128130
pmix_shmem_t *shmem;
129131
/** Points to shared data located in shared-memory segment. */

src/mca/gds/shmem/gds_shmem_fetch.c

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -509,13 +509,6 @@ pmix_gds_shmem_fetch(
509509
}
510510
pmix_hash_table2_t *ht = job->smdata->local_hashtab;
511511

512-
// TODO(skg) Cache during init.
513-
pmix_info_t ginfo;
514-
PMIX_INFO_LOAD(&ginfo, PMIX_GDS_MODULE, "hash", PMIX_STRING);
515-
pmix_gds_base_module_t *hashmod = pmix_gds_base_assign_module(&ginfo, 1);
516-
assert(hashmod);
517-
PMIX_INFO_DESTRUCT(&ginfo);
518-
519512
if (NULL == key && PMIX_RANK_WILDCARD == proc->rank) {
520513
assert(false);
521514
return PMIX_ERR_NOT_SUPPORTED;
@@ -524,7 +517,7 @@ pmix_gds_shmem_fetch(
524517
for (size_t n = 0; n < nqual; n++) {
525518
if (PMIX_CHECK_KEY(&qualifiers[n], PMIX_SESSION_INFO)) {
526519
// We don't handle session info, so pass it along.
527-
return hashmod->fetch(
520+
return job->ffgds->fetch(
528521
proc, scope, copy, key, qualifiers, nqual, kvs
529522
);
530523
}
@@ -606,14 +599,14 @@ pmix_gds_shmem_fetch(
606599
rc = pmix_hash2_fetch(ht, PMIX_RANK_WILDCARD, NULL, NULL, 0, kvs);
607600
}
608601
else {
609-
return hashmod->fetch(proc, scope, copy, key, qualifiers, nqual, kvs);
602+
return job->ffgds->fetch(proc, scope, copy, key, qualifiers, nqual, kvs);
610603
}
611604
}
612605
else {
613606
rc = pmix_hash2_fetch(ht, proc->rank, key, qualifiers, nqual, kvs);
614607
}
615608
if (PMIX_SUCCESS != rc) {
616-
rc = hashmod->fetch(proc, scope, copy, key, qualifiers, nqual, kvs);
609+
rc = job->ffgds->fetch(proc, scope, copy, key, qualifiers, nqual, kvs);
617610
}
618611

619612
return rc;

src/mca/gds/shmem/gds_shmem_utils.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ pmix_gds_shmem_get_job_tracker(
100100
}
101101
PMIX_RETAIN(inspace);
102102
target_tracker->nspace = inspace;
103+
// Cache a handle to a full-featured gds module.
104+
target_tracker->ffgds = pmix_globals.mypeer->nptr->compat.gds;
103105
// Add it to the list of jobs I'm supporting.
104106
pmix_list_append(&component->jobs, &target_tracker->super);
105107
}

0 commit comments

Comments
 (0)