-
Notifications
You must be signed in to change notification settings - Fork 914
Closed
Description
Thank you for taking the time to submit an issue!
Background information
What version of Open MPI are you using? (e.g., v4.1.6, v5.0.1, git branch name and hash, etc.)
v4.1.2
pmix v3.2.2
Describe how Open MPI was installed (e.g., from a source/distribution tarball, from a git clone, from an operating system distribution package, etc.)
Built from source:
./configure
make & make install
If you are building/installing from a git clone, please copy-n-paste the output from git submodule status
.
Please describe the system on which you are running
- Operating system/version: ubuntu 24
- Computer hardware:
- Network type:
Details of the problem
I wrote a custom PMIx server. When I launch an MPI program from it, the program starts the built-in PMIx server and overwrites the PMIx environment variables that my server had set. How can I make the MPI program connect to my server instead of using the built-in server? What do I need to implement? The environment variables set by my server are:
PMIX_MCA_psec=native
PMIX_NAMESPACE=nspace
PMIX_RANK=0
PMIX_SERVER_URI3=pmix-server.194449;tcp4://127.0.0.1:34129
PMIX_SERVER_URI2=pmix-server.194449;tcp4://127.0.0.1:34129
PMIX_SERVER_URI21=pmix-server.194449;tcp4://127.0.0.1:34129
PMIX_PTL_MODULE=tcp,usock
PMIX_BFROP_BUFFER_TYPE=PMIX_BFROP_BUFFER_NON_DESC
PMIX_GDS_MODULE=ds21,ds12,hash
PMIX_SERVER_TMPDIR=/tmp/pmix.1000
PMIX_SYSTEM_TMPDIR=/tmp
PMIX_DSTORE_21_BASE_PATH=/tmp/pmix.1000/pmix_dstor_ds21_194449
PMIX_DSTORE_ESH_BASE_PATH=/tmp/pmix.1000/pmix_dstor_ds12_194449
PMIX_HOSTNAME=huerni-VMware-Virtual-Platform
PMIX_VERSION=3.2.2
The PMIx environment variables are overwritten as follows:
PMIX_MCA_psec=native
PMIX_NAMESPACE=981204993
PMIX_RANK=0
PMIX_SERVER_URI3=981204992.0;tcp4://127.0.0.1:59699
PMIX_SERVER_URI2=981204992.0;tcp4://127.0.0.1:59699
PMIX_SERVER_URI21=981204992.0;tcp4://127.0.0.1:59699
PMIX_PTL_MODULE=tcp,usock
PMIX_BFROP_BUFFER_TYPE=PMIX_BFROP_BUFFER_NON_DESC
PMIX_GDS_MODULE=ds21,ds12,hash
PMIX_SERVER_TMPDIR=/tmp/ompi.huerni-VMware-Virtual-Platform.1000/pid.194453
PMIX_SYSTEM_TMPDIR=/tmp
PMIX_DSTORE_21_BASE_PATH=/tmp/ompi.huerni-VMware-Virtual-Platform.1000/pid.194453/pmix_dstor_ds21_194453
PMIX_DSTORE_ESH_BASE_PATH=/tmp/ompi.huerni-VMware-Virtual-Platform.1000/pid.194453/pmix_dstor_ds12_194453
PMIX_HOSTNAME=huerni-VMware-Virtual-Platform
PMIX_VERSION=3.2.2
IPATH_NO_BACKTRACE=1
HFI_NO_BACKTRACE=1
ORTE_SCHIZO_DETECTION=ORTE
OMPI_MCA_orte_launch=1
OMPI_MCA_orte_precondition_transports=d442461a042fac6d-45ce644be84df18b
PMIX_MCA_mca_base_component_show_load_errors=1
PMIX_DEBUG=100
PMIX_MCA_ptl=tcp,usock
PMIX_MCA_gds=ds21,ds12,hash
OMPI_MCA_orte_ess_num_procs=1
OMPI_APP_CTX_NUM_PROCS=1
The code implementation is as follows:
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

#include <atomic>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <thread>
#include <vector>

#include <absl/strings/str_split.h>
#include <absl/synchronization/blocking_counter.h>
#include <pmix.h>
#include <pmix_common.h>
#include <pmix_server.h>
// Number of local clients that have not finalized yet. It is decremented in
// PMIxServerModule::client_finalized and polled by main(). It is an atomic
// rather than a `volatile int` because volatile provides no inter-thread
// synchronization (NOTE(review): the decrement presumably runs on a PMIx
// library thread — confirm).
static std::atomic<int> wakeup{0};
/**
 * Generic "operation complete" callback used for the blocking PMIx calls.
 *
 * @param status  Completion status reported for the operation.
 * @param cbdata  The absl::BlockingCounter the caller is waiting on; it is
 *                decremented once so that the waiter can proceed.
 */
static void op_callbk(pmix_status_t status, void *cbdata) {
  auto *counter = static_cast<absl::BlockingCounter *>(cbdata);
  counter->DecrementCount();

  const char *status_text = PMIx_Error_string(status);
  printf("op callback is called with status=%d: %s\n", status, status_text);
}
/**
 * Completion callback for PMIx_server_setup_application().
 *
 * Prints the key of every returned info entry, hands the info array back to
 * the library, logs the status and finally releases the waiting caller.
 *
 * @param status           Completion status of the setup request.
 * @param info             Array of setup data returned by the library.
 * @param ninfo            Number of entries in `info`.
 * @param provided_cbdata  The absl::BlockingCounter passed to
 *                         PMIx_server_setup_application().
 * @param cbfunc           Release function supplied by the library.
 * @param cbdata           Opaque argument to pass to `cbfunc`.
 */
static void app_cb(pmix_status_t status, pmix_info_t info[], size_t ninfo,
                   void *provided_cbdata, pmix_op_cbfunc_t cbfunc,
                   void *cbdata) {
  auto *bc = static_cast<absl::BlockingCounter *>(provided_cbdata);

  if (PMIX_SUCCESS == status && nullptr != info) {
    for (size_t i = 0; i < ninfo; ++i) {
      printf("%s \n", info[i].key);
    }
  }

  // The info array is owned by the PMIx library; invoking cbfunc tells it we
  // are done with the data so that it can be released.
  if (nullptr != cbfunc) {
    cbfunc(PMIX_SUCCESS, cbdata);
  }

  printf("app callback is called with status=%d: %s\n", status,
         PMIx_Error_string(status));

  // Wake the waiter last: the counter lives on the caller's stack and must
  // not be touched after this call.
  bc->DecrementCount();
}
/**
 * Collection of static host-server upcalls for the PMIx server library.
 *
 * Every function is static so that its address can be stored in a
 * pmix_server_module_t table. Most upcalls only log that they were invoked;
 * the publish/lookup/spawn/connect family reports PMIX_ERR_NOT_SUPPORTED.
 */
class PMIxServerModule {
 public:
  /** Logs that a client connected; does not invoke `cbfunc`. */
  static int client_connected(const pmix_proc_t *proc, void *server_object,
                              pmix_op_cbfunc_t cbfunc, void *cbdata) {
    /* we don't do anything by now */
    printf("SERVER: connected %s:%d\n", proc->nspace, proc->rank);
    // if (NULL != cbfunc) {
    //   cbfunc(PMIX_SUCCESS, cbdata);
    // }
    return PMIX_SUCCESS;
  }

  /** Logs the completion status of an operation. */
  static void op_callbk(pmix_status_t status, void *cbdata) {
    printf("op callback is called with status=%d: %s\n", status,
           PMIx_Error_string(status));
  }

  /** Logs the result of an error-handler registration. */
  static void errhandler_reg_callbk(pmix_status_t status, size_t errhandler_ref,
                                    void *cbdata) {
    printf(
        "Error handler registration callback is called with status=%d, "
        "ref=%d\n",
        status, (int)errhandler_ref);
  }

  /**
   * Logs that a client finalized, decrements the global `wakeup` counter and
   * acknowledges the request through `cbfunc`.
   */
  static pmix_status_t client_finalized(const pmix_proc_t *proc,
                                        void *server_object,
                                        pmix_op_cbfunc_t cbfunc, void *cbdata) {
    printf("client_finalized is called ");
    /* don't do anything by now */
    printf("SERVER: FINALIZED %s:%d\n", proc->nspace, proc->rank);
    --wakeup;
    /* ensure we call the cbfunc so the proc can exit! */
    if (nullptr != cbfunc) {
      cbfunc(PMIX_SUCCESS, cbdata);
    }
    return PMIX_SUCCESS;
  }

  /**
   * Logs an abort request and acknowledges it.
   *
   * @param pmix_proc   Process that requested the abort.
   * @param status      Abort status supplied by the requester.
   * @param msg         Optional message; may be NULL.
   * @param pmix_procs  Processes to abort; when NULL the request is logged as
   *                    covering the requester's whole namespace.
   * @param nprocs      Number of entries in `pmix_procs`.
   * @param cbfunc      Acknowledgement callback; invoked with PMIX_SUCCESS.
   */
  static pmix_status_t abort_fn(const pmix_proc_t *pmix_proc,
                                void *server_object, int status,
                                const char msg[], pmix_proc_t pmix_procs[],
                                size_t nprocs, pmix_op_cbfunc_t cbfunc,
                                void *cbdata) {
    // msg is optional; handing a null pointer to %s is undefined behaviour.
    printf("abort_fn called: status = %d, msg = %s\n", status,
           (nullptr != msg) ? msg : "(null)");
    if (nullptr != pmix_procs && 0 < nprocs) {
      printf("SERVER: ABORT on %s:%d\n", pmix_procs[0].nspace,
             pmix_procs[0].rank);
    } else {
      // No explicit target list: report the requester's namespace. The
      // target array must not be dereferenced here because it is NULL.
      printf("SERVER: ABORT OF ALL PROCS IN NSPACE %s\n",
             (nullptr != pmix_proc) ? pmix_proc->nspace : "(unknown)");
    }
    // if (pmixp_lib_abort(status, cbfunc, cbdata) != SLURM_SUCCESS)
    //   return PMIX_ERROR;

    // Acknowledge the request, mirroring client_finalized, so that the
    // requester is not left waiting for a reply.
    if (nullptr != cbfunc) {
      cbfunc(PMIX_SUCCESS, cbdata);
    }
    return PMIX_SUCCESS;
  }

  /** Completes a fence by echoing the contributed data back via `cbfunc`. */
  static pmix_status_t fencenb_fn(const pmix_proc_t procs_v2[], size_t nprocs,
                                  const pmix_info_t info[], size_t ninfo,
                                  char *data, size_t ndata,
                                  pmix_modex_cbfunc_t cbfunc, void *cbdata) {
    printf(" fencenb_fn is called\n");
    /* pass the provided data back to each participating proc */
    if (nullptr != cbfunc) {
      cbfunc(PMIX_SUCCESS, data, ndata, cbdata, nullptr, nullptr);
    }
    return PMIX_SUCCESS;
  }

  /**
   * Logs a direct-modex request.
   *
   * NOTE(review): this returns PMIX_SUCCESS without ever invoking `cbfunc`;
   * confirm whether the requester would wait for that callback.
   */
  static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
                                 const pmix_info_t info[], size_t ninfo,
                                 pmix_modex_cbfunc_t cbfunc, void *cbdata) {
    printf("dmodex_fn is called\n");
    return PMIX_SUCCESS;
  }

  /** Logs the request and reports that job control is not supported. */
  static pmix_status_t job_control(const pmix_proc_t *proct,
                                   const pmix_proc_t targets[], size_t ntargets,
                                   const pmix_info_t directives[], size_t ndirs,
                                   pmix_info_cbfunc_t cbfunc, void *cbdata) {
    printf("job_control is called\n");
    return PMIX_ERR_NOT_SUPPORTED;
  }

  /** Publish is not supported. */
  static pmix_status_t publish_fn(const pmix_proc_t *proc,
                                  const pmix_info_t info[], size_t ninfo,
                                  pmix_op_cbfunc_t cbfunc, void *cbdata) {
    return PMIX_ERR_NOT_SUPPORTED;
  }

  /** Lookup is not supported. */
  static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
                                 const pmix_info_t info[], size_t ninfo,
                                 pmix_lookup_cbfunc_t cbfunc, void *cbdata) {
    return PMIX_ERR_NOT_SUPPORTED;
  }

  /** Unpublish is not supported. */
  static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
                                    const pmix_info_t info[], size_t ninfo,
                                    pmix_op_cbfunc_t cbfunc, void *cbdata) {
    return PMIX_ERR_NOT_SUPPORTED;
  }

  /** Spawn is not supported. */
  static pmix_status_t spawn_fn(const pmix_proc_t *proc,
                                const pmix_info_t job_info[], size_t ninfo,
                                const pmix_app_t apps[], size_t napps,
                                pmix_spawn_cbfunc_t cbfunc, void *cbdata) {
    return PMIX_ERR_NOT_SUPPORTED;
  }

  /** Connect is not supported. */
  static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
                                  const pmix_info_t info[], size_t ninfo,
                                  pmix_op_cbfunc_t cbfunc, void *cbdata) {
    return PMIX_ERR_NOT_SUPPORTED;
  }

  /** Disconnect is not supported. */
  static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
                                     const pmix_info_t info[], size_t ninfo,
                                     pmix_op_cbfunc_t cbfunc, void *cbdata) {
    return PMIX_ERR_NOT_SUPPORTED;
  }
};
// Table of host-server upcalls; main() passes its address to
// PMIx_server_init(). Only the members named below are set. The designators
// have to follow the member declaration order of pmix_server_module_t
// (C++ designated-initializer rule), so keep the order when adding entries.
static pmix_server_module_t slurm_pmix_cb = {
.client_connected = PMIxServerModule::client_connected,
.client_finalized = PMIxServerModule::client_finalized,
.abort = PMIxServerModule::abort_fn,
.fence_nb = PMIxServerModule::fencenb_fn,
.direct_modex = PMIxServerModule::dmodex_fn,
.publish = PMIxServerModule::publish_fn,
.lookup = PMIxServerModule::lookup_fn,
.unpublish = PMIxServerModule::unpublish_fn,
.spawn = PMIxServerModule::spawn_fn,
.connect = PMIxServerModule::connect_fn,
.disconnect = PMIxServerModule::disconnect_fn,
.job_control = PMIxServerModule::job_control,
};
/**
 * Passive record of per-job settings.
 *
 * Every scalar member has a default initializer so that a default-constructed
 * MpiJobInfo never holds indeterminate values. NOTE(review): nothing in the
 * visible code reads this struct, so the field meanings below come from the
 * names only — confirm against the code that fills it in.
 */
struct MpiJobInfo {
  pmix_nspace_t nspace{};  // crane.pmix.jobid.stepid
  uint32_t uid{0};
  gid_t gid{0};
  int node_id{0};
  int node_id_job{0};
  std::string hostname;
  uint32_t node_tasks{0};
  int timeout{0};
  std::string cli_tmpdir;
  std::string cli_tmpdir_base;
  std::string server_tmpdir;
  std::string client_lip_tmpdir;
  std::string server_addr_unfmt;
  std::string spool_dir;
  std::string crun_ip;
  int abort_agent_port{0};
};
// Shutdown flag: starts at 1 and is cleared by signal_handler().
// sig_atomic_t is the type the language guarantees can be written safely from
// a signal handler.
static volatile std::sig_atomic_t keep_running = 1;

/**
 * Signal handler that requests shutdown.
 *
 * Clears `keep_running` and writes a one-line notice to standard output.
 * The notice is formatted by hand and emitted with write(2) because printf()
 * is not async-signal-safe.
 *
 * @param sig  Number of the signal that was delivered.
 */
void signal_handler(int sig) {
  keep_running = 0;

  constexpr char kPrefix[] = "Received signal ";
  constexpr char kSuffix[] = ", shutting down PMIx server...\n";
  char msg[sizeof(kPrefix) + sizeof(kSuffix) + 16];
  size_t len = 0;

  for (size_t i = 0; i + 1 < sizeof(kPrefix); ++i) {
    msg[len++] = kPrefix[i];
  }

  // Decimal rendering of `sig`; done in unsigned arithmetic so that even
  // INT_MIN cannot overflow.
  unsigned int magnitude = static_cast<unsigned int>(sig);
  if (sig < 0) {
    msg[len++] = '-';
    magnitude = 0U - magnitude;
  }
  char digits[16];
  size_t ndigits = 0;
  do {
    digits[ndigits++] = static_cast<char>('0' + (magnitude % 10U));
    magnitude /= 10U;
  } while (magnitude != 0U);
  while (ndigits > 0) {
    msg[len++] = digits[--ndigits];
  }

  for (size_t i = 0; i + 1 < sizeof(kSuffix); ++i) {
    msg[len++] = kSuffix[i];
  }

  // Best effort: there is nothing useful to do if the write fails.
  const ssize_t written = write(STDOUT_FILENO, msg, len);
  (void)written;
}
int main(int argc, char **argv) {
static char tmpdir[] = "/tmp/pmix.1000";
char **client_env = NULL;
char **client_argv = NULL;
char *tmp, **atmp, *executable = NULL, *cleanup;
int rc, nprocs = 1, n, k;
uid_t myuid;
gid_t mygid;
pid_t pid;
// myxfer_t *x;
pmix_proc_t proc;
// wait_tracker_t *child;
char *tdir;
uid_t uid = geteuid();
struct stat buf;
pmix_nspace_t ncache;
/* create the directory */
if (0 != stat(tmpdir, &buf)) {
/* try to make directory */
if (0 != mkdir(tmpdir, S_IRWXU)) {
fprintf(stderr, "Cannot make tmpdir %s", tmpdir);
exit(1);
}
}
asprintf(&cleanup, "rm -rf %s", tmpdir);
pmix_info_t *info;
PMIX_INFO_CREATE(info, 1);
PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TMPDIR, tmpdir, PMIX_STRING);
if (PMIX_SUCCESS != (rc = PMIx_server_init(&slurm_pmix_cb, info, 1))) {
fprintf(stderr, "Init failed with error %d\n", rc);
return rc;
}
PMIX_INFO_FREE(info, 1);
/* register the errhandler */
// PMIx_Register_event_handler(NULL, 0, NULL, 0, _errhandler, errhandler_reg_callbk1, NULL);
for (n = 1; n < (argc - 1); n++) {
if (0 == strcmp("-n", argv[n]) && NULL != argv[n + 1]) {
nprocs = strtol(argv[n + 1], NULL, 10);
++n; // step over the argument
} else if (0 == strcmp("-e", argv[n]) && NULL != argv[n + 1]) {
executable = strdup(argv[n + 1]);
for (k = n + 2; NULL != argv[k]; k++) {
pmix_argv_append_nosize(&client_argv, argv[k]);
}
n += k;
}
}
if (NULL == executable) {
executable = strdup("/home/huerni/PMIxTest/cmake-build-debug/MPITestProg/MPITestProg");
}
atmp = NULL;
for (n = 0; n < nprocs; n++) {
asprintf(&tmp, "%d", n);
pmix_argv_append_nosize(&atmp, tmp);
free(tmp);
}
tmp = pmix_argv_join(atmp, ',');
pmix_argv_free(atmp);
{
absl::BlockingCounter bc(1);
if (PMIX_SUCCESS
!= (rc = PMIx_server_setup_application("nspace", NULL, 0, app_cb, &bc))) {
fprintf(stderr, "Failed to setup application: %d\n", rc);
PMIx_server_finalize();
exit(1);
}
bc.Wait();
}
char hostname[100000];
gethostname(hostname, sizeof(hostname));
static char job_id[] = "100";
pmix_info_t *ns_info;
pmix_data_array_t *proc_data;
PMIX_INFO_CREATE(ns_info, 11+nprocs);
int m = 0;
PMIX_INFO_LOAD(&ns_info[m], PMIX_JOBID, job_id, PMIX_STRING);
m++;
PMIX_INFO_LOAD(&ns_info[m], PMIX_NODEID, 0, PMIX_UINT32);
m++;
for (n = 0; n<nprocs; n++) {
PMIX_DATA_ARRAY_CREATE(proc_data, 6, PMIX_INFO);
auto *proc_data_arr = reinterpret_cast<pmix_info_t *>(proc_data->array);
int zero = 0;
int rank = n;
PMIX_INFO_LOAD(&proc_data_arr[0], PMIX_RANK, &rank, PMIX_PROC_RANK);
PMIX_INFO_LOAD(&proc_data_arr[1], PMIX_GLOBAL_RANK, &rank, PMIX_UINT32);
PMIX_INFO_LOAD(&proc_data_arr[2], PMIX_NODE_RANK, &rank, PMIX_UINT16);
PMIX_INFO_LOAD(&proc_data_arr[3], PMIX_LOCAL_RANK, &rank, PMIX_UINT16);
PMIX_INFO_LOAD(&proc_data_arr[4], PMIX_HOSTNAME, hostname, PMIX_STRING);
PMIX_INFO_LOAD(&proc_data_arr[5], PMIX_NODEID, &zero, PMIX_UINT32);
PMIX_INFO_LOAD(&ns_info[m], PMIX_PROC_DATA, proc_data, PMIX_DATA_ARRAY);
m++;
PMIX_DATA_ARRAY_DESTRUCT(proc_data);
}
uint32_t val32 = 1;
// Job Size
PMIX_INFO_LOAD(&ns_info[m], PMIX_UNIV_SIZE, &nprocs, PMIX_UINT32);
m++;
PMIX_INFO_LOAD(&ns_info[m], PMIX_JOB_SIZE, &nprocs, PMIX_UINT32);
m++;
PMIX_INFO_LOAD(&ns_info[m], PMIX_LOCAL_SIZE, &nprocs, PMIX_UINT32);
m++;
PMIX_INFO_LOAD(&ns_info[m], PMIX_NODE_SIZE, &val32, PMIX_UINT32);
m++;
PMIX_INFO_LOAD(&ns_info[m], PMIX_MAX_PROCS, &nprocs, PMIX_UINT32);
m++;
PMIX_INFO_LOAD(&ns_info[m], PMIX_SPAWNED, 0, PMIX_UINT32);
m++;
// Local info
PMIX_INFO_LOAD(&ns_info[m], PMIX_LOCAL_PEERS, strdup(tmp), PMIX_STRING);
m++;
char *regex, *ppn;
rc = PMIx_generate_regex(hostname, ®ex);
if (rc != PMIX_SUCCESS) {
printf("Error: PMIx_generate_regex. %s\n", PMIx_Error_string(rc));
return 1;
}
PMIx_generate_ppn(tmp, &ppn);
PMIX_INFO_LOAD(&ns_info[m], PMIX_NODE_MAP, regex, PMIX_STRING);
m++;
PMIX_INFO_LOAD(&ns_info[m], PMIX_PROC_MAP, ppn, PMIX_STRING);
m++;
{
absl::BlockingCounter bc(1);
rc = PMIx_server_register_nspace("nspace", nprocs, ns_info, m, op_callbk, &bc);
PMIX_INFO_DESTRUCT(ns_info);
if (rc != PMIX_SUCCESS) {
printf("Error: PMIx_server_register_nspace. %s\n", PMIx_Error_string(rc));
return 1;
}
bc.Wait();
}
client_env = pmix_argv_copy(environ);
// pmix_argv_prepend_nosize(&client_argv, "");
pmix_argv_prepend_nosize(&client_argv, executable);
wakeup = nprocs;
myuid = getuid();
mygid = getgid();
free(tmp);
PMIX_LOAD_NSPACE(ncache, "nspace");
{
absl::BlockingCounter bc(1);
if (PMIX_SUCCESS != (rc = PMIx_server_setup_local_support(ncache, NULL, 0, op_callbk, &bc))) {
fprintf(stderr, "Setup local support failed: %d\n", rc);
PMIx_server_finalize();
system(cleanup);
return rc;
}
bc.Wait();
}
PMIX_LOAD_NSPACE(proc.nspace, ncache);
for (n = 0; n<nprocs; n++) {
proc.rank = n;
{
absl::BlockingCounter bc(1);
if (PMIX_SUCCESS
!= (rc = PMIx_server_register_client(&proc, myuid, mygid, NULL, op_callbk, &bc))) {
fprintf(stderr, "Server fork setup failed with error %d\n", rc);
PMIx_server_finalize();
system(cleanup);
return rc;
}
bc.Wait();
}
}
for (n = 0; n<nprocs; n++) {
proc.rank = n;
if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) { // n
fprintf(stderr, "Server fork setup failed with error %d\n", rc);
PMIx_server_finalize();
system(cleanup);
return rc;
}
pid = fork();
if (pid != 0) {
int status;
// wait(&status);
} else {
execve(executable, client_argv, client_env);
/* Does not return */
perror("execve failed"); // 输出具体错误(如路径错误、权限不足)
exit(EXIT_FAILURE);
}
}
// printf("Env after exec:\n");
// char** it = client_env;
// while(*it != nullptr) {
// printf("%s\n", *it);
// it++;
// }
free(executable);
pmix_argv_free(client_argv);
pmix_argv_free(client_env);
while (0 < wakeup) {
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 100000;
nanosleep(&ts, NULL);
}
{
absl::BlockingCounter bc(1);
PMIx_server_deregister_nspace("nspace", op_callbk, &bc);
bc.Wait();
}
{
absl::BlockingCounter bc(1);
PMIx_Deregister_event_handler(0, op_callbk, &bc);
bc.Wait();
}
fprintf(stderr, "Test finished OK!\n");
system(cleanup);
return rc;
}
Metadata
Metadata
Assignees
Labels
No labels