Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions src/axl/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# TODO: Separate PTHREADS from SOCKETS
IF(ENABLE_PTHREADS)
LIST(APPEND axl_optional_srcs axl_pthread.c)
LIST(APPEND axl_optional_srcs axl_pthread.c axl_socket.c)
ENDIF(ENABLE_PTHREADS)

IF(BBAPI_FOUND)
Expand All @@ -10,23 +11,9 @@ IF(HAVE_DATAWARP)
LIST(APPEND axl_optional_srcs axl_async_datawarp.c)
ENDIF(HAVE_DATAWARP)

LIST(APPEND libaxl_srcs
axl.c
axl_sync.c
axl_err.c
axl_io.c
axl_util.c
${axl_optional_srcs}
)

LIST(APPEND libaxl_serial_srcs
axl.c
axl_sync.c
axl_err.c
axl_io.c
axl_util.c
${axl_optional_srcs}
)
LIST(APPEND libaxl_srcs axl.c axl_sync.c axl_err.c axl_io.c axl_util.c ${axl_optional_srcs})

LIST(APPEND libaxl_serial_srcs axl.c axl_sync.c axl_err.c axl_io.c axl_util.c ${axl_optional_srcs})

IF(MPI_C_FOUND)
LIST(APPEND libaxl_srcs axl_mpi.c)
Expand All @@ -40,6 +27,14 @@ IF(BUILD_SHARED_LIBS)
SET_PROPERTY(TARGET axl_serial_o PROPERTY POSITION_INDEPENDENT_CODE 1)
ENDIF()

# TODO: Separate PTHREADS from SOCKETS
IF(HAVE_PTHREADS)
ADD_EXECUTABLE(axl_socket_daemon axl_socket_daemon.c)
TARGET_LINK_LIBRARIES(axl_socket_daemon axl_o kvtree_o ${SCR_EXTERNAL_LIBS})
INSTALL(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/axl_socket_daemon DESTINATION ${CMAKE_INSTALL_LIBEXECDIR})
ENDIF(HAVE_PTHREADS)


IF(ENABLE_TESTS)
ADD_SUBDIRECTORY(test)
ENDIF(ENABLE_TESTS)
103 changes: 71 additions & 32 deletions src/axl/axl.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
/* xfer methods */
#include "axl_sync.h"

/* (Optional) service functions */
#include "axl_socket.h"

#ifdef HAVE_PTHREADS
#include "axl_pthread.h"
#endif /* HAVE_PTHREAD */
Expand Down Expand Up @@ -75,18 +78,19 @@ int axl_copy_metadata;
/* global rank of calling process, used for BBAPI */
int axl_rank = -1;

/*
* If we are NOT running as a server, use axl_client_xfer_list to contain
* our list of transfer items fro AXL Create. Otherwise, the server will
* change the axl_xfer_list pointer for each connection it is servicing.
*/
static struct axl_transfer_array axl_client_xfer_list = {
.axl_kvtrees = NULL, .axl_kvtrees_count = 0
};
struct axl_transfer_array* axl_xfer_list = &axl_client_xfer_list;

/* reference count for number of times AXL_Init has been called */
static unsigned int axl_init_count = 0;

/* Array for all the AXL_Create'd kvtree pointers. It's indexed by the AXL id.
*
* Note: We only expand this array, we never shrink it. This is fine since
* the user is only going to be calling AXL_Create() a handful of times. It
* also simplifies the code if we never shrink it, and the extra memory usage
* is negligible, if any at all. */
kvtree** axl_kvtrees;
static unsigned int axl_kvtrees_count = 0;

#ifdef HAVE_BBAPI
static int bbapi_is_loaded = 0;
#endif
Expand All @@ -110,19 +114,19 @@ static int axl_alloc_id(const char* state_file)
kvtree_util_set_str(new, AXL_KEY_STATE_FILE, state_file);
}

int id = axl_kvtrees_count;
axl_kvtrees_count++;
int id = axl_xfer_list->axl_kvtrees_count;
axl_xfer_list->axl_kvtrees_count++;

axl_kvtrees = realloc(axl_kvtrees, sizeof(struct kvtree*) * axl_kvtrees_count);
axl_kvtrees[id] = new;
axl_xfer_list->axl_kvtrees = realloc(axl_xfer_list->axl_kvtrees, sizeof(struct kvtree*) * axl_xfer_list->axl_kvtrees_count);
axl_xfer_list->axl_kvtrees[id] = new;

return id;
}

/* Remove the state file for an id, if one exists */
static void axl_remove_state_file(int id)
{
kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
char* state_file = NULL;
if (kvtree_util_get_str(file_list, AXL_KEY_STATE_FILE,
&state_file) == KVTREE_SUCCESS)
Expand All @@ -136,15 +140,15 @@ static void axl_free_id(int id)
{
axl_remove_state_file(id);

/* kvtree_delete() will set axl_kvtrees[id] = NULL */
kvtree_delete(&axl_kvtrees[id]);
/* kvtree_delete() will set axl_xfer_list->axl_kvtrees[id] = NULL */
kvtree_delete(&axl_xfer_list->axl_kvtrees[id]);
}

/* If the user specified a state_file then write our kvtree to it. If not, then
* do nothing. */
void axl_write_state_file(int id)
{
kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
char* state_file = NULL;
if (kvtree_util_get_str(file_list, AXL_KEY_STATE_FILE,
&state_file) == KVTREE_SUCCESS)
Expand All @@ -163,7 +167,7 @@ static int axl_get_info(int id, kvtree** list, axl_xfer_t* type, axl_xfer_state_
*state = AXL_XFER_STATE_NULL;

/* lookup transfer info for the given id */
kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
if (file_list == NULL) {
AXL_ERR("Could not find fileset for UID %d", id);
return AXL_FAILURE;
Expand Down Expand Up @@ -260,6 +264,13 @@ int AXL_Init (void)
axl_make_directories = atoi(val);
}

/* If the user has set both the AXL_SERVICE_HOST and AXL_SERVICE_PORT
* environment variables, then they are expecting to use the AXL Service
* rather than the library included with the SCR library.
*/
char* axl_service_host = NULL;
int axl_service_port = -1;

/* initialize our flag on whether to first copy files to temporary names with extension */
axl_use_extension = 0;
val = getenv("AXL_USE_EXTENSION");
Expand All @@ -275,7 +286,26 @@ int AXL_Init (void)
}

/* keep a reference count to free memory on last AXL_Finalize */
axl_init_count++;
if (axl_init_count++ == 0) {
/*
* If we are not running as the AXL Server, check to see if we are
* expected to run as a client and then connect if so.
*/
if (axl_service_mode != AXL_SOCKET_SERVER) {
if ( (val = getenv("AXL_SERVICE_HOST")) != NULL) {
axl_service_host = strdup(val);

if ( (val = getenv("AXL_SERVICE_PORT")) != NULL) {
axl_service_port = atoi(val);

if (axl_socket_client_init(axl_service_host, (unsigned short)axl_service_port)) {
axl_service_mode = AXL_SOCKET_CLIENT;
}
}
free(axl_service_host);
}
}
}

return rc;
}
Expand All @@ -297,8 +327,13 @@ int AXL_Finalize (void)
axl_init_count--;
if (axl_init_count == 0) {
/* TODO: are there cases where we also need to delete trees? */
axl_free(&axl_kvtrees);
axl_kvtrees_count = 0;
axl_free(&axl_xfer_list->axl_kvtrees);
axl_xfer_list->axl_kvtrees_count = 0;

if (axl_service_mode == AXL_SOCKET_CLIENT) {
axl_socket_client_AXL_Finalize();
}

}

return rc;
Expand Down Expand Up @@ -360,7 +395,7 @@ static kvtree* AXL_Config_Set(const kvtree* config)
char* endptr;
long id = strtol(key, &endptr, 10);
if ((*key == '\0' || *endptr != '\0') ||
(id < 0 || id >= axl_kvtrees_count))
(id < 0 || id >= axl_xfer_list->axl_kvtrees_count))
{
retval = NULL;
break;
Expand All @@ -372,7 +407,7 @@ static kvtree* AXL_Config_Set(const kvtree* config)
break;
}

kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];

const char** opt;
for (opt = known_transfer_options; *opt != NULL; opt++) {
Expand Down Expand Up @@ -462,6 +497,10 @@ static kvtree* AXL_Config_Set(const kvtree* config)
}
}

if (axl_service_mode == AXL_SOCKET_CLIENT) {
axl_socket_client_AXL_Config_Set(config);
}

return retval;
}

Expand Down Expand Up @@ -502,8 +541,8 @@ static kvtree* AXL_Config_Get()

/* per transfer options */
int id;
for (id = 0; id < axl_kvtrees_count; id++) {
kvtree* file_list = axl_kvtrees[id];
for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) {
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
if (file_list == NULL) {
/* TODO: check if it would be better to return an empty hash instead */
continue;
Expand Down Expand Up @@ -613,7 +652,7 @@ int AXL_Create(axl_xfer_t xtype, const char* name, const char* state_file)
return -1;
}

kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
kvtree_util_set_int(file_list, AXL_KEY_XFER_TYPE, xtype);
kvtree_util_set_str(file_list, AXL_KEY_UNAME, name);
if (!reload_from_state_file) {
Expand Down Expand Up @@ -1521,8 +1560,8 @@ int AXL_Stop ()

/* cancel each active id */
int id;
for (id = 0; id < axl_kvtrees_count; id++) {
if (!axl_kvtrees[id]) {
for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) {
if (!axl_xfer_list->axl_kvtrees[id]) {
continue;
}

Expand All @@ -1532,8 +1571,8 @@ int AXL_Stop ()
}

/* wait */
for (id = 0; id < axl_kvtrees_count; id++) {
if (!axl_kvtrees[id]) {
for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) {
if (!axl_xfer_list->axl_kvtrees[id]) {
continue;
}

Expand All @@ -1543,8 +1582,8 @@ int AXL_Stop ()
}

/* and free it */
for (id = 0; id < axl_kvtrees_count; id++) {
if (!axl_kvtrees[id]) {
for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) {
if (!axl_xfer_list->axl_kvtrees[id]) {
continue;
}

Expand Down
17 changes: 13 additions & 4 deletions src/axl/axl_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
/* unless otherwise indicated all global variables defined in this file must
* only be accessed by the main thread */

/*
* A list of pointers to kvtrees, indexed by AXL ID.
*/
extern kvtree** axl_kvtrees;
struct axl_transfer_array {
/* Array for all the AXL_Create'd kvtree pointers. It's indexed by the AXL id.
*
* Note: We only expand this array, we never shrink it. This is fine since
* the user is only going to be calling AXL_Create() a handful of times. It
* also simplifies the code if we never shrink it, and the extra memory usage
* is negligible, if any at all. */
kvtree** axl_kvtrees;
unsigned int axl_kvtrees_count;
};

extern struct axl_transfer_array* axl_xfer_list;

/* current debug level for AXL library,
* set in AXL_Init and AXL_Config used in axl_dbg.
Expand Down Expand Up @@ -63,6 +71,7 @@ extern int axl_rank;

/* attaches function name, file name, and line number to error messages
* https://gcc.gnu.org/onlinedocs/cpp/Variadic-Macros.html */
#define AXL_ABORT(exitcode, format, ...) axl_abort(exitcode, format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__)
#define AXL_ERR(format, ...) axl_err(format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__)
#define AXL_DBG(level, format, ...) axl_dbg(level, format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__)

Expand Down
57 changes: 3 additions & 54 deletions src/axl/axl_mpi.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ int AXL_Finalize_comm (
return rc;
}

#include <stdio.h>

int AXL_Create_comm (
axl_xfer_t type, /**< [IN] - AXL transfer type (AXL_XFER_SYNC, AXL_XFER_PTHREAD, etc) */
const char* name,
const char* name,
const char* file,
MPI_Comm comm) /**< [IN] - communicator used for coordination and flow control */
{
Expand Down Expand Up @@ -144,59 +146,6 @@ int AXL_Dispatch_comm (
int id, /**< [IN] - transfer hander ID returned from AXL_Create */
MPI_Comm comm) /**< [IN] - communicator used for coordination and flow control */
{
#if 0
/* lookup transfer info for the given id */
kvtree* file_list = NULL;
axl_xfer_t xtype = AXL_XFER_NULL;
axl_xfer_state_t xstate = AXL_XFER_STATE_NULL;
if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) {
AXL_ERR("Could not find transfer info for UID %d", id);
return AXL_FAILURE;
}

/* check that handle is in correct state to dispatch */
if (xstate != AXL_XFER_STATE_CREATED) {
AXL_ERR("Invalid state to dispatch UID %d", id);
return AXL_FAILURE;
}
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_DISPATCHED);
#endif

#if 0
/* create destination directories for each file */
if (axl_make_directories) {
/* count number of files we have */
kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id);
kvtree* files_hash = kvtree_get(file_list, AXL_KEY_FILES);
int num_files = kvtree_size(files_hash);

/* allocate pointer for each one */
const char** files = (const char**) AXL_MALLOC(num_files * sizeof(char*));

/* set pointer to each file */
int i;
char* dest;
kvtree_elem* elem;
while ((elem = axl_get_next_path(id, elem, NULL, &dest))) {
files[i] = dest;
i++;
}

/* create directories */
axl_create_dirs(num_files, files, comm);

/* free list of files */
axl_free2(&files);
}

/* TODO: this is hacky */
/* delegate remaining work to regular dispatch,
* but disable mkdir since we already did that */
int make_dir = axl_make_directories;
axl_make_directories = 0;
int rc = AXL_Dispatch(id);
axl_make_directories = make_dir;
#endif
/* delegate remaining work to regular dispatch */
int rc = AXL_Dispatch(id);

Expand Down
Loading