Skip to content

per-file thread safety in ROMIO  #6951

Closed
@wkliao

Description

@wkliao

I noticed that the thread lock and unlock calls around the file-open call are commented out in the ROMIO driver, as shown below.

// OPAL_THREAD_LOCK (&mca_io_romio321_mutex);
ret = ROMIO_PREFIX(MPI_File_open)(comm, filename, amode, ompi_info,
&data->romio_fh);
// OPAL_THREAD_UNLOCK (&mca_io_romio321_mutex);

To support per-file thread safety, these lock and unlock calls must be uncommented, because ROMIO internally uses a few global variables that need to be protected by a mutex.

I tested the short program below, in which each process spawns 6 threads and each thread uses MPI_COMM_SELF to open its own file. Without the lock protection, the test program failed.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <mpi.h>

/* number of threads each MPI process spawns */
enum { NTHREADS = 6 };

/* ERR: report a failed MPI call.
 * Requires an `int err` (the MPI return code just produced) and an
 * `int nerrs` (running error count) in the enclosing scope. When err is not
 * MPI_SUCCESS, prints the MPI error text together with the source line where
 * ERR was used, then increments nerrs. It expands to a bare `if` statement
 * and is written without a trailing semicolon, e.g.
 *     err = MPI_File_close(&fh); ERR
 * Do not follow it with an `else`: that would bind to the hidden `if`. */
#define ERR \
    if (MPI_SUCCESS != err) { \
        char errMsg[MPI_MAX_ERROR_STRING]; \
        int errMsgLen; \
        MPI_Error_string(err, errMsg, &errMsgLen); \
        printf("Error at line %d: %s\n", __LINE__, errMsg); \
        ++nerrs; \
    }

/* pthread barrier object: initialized in main() with NTHREADS participants
 * and destroyed there after all threads are joined.
 * NOTE(review): thread_func() never calls pthread_barrier_wait() on it, so it
 * currently synchronizes nothing -- presumably kept for variants of this test
 * that start all opens at the same moment; confirm before relying on it. */
static pthread_barrier_t barr;

/* Per-thread argument block passed to thread_func() via pthread_create(). */
typedef struct thread_arg {
    int  id;         /* thread ID, unique across ranks: i + rank * NTHREADS */
    char fname[256]; /* base file name; the thread appends ".<id>" to it */
} thread_arg;

/*----< thread_func() >------------------------------------------------------*/
static
void* thread_func(void *arg)
{
    char filename[256];
    int id, err, nerrs=0, *ret;
    MPI_File fh;

    /* make a unique file name for each thread */
    id = ((thread_arg*)arg)->id;
    sprintf(filename, "%s.%d", ((thread_arg*)arg)->fname, id);

    err = MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_CREATE | MPI_MODE_RDWR,
                        MPI_INFO_NULL, &fh); ERR

    err = MPI_File_close(&fh); ERR

    /* return number of errors encountered */
    ret = (int*)malloc(sizeof(int));
    *ret = nerrs;

    return ret; /* same as pthread_exit(ret); */
}

/*----< main() >-------------------------------------------------------------*/
/* Test driver: every MPI process spawns NTHREADS threads, each of which opens
 * and closes its own file on MPI_COMM_SELF concurrently (see thread_func()).
 *
 * Usage: prog [file_name_base]   (default base name: "testfile")
 * Returns 0 when no errors were observed and 1 otherwise. If the MPI library
 * does not provide MPI_THREAD_MULTIPLE, the test is skipped and 0 is returned.
 */
int main(int argc, char **argv) {
    char filename[256];
    const char *base;
    int  i, n, nerrs=0, rank, providedT;
    pthread_t threads[NTHREADS];
    /* created[i] != 0 iff threads[i] was actually started; only those are joined */
    int created[NTHREADS] = {0};
    /* Per-thread arguments. Must live at function scope: each thread reads its
     * entry after pthread_create() returns, so an array declared inside the
     * creation loop would go out of scope while still in use. */
    thread_arg t_arg[NTHREADS];

    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &providedT);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (providedT != MPI_THREAD_MULTIPLE) {
        if (!rank)
            printf("Error: MPI does not support MPI_THREAD_MULTIPLE\n");
        MPI_Finalize();
        return 0;
    }

    /* bounded copy: a user-supplied base name may not fit in filename[] */
    base = (argc > 1) ? argv[1] : "testfile";
    n = snprintf(filename, sizeof filename, "%s", base);
    if (n >= (int)sizeof filename && !rank)
        fprintf(stderr, "Warning: file name truncated to \"%s\"\n", filename);

    /* initialize thread barrier */
    pthread_barrier_init(&barr, NULL, NTHREADS);

    /* create threads, each calls thread_func() */
    for (i=0; i<NTHREADS; i++) {
        t_arg[i].id = i + rank * NTHREADS; /* unique across all ranks */
        snprintf(t_arg[i].fname, sizeof t_arg[i].fname, "%s", filename);
        if (pthread_create(&threads[i], NULL, thread_func, &t_arg[i])) {
            fprintf(stderr, "Error in %s line %d creating thread %d\n",
                    __FILE__, __LINE__, i);
            nerrs++;
            continue;
        }
        created[i] = 1;
    }

    /* wait for all started threads to finish and collect their error counts */
    for (i=0; i<NTHREADS; i++) {
        void *ret = NULL;
        if (!created[i])
            continue; /* never started; already counted above */
        if (pthread_join(threads[i], &ret)) {
            fprintf(stderr, "Error in %s line %d joining thread %d\n",
                    __FILE__, __LINE__, i);
            nerrs++;
            continue; /* ret was not set by the join; do not dereference it */
        }
        if (ret == NULL) { /* thread could not allocate its result */
            nerrs++;
            continue;
        }
        nerrs += *(int*)ret;
        free(ret);
    }

    pthread_barrier_destroy(&barr);

    MPI_Finalize();
    return (nerrs > 0);
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions