Skip to content

Commit

Permalink
Use flatcc for serialization of IPC messages. (ray-project#140)
Browse files Browse the repository at this point in the history
* added Phllipp's updates

* Switch to using flatbuffers for IPC.

* Various changes.

* convert remaining messages and cleanups

* fix

* fix function signatures

* fix valgrind errors

* clang-format

* final commit

* Fix valgrind test.
  • Loading branch information
pcmoritz authored and robertnishihara committed Dec 20, 2016
1 parent 6a73711 commit 0ca0864
Show file tree
Hide file tree
Showing 22 changed files with 2,305 additions and 734 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
/lib/python/plasma
/lib/python/webui

# Files generated by flatcc should be ignored
/src/plasma/format/*_builder.h
/src/plasma/format/*_reader.h
/src/plasma/format/*_verifier.h

# Redis temporary files
*dump.rdb

Expand Down
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ matrix:
before_install:
- sudo apt-get update -qq
- sudo apt-get install -qq valgrind
script:
install:
- ./.travis/install-dependencies.sh

- cd src/common
- make valgrind
- cd ../..
Expand All @@ -43,6 +45,8 @@ matrix:
- make valgrind
- cd ../..

- ./.travis/install-ray.sh
script:
- python src/plasma/test/test.py valgrind
- python src/photon/test/test.py valgrind
- python src/global_scheduler/test/test.py valgrind
Expand Down
16 changes: 16 additions & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
cmake_minimum_required(VERSION 2.8)

project(common)

include(ExternalProject)

set(FLATCC_PREFIX "${CMAKE_CURRENT_LIST_DIR}/build/flatcc-prefix/src/flatcc")

ExternalProject_Add(flatcc
URL "https://github.com/dvidelabs/flatcc/archive/v0.4.0.tar.gz"
INSTALL_COMMAND ""
CMAKE_ARGS "-DCMAKE_C_FLAGS=-fPIC")

set(FLATBUFFERS_INCLUDE_DIR "${FLATCC_PREFIX}/include")
set(FLATBUFFERS_STATIC_LIB "${FLATCC_PREFIX}/lib/libflatcc.a")
set(FLATBUFFERS_COMPILER "${FLATCC_PREFIX}/bin/flatcc")
130 changes: 19 additions & 111 deletions src/common/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,6 @@

#include "common.h"

/**
* Binds to an Internet socket at the given port. Removes any existing file at
* the pathname. Returns a non-blocking file descriptor for the socket, or -1
* if an error occurred.
*
* @note Since the returned file descriptor is non-blocking, it is not
* recommended to use the Linux read and write calls directly, since these
* might read or write a partial message. Instead, use the provided
* write_message and read_message methods.
*
* @param port The port to bind to.
* @param shall_listen Are we also starting to listen on the socket?
* @return A non-blocking file descriptor for the socket, or -1 if an error
* occurs.
*/
int bind_inet_sock(const int port, bool shall_listen) {
struct sockaddr_in name;
int socket_fd = socket(PF_INET, SOCK_STREAM, 0);
Expand Down Expand Up @@ -65,15 +50,6 @@ int bind_inet_sock(const int port, bool shall_listen) {
return socket_fd;
}

/**
* Binds to a Unix domain streaming socket at the given
* pathname. Removes any existing file at the pathname.
*
* @param socket_pathname The pathname for the socket.
* @param shall_listen Are we also starting to listen on the socket?
* @return A blocking file descriptor for the socket, or -1 if an error
* occurs.
*/
int bind_ipc_sock(const char *socket_pathname, bool shall_listen) {
struct sockaddr_un socket_address;
int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
Expand Down Expand Up @@ -115,11 +91,6 @@ int bind_ipc_sock(const char *socket_pathname, bool shall_listen) {
return socket_fd;
}

/**
* Connects to a Unix domain streaming socket at the given
* pathname. Returns a file descriptor for the socket, or -1 if
* an error occurred.
*/
int connect_ipc_sock(const char *socket_pathname) {
struct sockaddr_un socket_address;
int socket_fd;
Expand Down Expand Up @@ -148,10 +119,6 @@ int connect_ipc_sock(const char *socket_pathname) {
return socket_fd;
}

/**
* Accept a new client connection on the given socket
* descriptor. Returns a descriptor for the new socket.
*/
int accept_client(int socket_fd) {
int client_fd = accept(socket_fd, NULL, NULL);
if (client_fd < 0) {
Expand All @@ -161,17 +128,6 @@ int accept_client(int socket_fd) {
return client_fd;
}

/**
* Write a sequence of bytes into a file descriptor. This will block until one
* of the following happens: (1) there is an error (2) end of file, or (3) all
* length bytes have been written.
*
* @param fd The file descriptor to write to. It can be non-blocking.
* @param cursor The cursor pointing to the beginning of the bytes to send.
* @param length The size of the bytes sequence to write.
* @return int Whether there was an error while writing. 0 corresponds to
* success and -1 corresponds to an error (errno will be set).
*/
int write_bytes(int fd, uint8_t *cursor, size_t length) {
ssize_t nbytes = 0;
size_t bytesleft = length;
Expand All @@ -197,19 +153,13 @@ int write_bytes(int fd, uint8_t *cursor, size_t length) {
return 0;
}

/**
* Write a sequence of bytes on a file descriptor. The bytes should then be read
* by read_message.
*
* @param fd The file descriptor to write to. It can be non-blocking.
* @param type The type of the message to send.
* @param length The size in bytes of the bytes parameter.
* @param bytes The address of the message to send.
* @return int Whether there was an error while writing. 0 corresponds to
* success and -1 corresponds to an error (errno will be set).
*/
int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) {
int64_t version = RAY_PROTOCOL_VERSION;
int closed;
closed = write_bytes(fd, (uint8_t *) &version, sizeof(version));
if (closed) {
return closed;
}
closed = write_bytes(fd, (uint8_t *) &type, sizeof(type));
if (closed) {
return closed;
Expand All @@ -225,20 +175,6 @@ int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) {
return 0;
}

/**
* Read a sequence of bytes from a file descriptor into a buffer. This will
* block until one of the following happens: (1) there is an error (2) end of
* file, or (3) all length bytes have been written.
*
* @note The buffer pointed to by cursor must already have length number of
* bytes allocated before calling this method.
*
* @param fd The file descriptor to read from. It can be non-blocking.
* @param cursor The cursor pointing to the beginning of the buffer.
* @param length The size of the byte sequence to read.
* @return int Whether there was an error while reading. 0 corresponds to
* success and -1 corresponds to an error (errno will be set).
*/
int read_bytes(int fd, uint8_t *cursor, size_t length) {
ssize_t nbytes = 0;
/* Termination condition: EOF or read 'length' bytes total. */
Expand All @@ -263,27 +199,14 @@ int read_bytes(int fd, uint8_t *cursor, size_t length) {
return 0;
}

/**
* Read a sequence of bytes written by write_message from a file descriptor.
* This allocates space for the message.
*
* @note The caller must free the memory.
*
* @param fd The file descriptor to read from. It can be non-blocking.
* @param type The type of the message that is read will be written at this
* address. If there was an error while reading, this will be
* DISCONNECT_CLIENT.
* @param length The size in bytes of the message that is read will be written
* at this address. This size does not include the bytes used to encode
* the type and length. If there was an error while reading, this will
* be 0.
* @param bytes The address at which to write the pointer to the bytes that are
* read and allocated by this function. If there was an error while
* reading, this will be NULL.
* @return Void.
*/
void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) {
int closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
int64_t version;
int closed = read_bytes(fd, (uint8_t *) &version, sizeof(version));
if (closed) {
goto disconnected;
}
CHECK(version == RAY_PROTOCOL_VERSION);
closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
if (closed) {
goto disconnected;
}
Expand All @@ -307,26 +230,15 @@ void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) {
return;
}

/**
* Read a sequence of bytes written by write_message from a file descriptor.
* This does not allocate space for the message if the provided buffer is
* large enough and can therefore often avoid allocations.
*
* @note The caller must create and free the buffer.
*
* @param fd The file descriptor to read from. It can be non-blocking.
* @param type The type of the message that is read will be written at this
* address. If there was an error while reading, this will be
* DISCONNECT_CLIENT.
* @param buffer The array the message will be written to. If it is not
* large enough to hold the message, it will be enlarged by read_buffer.
* @return Number of bytes of the message that were read. This size does not
* include the bytes used to encode the type and length. If there was
* an error while reading, this will be 0.
*/
int64_t read_buffer(int fd, int64_t *type, UT_array *buffer) {
int64_t version;
int closed = read_bytes(fd, (uint8_t *) &version, sizeof(version));
if (closed) {
goto disconnected;
}
CHECK(version == RAY_PROTOCOL_VERSION);
int64_t length;
int closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
closed = read_bytes(fd, (uint8_t *) type, sizeof(*type));
if (closed) {
goto disconnected;
}
Expand All @@ -348,15 +260,11 @@ int64_t read_buffer(int fd, int64_t *type, UT_array *buffer) {
return 0;
}

/* Write a null-terminated string to a file descriptor. */
void write_log_message(int fd, char *message) {
/* Account for the \0 at the end of the string. */
write_message(fd, LOG_MESSAGE, strlen(message) + 1, (uint8_t *) message);
}

/* Reads a null-terminated string from the file descriptor that has been
* written by write_log_message. Allocates and returns a pointer to the string.
* NOTE: Caller must free the memory! */
char *read_log_message(int fd) {
uint8_t *bytes;
int64_t type;
Expand Down
Loading

0 comments on commit 0ca0864

Please sign in to comment.