Skip to content

Commit

Permalink
Retry connections in photon connect, consolidate code in io.c (ray-pr…
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanie-wang authored and robertnishihara committed Feb 18, 2017
1 parent 9973a6e commit 67c591c
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 83 deletions.
88 changes: 88 additions & 0 deletions src/common/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <unistd.h>
#include <utstring.h>
#include <netdb.h>

#include "common.h"

#ifndef _WIN32
/* This function is actually not declared in standard POSIX, so declare it. */
extern int usleep(useconds_t usec);
#endif

int bind_inet_sock(const int port, bool shall_listen) {
struct sockaddr_in name;
int socket_fd = socket(PF_INET, SOCK_STREAM, 0);
Expand Down Expand Up @@ -91,6 +97,34 @@ int bind_ipc_sock(const char *socket_pathname, bool shall_listen) {
return socket_fd;
}

/* Connect to the Unix domain socket at socket_pathname, making up to
 * num_retries attempts and sleeping timeout milliseconds after each failed
 * one. A negative num_retries or timeout selects NUM_CONNECT_ATTEMPTS or
 * CONNECT_TIMEOUT_MS respectively. LOG_FATAL is invoked if no attempt
 * succeeded. */
int connect_ipc_sock_retry(const char *socket_pathname,
                           int num_retries,
                           int64_t timeout) {
  CHECK(socket_pathname);

  /* A negative argument means "use the built-in default". */
  const int max_attempts =
      (num_retries < 0) ? NUM_CONNECT_ATTEMPTS : num_retries;
  const int64_t delay_ms = (timeout < 0) ? CONNECT_TIMEOUT_MS : timeout;

  int fd = -1;
  int attempt = 0;
  while (attempt < max_attempts) {
    fd = connect_ipc_sock(socket_pathname);
    if (fd >= 0) {
      break;
    }
    /* This attempt failed; wait delay_ms milliseconds before the next one. */
    usleep(delay_ms * 1000);
    ++attempt;
  }

  /* Every attempt failed (or none was made): give up. */
  if (fd == -1) {
    LOG_FATAL("Could not connect to socket %s", socket_pathname);
  }
  return fd;
}

int connect_ipc_sock(const char *socket_pathname) {
struct sockaddr_un socket_address;
int socket_fd;
Expand Down Expand Up @@ -119,6 +153,60 @@ int connect_ipc_sock(const char *socket_pathname) {
return socket_fd;
}

/* Connect to ip_addr:port, making up to num_retries attempts and sleeping
 * timeout milliseconds after each failed one. A negative num_retries or
 * timeout selects NUM_CONNECT_ATTEMPTS or CONNECT_TIMEOUT_MS respectively.
 * LOG_FATAL is invoked if no attempt succeeded. */
int connect_inet_sock_retry(const char *ip_addr,
                            int port,
                            int num_retries,
                            int64_t timeout) {
  CHECK(ip_addr);

  /* Substitute the defaults for any argument the caller left negative. */
  const int max_attempts =
      (num_retries < 0) ? NUM_CONNECT_ATTEMPTS : num_retries;
  const int64_t delay_ms = (timeout < 0) ? CONNECT_TIMEOUT_MS : timeout;

  int fd = -1;
  for (int remaining = max_attempts; remaining > 0; --remaining) {
    fd = connect_inet_sock(ip_addr, port);
    if (fd >= 0) {
      break;
    }
    /* This attempt failed; wait delay_ms milliseconds before the next one. */
    usleep(delay_ms * 1000);
  }

  /* Every attempt failed (or none was made): give up. */
  if (fd == -1) {
    LOG_FATAL("Could not connect to address %s:%d", ip_addr, port);
  }
  return fd;
}

/* Make a single attempt to open a SOCK_STREAM connection to ip_addr:port.
 *
 * ip_addr is resolved with gethostbyname() and the first returned address is
 * used.
 *
 * Returns the connected file descriptor on success. On any failure an error
 * is logged, the socket created here is closed (so repeated calls from a
 * retry loop do not leak descriptors), and -1 is returned. */
int connect_inet_sock(const char *ip_addr, int port) {
  int fd = socket(PF_INET, SOCK_STREAM, 0);
  if (fd < 0) {
    LOG_ERROR("socket() failed for address %s:%d.", ip_addr, port);
    return -1;
  }

  struct hostent *manager = gethostbyname(ip_addr); /* TODO(pcm): cache this */
  if (!manager) {
    LOG_ERROR("Failed to get hostname from address %s:%d.", ip_addr, port);
    close(fd);
    return -1;
  }

  /* Zero the whole structure so no field is passed to connect()
   * uninitialized. */
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;

  /* Only copy the resolved address if it is an IPv4 address that fits in
   * sin_addr; otherwise the memcpy below could write past the field. */
  if (manager->h_addrtype != AF_INET ||
      (size_t) manager->h_length > sizeof(addr.sin_addr.s_addr)) {
    LOG_ERROR("Unsupported address type for address %s:%d.", ip_addr, port);
    close(fd);
    return -1;
  }
  memcpy(&addr.sin_addr.s_addr, manager->h_addr_list[0], manager->h_length);
  addr.sin_port = htons(port);

  if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) != 0) {
    LOG_ERROR("Connection to socket failed for address %s:%d.", ip_addr, port);
    close(fd);
    return -1;
  }
  return fd;
}

int accept_client(int socket_fd) {
int client_fd = accept(socket_fd, NULL, NULL);
if (client_fd < 0) {
Expand Down
61 changes: 58 additions & 3 deletions src/common/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@

#define RAY_PROTOCOL_VERSION 0x0000000000000000

/* Number of times we try binding to a socket. */
#define NUM_BIND_ATTEMPTS 5
/* NOTE(review): presumably the delay in milliseconds between bind attempts;
 * the code that uses it is not visible here -- confirm. */
#define BIND_TIMEOUT_MS 100

/* Default number of connection attempts made by the connect_*_sock_retry
 * functions when the caller passes a negative num_retries. */
#define NUM_CONNECT_ATTEMPTS 50
/* Default delay, in milliseconds, slept after each failed connection attempt
 * when the caller passes a negative timeout. */
#define CONNECT_TIMEOUT_MS 100

enum common_message_type {
/** Disconnect a client. */
DISCONNECT_CLIENT,
Expand Down Expand Up @@ -48,12 +56,59 @@ int bind_inet_sock(const int port, bool shall_listen);
int bind_ipc_sock(const char *socket_pathname, bool shall_listen);

/**
 * Connect to a Unix domain streaming socket at the given
 * pathname.
 *
 * @param socket_pathname The pathname for the socket.
 * @return A file descriptor for the socket, or -1 if an error occurred.
 */
int connect_ipc_sock(const char *socket_pathname);

/**
 * Connect to a Unix domain streaming socket at the given
 * pathname, or fail after some number of attempts.
 *
 * @param socket_pathname The pathname for the socket. Must not be NULL.
 * @param num_retries The maximum number of connection attempts to make
 *        before giving up. If a negative value (e.g. -1) is provided, then
 *        this defaults to NUM_CONNECT_ATTEMPTS.
 * @param timeout The number of milliseconds to wait after each failed
 *        attempt. If a negative value (e.g. -1) is provided, then this
 *        defaults to CONNECT_TIMEOUT_MS.
 * @return A file descriptor for the socket. If no attempt succeeds the
 *         implementation calls LOG_FATAL rather than simply returning -1
 *         (NOTE(review): presumably this terminates the process -- confirm),
 *         so callers should not rely on a -1 return for error handling.
 */
int connect_ipc_sock_retry(const char *socket_pathname,
int num_retries,
int64_t timeout);

/**
 * Connect to an Internet socket at the given address and port. A single
 * connection attempt is made.
 *
 * @param ip_addr The IP address (or host name; it is resolved with
 *        gethostbyname) to connect to.
 * @param port The port number to connect to.
 * @return A file descriptor for the socket, or -1 if an error occurred.
 */
int connect_inet_sock(const char *ip_addr, int port);

/**
 * Connect to an Internet socket at the given address and port, or fail after
 * some number of attempts.
 *
 * @param ip_addr The IP address to connect to. Must not be NULL.
 * @param port The port number to connect to.
 * @param num_retries The maximum number of connection attempts to make
 *        before giving up. If a negative value (e.g. -1) is provided, then
 *        this defaults to NUM_CONNECT_ATTEMPTS.
 * @param timeout The number of milliseconds to wait after each failed
 *        attempt. If a negative value (e.g. -1) is provided, then this
 *        defaults to CONNECT_TIMEOUT_MS.
 * @return A file descriptor for the socket. If no attempt succeeds the
 *         implementation calls LOG_FATAL rather than simply returning -1
 *         (NOTE(review): presumably this terminates the process -- confirm),
 *         so callers should not rely on a -1 return for error handling.
 */
int connect_inet_sock_retry(const char *ip_addr,
int port,
int num_retries,
int64_t timeout);

/**
* Accept a new client connection on the given socket
* descriptor. Returns a descriptor for the new socket.
Expand Down
2 changes: 1 addition & 1 deletion src/photon/photon_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

photon_conn *photon_connect(const char *photon_socket, actor_id actor_id) {
photon_conn *result = malloc(sizeof(photon_conn));
result->conn = connect_ipc_sock(photon_socket);
result->conn = connect_ipc_sock_retry(photon_socket, -1, -1);
register_worker_info info;
memset(&info, 0, sizeof(info));
/* Register the process ID with the local scheduler. */
Expand Down
2 changes: 1 addition & 1 deletion src/photon/test/photon_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ photon_mock *init_photon_mock(bool connect_to_redis,
UT_string *plasma_manager_socket_name = bind_ipc_sock_retry(
plasma_manager_socket_name_format, &mock->plasma_manager_fd);
mock->plasma_store_fd =
socket_connect_retry(plasma_store_socket_name, 5, 100);
connect_ipc_sock_retry(plasma_store_socket_name, 5, 100);
UT_string *photon_socket_name =
bind_ipc_sock_retry(photon_socket_name_format, &mock->photon_fd);
CHECK(mock->plasma_store_fd >= 0 && mock->photon_fd >= 0);
Expand Down
77 changes: 2 additions & 75 deletions src/plasma/plasma_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@

#define XXH64_DEFAULT_SEED 0

/* Number of times we try connecting to a socket. */
#define NUM_CONNECT_ATTEMPTS 50
#define CONNECT_TIMEOUT 100

#ifndef _WIN32
/* This function is actually not declared in standard POSIX, so declare it. */
extern int usleep(useconds_t usec);
#endif

typedef struct {
/** Key that uniquely identifies the memory mapped file. In practice, we
* take the numerical value of the file descriptor in the object store. */
Expand Down Expand Up @@ -564,36 +555,14 @@ int plasma_subscribe(plasma_connection *conn) {
return fd[0];
}

/* Connect to the Unix domain socket socket_name, making up to num_retries
 * attempts and sleeping timeout milliseconds after each failed one.
 * LOG_FATAL is invoked if no attempt succeeded. */
int socket_connect_retry(const char *socket_name,
                         int num_retries,
                         int64_t timeout) {
  CHECK(socket_name);

  int fd = -1;
  int attempt = 0;
  while (attempt < num_retries) {
    fd = connect_ipc_sock(socket_name);
    if (fd >= 0) {
      break;
    }
    /* This attempt failed; wait timeout milliseconds before the next one. */
    usleep(timeout * 1000);
    ++attempt;
  }

  /* Every attempt failed (or none was made): give up. */
  if (fd == -1) {
    LOG_FATAL("could not connect to socket %s", socket_name);
  }
  return fd;
}

plasma_connection *plasma_connect(const char *store_socket_name,
const char *manager_socket_name,
int release_delay) {
/* Initialize the store connection struct */
plasma_connection *result = malloc(sizeof(plasma_connection));
result->store_conn = socket_connect_retry(
store_socket_name, NUM_CONNECT_ATTEMPTS, CONNECT_TIMEOUT);
result->store_conn = connect_ipc_sock_retry(store_socket_name, -1, -1);
if (manager_socket_name != NULL) {
result->manager_conn = socket_connect_retry(
manager_socket_name, NUM_CONNECT_ATTEMPTS, CONNECT_TIMEOUT);
result->manager_conn = connect_ipc_sock_retry(manager_socket_name, -1, -1);
} else {
result->manager_conn = -1;
}
Expand Down Expand Up @@ -650,48 +619,6 @@ bool plasma_manager_is_connected(plasma_connection *conn) {

#define h_addr h_addr_list[0]

/* Make a single attempt to open a SOCK_STREAM connection to ip_addr:port.
 *
 * ip_addr is resolved with gethostbyname() and the first returned address is
 * used.
 *
 * Returns the connected file descriptor on success. On any failure the socket
 * created here is closed (so repeated calls from a retry loop do not leak
 * descriptors) and -1 is returned. */
int plasma_manager_try_connect(const char *ip_addr, int port) {
  int fd = socket(PF_INET, SOCK_STREAM, 0);
  if (fd < 0) {
    return -1;
  }

  struct hostent *manager = gethostbyname(ip_addr); /* TODO(pcm): cache this */
  if (!manager) {
    close(fd);
    return -1;
  }

  /* Zero the whole structure so no field is passed to connect()
   * uninitialized. */
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;

  /* Only copy the resolved address if it is an IPv4 address that fits in
   * sin_addr; otherwise the memcpy below could write past the field. */
  if (manager->h_addrtype != AF_INET ||
      (size_t) manager->h_length > sizeof(addr.sin_addr.s_addr)) {
    close(fd);
    return -1;
  }
  memcpy(&addr.sin_addr.s_addr, manager->h_addr_list[0], manager->h_length);
  addr.sin_port = htons(port);

  int r = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
  if (r < 0) {
    close(fd);
    return -1;
  }
  return fd;
}

/* Connect to the Plasma manager at ip_addr:port, making up to
 * NUM_CONNECT_ATTEMPTS attempts with a 100 millisecond pause after each
 * failed one. Logs a warning and returns a negative value if every attempt
 * failed; otherwise returns the connected file descriptor. */
int plasma_manager_connect(const char *ip_addr, int port) {
  int fd = -1;
  int attempt = 0;
  while (attempt < NUM_CONNECT_ATTEMPTS) {
    fd = plasma_manager_try_connect(ip_addr, port);
    if (fd >= 0) {
      break;
    }
    /* This attempt failed; sleep for 100 milliseconds before retrying. */
    usleep(100000);
    ++attempt;
  }

  if (fd < 0) {
    LOG_WARN("Unable to connect to plasma manager at %s:%d", ip_addr, port);
  }
  return fd;
}

void plasma_transfer(plasma_connection *conn,
const char *address,
int port,
Expand Down
3 changes: 1 addition & 2 deletions src/plasma/plasma_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <poll.h>
#include <assert.h>
#include <netinet/in.h>
#include <netdb.h>

#include "uthash.h"
#include "utlist.h"
Expand Down Expand Up @@ -703,7 +702,7 @@ client_connection *get_manager_connection(plasma_manager_state *state,
utstring_len(ip_addr_port), manager_conn);
if (!manager_conn) {
/* If we don't already have a connection to this manager, start one. */
int fd = plasma_manager_connect(ip_addr, port);
int fd = connect_inet_sock_retry(ip_addr, port, -1, -1);
/* TODO(swang): Handle the case when connection to this manager was
* unsuccessful. */
CHECK(fd >= 0);
Expand Down
2 changes: 1 addition & 1 deletion src/plasma/test/manager_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ plasma_mock *init_plasma_mock(plasma_mock *remote_mock) {
plasma_mock *mock = malloc(sizeof(plasma_mock));
/* Start listening on all the ports and initiate the local plasma manager. */
mock->port = bind_inet_sock_retry(&mock->manager_remote_fd);
mock->local_store = socket_connect_retry(plasma_store_socket_name, 5, 100);
mock->local_store = connect_ipc_sock_retry(plasma_store_socket_name, 5, 100);
UT_string *manager_socket_name = bind_ipc_sock_retry(
plasma_manager_socket_name_format, &mock->manager_local_fd);

Expand Down

0 comments on commit 67c591c

Please sign in to comment.