Skip to content

Commit

Permalink
Keep objects in cache between tasks (ray-project#29)
Browse files Browse the repository at this point in the history
* fix caching behavior

* fixes
  • Loading branch information
pcmoritz authored and robertnishihara committed Nov 7, 2016
1 parent efe8a29 commit 1147c4d
Show file tree
Hide file tree
Showing 13 changed files with 221 additions and 40 deletions.
3 changes: 3 additions & 0 deletions src/common/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include <sys/stat.h>
#include <fcntl.h>

/* This is used to define the array of object IDs. */
const UT_icd object_id_icd = {sizeof(object_id), NULL, NULL, NULL};

const unique_id NIL_ID = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255}};

Expand Down
4 changes: 4 additions & 0 deletions src/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <inttypes.h>
#include <execinfo.h>

#include "utarray.h"

#ifndef RAY_COMMON_DEBUG
#define LOG_DEBUG(M, ...)
#else
Expand Down Expand Up @@ -51,6 +53,8 @@ CHECK(COND)

typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id;

extern const UT_icd object_id_icd;

extern const unique_id NIL_ID;

/* Generate a globally unique ID. */
Expand Down
108 changes: 108 additions & 0 deletions src/common/thirdparty/utringbuffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
Copyright (c) 2008-2016, Troy D. Hanson http://troydhanson.github.com/uthash/
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

/* a ring-buffer implementation using macros
*/
#ifndef UTRINGBUFFER_H
#define UTRINGBUFFER_H

#define UTRINGBUFFER_VERSION 2.0.1

#include <stdlib.h>
#include <string.h>
#include "utarray.h" // for "UT_icd"

typedef struct {
unsigned i; /* index of next available slot; wraps at n */
unsigned n; /* capacity */
unsigned char f; /* full */
UT_icd icd; /* initializer, copy and destructor functions */
char *d; /* n slots of size icd->sz */
} UT_ringbuffer;

#define utringbuffer_init(a, _n, _icd) do { \
memset(a, 0, sizeof(UT_ringbuffer)); \
(a)->icd = *(_icd); \
(a)->n = (_n); \
if ((a)->n) { (a)->d = (char*)malloc((a)->n * (_icd)->sz); } \
} while(0)

#define utringbuffer_clear(a) do { \
if ((a)->icd.dtor) { \
if ((a)->f) { \
unsigned _ut_i; \
for (_ut_i = 0; _ut_i < (a)->n; ++_ut_i) { \
(a)->icd.dtor(utringbuffer_eltptr(a, _ut_i)); \
} \
} else { \
unsigned _ut_i; \
for (_ut_i = 0; _ut_i < (a)->i; ++_ut_i) { \
(a)->icd.dtor(utringbuffer_eltptr(a, _ut_i)); \
} \
} \
} \
(a)->i = 0; \
(a)->f = 0; \
} while(0)

#define utringbuffer_done(a) do { \
utringbuffer_clear(a); \
free((a)->d); (a)->d = NULL; \
(a)->n = 0; \
} while(0)

#define utringbuffer_new(a,n,_icd) do { \
a = (UT_ringbuffer*)malloc(sizeof(UT_ringbuffer)); \
utringbuffer_init(a, n, _icd); \
} while(0)

#define utringbuffer_free(a) do { \
utringbuffer_done(a); \
free(a); \
} while(0)

#define utringbuffer_push_back(a,p) do { \
if ((a)->icd.dtor && (a)->f) { (a)->icd.dtor(_utringbuffer_internalptr(a,(a)->i)); } \
if ((a)->icd.copy) { (a)->icd.copy( _utringbuffer_internalptr(a,(a)->i), p); } \
else { memcpy(_utringbuffer_internalptr(a,(a)->i), p, (a)->icd.sz); }; \
if (++(a)->i == (a)->n) { (a)->i = 0; (a)->f = 1; } \
} while(0)

#define utringbuffer_len(a) ((a)->f ? (a)->n : (a)->i)
#define utringbuffer_empty(a) ((a)->i == 0 && !(a)->f)
#define utringbuffer_full(a) ((a)->f != 0)

#define _utringbuffer_real_idx(a,j) ((a)->f ? ((j) + (a)->i) % (a)->n : (j))
#define _utringbuffer_internalptr(a,j) ((void*)((a)->d + ((a)->icd.sz * (j))))
#define utringbuffer_eltptr(a,j) ((0 <= (j) && (j) < utringbuffer_len(a)) ? _utringbuffer_internalptr(a,_utringbuffer_real_idx(a,j)) : NULL)

#define _utringbuffer_fake_idx(a,j) ((a)->f ? ((j) + (a)->n - (a)->i) % (a)->n : (j))
#define _utringbuffer_internalidx(a,e) (((char*)(e) >= (a)->d) ? (((char*)(e) - (a)->d)/(a)->icd.sz) : -1)
#define utringbuffer_eltidx(a,e) _utringbuffer_fake_idx(a, _utringbuffer_internalidx(a,e))

#define utringbuffer_front(a) utringbuffer_eltptr(a,0)
#define utringbuffer_next(a,e) ((e)==NULL ? utringbuffer_front(a) : utringbuffer_eltptr(a, utringbuffer_eltidx(a,e)+1))
#define utringbuffer_prev(a,e) ((e)==NULL ? utringbuffer_back(a) : utringbuffer_eltptr(a, utringbuffer_eltidx(a,e)-1))
#define utringbuffer_back(a) (utringbuffer_empty(a) ? NULL : utringbuffer_eltptr(a, utringbuffer_len(a) - 1))

#endif /* UTRINGBUFFER_H */
5 changes: 3 additions & 2 deletions src/photon/photon_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ local_scheduler_state *init_local_scheduler(event_loop *loop,
state->loop = loop;
/* Connect to Plasma. This method will retry if Plasma hasn't started yet.
* Pass in a NULL manager address and port. */
state->plasma_conn = plasma_connect(plasma_socket_name, NULL);
state->plasma_conn =
plasma_connect(plasma_socket_name, NULL, PLASMA_DEFAULT_RELEASE_DELAY);
/* Subscribe to notifications about sealed objects. */
int plasma_fd = plasma_subscribe(state->plasma_conn);
/* Add the callback that processes the notification to the event loop. */
Expand All @@ -79,7 +80,7 @@ local_scheduler_state *init_local_scheduler(event_loop *loop,

void free_local_scheduler(local_scheduler_state *s) {
db_disconnect(s->scheduler_info->db);
free(s->plasma_conn);
plasma_disconnect(s->plasma_conn);
worker_index *current_worker_index, *temp_worker_index;
HASH_ITER(hh, s->worker_index, current_worker_index, temp_worker_index) {
HASH_DEL(s->worker_index, current_worker_index);
Expand Down
2 changes: 1 addition & 1 deletion src/plasma/example.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ int main(int argc, char *argv[]) {
while ((c = getopt(argc, argv, "s:cfg")) != -1) {
switch (c) {
case 's':
conn = plasma_connect(optarg, NULL);
conn = plasma_connect(optarg, NULL, PLASMA_DEFAULT_RELEASE_DELAY);
break;
case 'c':
assert(conn != NULL);
Expand Down
6 changes: 3 additions & 3 deletions src/plasma/lib/python/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class PlasmaClient(object):
strings.
"""

def __init__(self, store_socket_name, manager_socket_name=None):
def __init__(self, store_socket_name, manager_socket_name=None, release_delay=64):
"""Initialize the PlasmaClient.
Args:
Expand Down Expand Up @@ -103,10 +103,10 @@ def __init__(self, store_socket_name, manager_socket_name=None):

if manager_socket_name is not None:
self.has_manager_conn = True
self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, manager_socket_name))
self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, manager_socket_name, release_delay))
else:
self.has_manager_conn = False
self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, None))
self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(store_socket_name, None, release_delay))

def shutdown(self):
"""Shutdown the client so that it does not send messages.
Expand Down
46 changes: 44 additions & 2 deletions src/plasma/plasma_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "plasma_client.h"
#include "fling.h"
#include "uthash.h"
#include "utringbuffer.h"

/* Number of times we try connecting to a socket. */
#define NUM_CONNECT_ATTEMPTS 50
Expand Down Expand Up @@ -53,6 +54,14 @@ typedef struct {
UT_hash_handle hh;
} object_in_use_entry;

/** Configuration options for the plasma client. */
typedef struct {
/** Number of release calls we wait until the object is actually released.
* This allows us to avoid invalidating the cpu cache on workers if objects
* are reused accross tasks. */
int release_delay;
} plasma_client_config;

/** Information about a connection between a Plasma Client and Plasma Store.
* This is used to avoid mapping the same files into memory multiple times. */
struct plasma_connection {
Expand All @@ -67,6 +76,13 @@ struct plasma_connection {
/** A hash table of the object IDs that are currently being used by this
* client. */
object_in_use_entry *objects_in_use;
/** Object IDs of the last few release calls. This is used to delay
* releasing objects to see if they can be reused by subsequent tasks so we
* do not unneccessarily invalidate cpu caches. TODO(pcm): replace this with
* a proper lru cache of size sizeof(L3 cache). */
UT_ringbuffer *release_history;
/** Configuration options for the plasma client. */
plasma_client_config config;
};

int plasma_request_size(int num_object_ids) {
Expand Down Expand Up @@ -224,7 +240,7 @@ void plasma_get(plasma_connection *conn,
increment_object_count(conn, object_id, object->handle.store_fd);
}

void plasma_release(plasma_connection *conn, object_id object_id) {
void plasma_perform_release(plasma_connection *conn, object_id object_id) {
/* Decrement the count of the number of instances of this object that are
* being used by this client. The corresponding increment should have happened
* in plasma_get. */
Expand Down Expand Up @@ -260,6 +276,24 @@ void plasma_release(plasma_connection *conn, object_id object_id) {
}
}

void plasma_release(plasma_connection *conn, object_id obj_id) {
/* If no ringbuffer is used, don't delay the release. */
if (conn->config.release_delay == 0) {
plasma_perform_release(conn, obj_id);
} else if (!utringbuffer_full(conn->release_history)) {
/* Delay the release by storing new releases into a ringbuffer and only
* popping them off and actually releasing if the buffer is full. This is
* so consecutive tasks don't release and map again objects and invalidate
* the cpu cache this way. */
utringbuffer_push_back(conn->release_history, &obj_id);
} else {
object_id object_id_to_release =
*(object_id *) utringbuffer_front(conn->release_history);
utringbuffer_push_back(conn->release_history, &obj_id);
plasma_perform_release(conn, object_id_to_release);
}
}

/* This method is used to query whether the plasma store contains an object. */
void plasma_contains(plasma_connection *conn,
object_id object_id,
Expand Down Expand Up @@ -342,7 +376,8 @@ int socket_connect_retry(const char *socket_name,
}

plasma_connection *plasma_connect(const char *store_socket_name,
const char *manager_socket_name) {
const char *manager_socket_name,
int release_delay) {
/* Initialize the store connection struct */
plasma_connection *result = malloc(sizeof(plasma_connection));
result->store_conn = socket_connect_retry(
Expand All @@ -355,6 +390,8 @@ plasma_connection *plasma_connect(const char *store_socket_name,
}
result->mmap_table = NULL;
result->objects_in_use = NULL;
result->config.release_delay = release_delay;
utringbuffer_new(result->release_history, release_delay, &object_id_icd);
return result;
}

Expand All @@ -363,6 +400,11 @@ void plasma_disconnect(plasma_connection *conn) {
if (conn->manager_conn >= 0) {
close(conn->manager_conn);
}
object_id *id = NULL;
while ((id = (object_id *) utringbuffer_next(conn->release_history, id))) {
plasma_perform_release(conn, *id);
}
utringbuffer_free(conn->release_history);
free(conn);
}

Expand Down
5 changes: 4 additions & 1 deletion src/plasma/plasma_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include "plasma.h"

#define PLASMA_DEFAULT_RELEASE_DELAY 64

typedef struct plasma_connection plasma_connection;

/**
Expand Down Expand Up @@ -60,7 +62,8 @@ int socket_connect_retry(const char *socket_name,
* @return The object containing the connection state.
*/
plasma_connection *plasma_connect(const char *store_socket_name,
const char *manager_socket_name);
const char *manager_socket_name,
int release_delay);

/**
* Disconnect from the local plasma instance, including the local store and
Expand Down
5 changes: 3 additions & 2 deletions src/plasma/plasma_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name,
int db_port) {
plasma_manager_state *state = malloc(sizeof(plasma_manager_state));
state->loop = event_loop_create();
state->plasma_conn = plasma_connect(store_socket_name, NULL);
state->plasma_conn =
plasma_connect(store_socket_name, NULL, PLASMA_DEFAULT_RELEASE_DELAY);
state->manager_connections = NULL;
state->fetch_connections = NULL;
if (db_addr) {
Expand Down Expand Up @@ -298,7 +299,7 @@ void destroy_plasma_manager_state(plasma_manager_state *state) {
}
}

free(state->plasma_conn);
plasma_disconnect(state->plasma_conn);
event_loop_destroy(state->loop);
free(state);
}
Expand Down
6 changes: 1 addition & 5 deletions src/plasma/plasma_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ struct client {
* object_table_entry type. */
UT_icd client_icd = {sizeof(client *), NULL, NULL, NULL};

/* This is used to define the array of object IDs used to define the
* notification_queue type. */
UT_icd object_table_entry_icd = {sizeof(object_id), NULL, NULL, NULL};

typedef struct {
/** Client file descriptor. This is used as a key for the hash table. */
int subscriber_fd;
Expand Down Expand Up @@ -420,7 +416,7 @@ void subscribe_to_updates(client *client_context, int conn) {
notification_queue *queue =
(notification_queue *) malloc(sizeof(notification_queue));
queue->subscriber_fd = fd;
utarray_new(queue->object_ids, &object_table_entry_icd);
utarray_new(queue->object_ids, &object_id_icd);
HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue);
}

Expand Down
9 changes: 5 additions & 4 deletions src/plasma/test/manager_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,17 @@ plasma_mock *init_plasma_mock(plasma_mock *remote_mock) {
mock->write_conn =
get_manager_connection(remote_mock->state, manager_addr, mock->port);
wait_for_pollin(mock->manager_remote_fd);
mock->read_conn = new_client_connection(mock->loop, mock->manager_remote_fd,
mock->state, 0);
mock->read_conn =
new_client_connection(mock->loop, mock->manager_remote_fd, mock->state,
PLASMA_DEFAULT_RELEASE_DELAY);
} else {
mock->write_conn = NULL;
mock->read_conn = NULL;
}
/* Connect a new client to the local plasma manager and mock a request to an
* object. */
mock->plasma_conn = plasma_connect(utstring_body(store_socket_name),
utstring_body(manager_socket_name));
utstring_body(manager_socket_name), 0);
wait_for_pollin(mock->manager_local_fd);
mock->client_conn =
new_client_connection(mock->loop, mock->manager_local_fd, mock->state, 0);
Expand All @@ -126,7 +127,7 @@ void destroy_plasma_mock(plasma_mock *mock) {
}
destroy_plasma_manager_state(mock->state);
free(mock->client_conn);
free(mock->plasma_conn);
plasma_disconnect(mock->plasma_conn);
close(mock->local_store);
close(mock->manager_local_fd);
close(mock->manager_remote_fd);
Expand Down
Loading

0 comments on commit 1147c4d

Please sign in to comment.