Fetch the object after requesting reconstruction during ray.get (ray-project#301)

* Fetch the object after requesting reconstruction during ray.get

* revert

* Fix documentation and memory leak

* Fix hanging reconstruction bug

* Fix for python3
stephanie-wang authored and robertnishihara committed Feb 21, 2017
1 parent 2220a33 commit 334aed9
Showing 4 changed files with 33 additions and 27 deletions.
4 changes: 4 additions & 0 deletions python/ray/worker.py
@@ -460,6 +460,7 @@ def get_object(self, object_ids):
       object_ids (List[object_id.ObjectID]): A list of the object IDs whose
         values should be retrieved.
     """
+    # Do an initial fetch for remote objects.
     self.plasma_client.fetch([object_id.id() for object_id in object_ids])
 
     # Get the objects. We initially try to get the objects immediately.
@@ -477,6 +478,9 @@ def get_object(self, object_ids):
       while len(unready_ids) > 0:
         for unready_id in unready_ids:
           self.photon_client.reconstruct_object(unready_id)
+        # Do another fetch for objects that aren't available locally yet, in case
+        # they were evicted since the last fetch.
+        self.plasma_client.fetch(list(unready_ids.keys()))
         results = numbuf.retrieve_list(list(unready_ids.keys()),
                                        self.plasma_client.conn,
                                        GET_TIMEOUT_MILLISECONDS)
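
The hunks above close an eviction race: an object can be evicted from the local object store between the initial fetch and the completion of reconstruction, so the worker now re-issues the fetch on every pass of the retry loop. A minimal Python sketch of that loop, with hypothetical injected callables fetch, reconstruct, and retrieve standing in for the plasma, photon, and numbuf calls above:

    def get_with_reconstruction(object_ids, fetch, reconstruct, retrieve):
        # Do an initial fetch for remote objects.
        fetch(object_ids)
        results = {}
        unready = set(object_ids)
        while unready:
            for object_id in unready:
                # Ask the local scheduler to reconstruct each missing object.
                reconstruct(object_id)
            # Fetch again, in case objects were evicted since the last fetch.
            fetch(list(unready))
            # retrieve yields (object_id, value) pairs; value is None on timeout.
            for object_id, value in retrieve(list(unready)):
                if value is not None:
                    results[object_id] = value
            unready -= set(results)
        return [results[object_id] for object_id in object_ids]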
12 changes: 5 additions & 7 deletions src/common/redis_module/ray_redis_module.c
@@ -459,13 +459,11 @@ int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx,
 
 /**
  * Request notifications about the presence of some object IDs. This command
- * takes a list of object IDs. There will be an immediate reply acknowledging
- * the call and containing a list of all the object IDs that are already
- * present in the object table along with vectors of the plasma managers that
- * contain each object. For each object ID that is not already present in the
- * object table, there will be a separate subsequent reply that returns the list
- * of manager vectors conaining the object ID, and this will be called as soon
- * as the object is added to the object table.
+ * takes a list of object IDs. For each object ID, the reply will be the list
+ * of plasma managers that contain the object. If the list of plasma managers
+ * is currently nonempty, then the reply will happen immediately. Else, the
+ * reply will come later, on the first invocation of `RAY.OBJECT_TABLE_ADD`
+ * following this call.
 *
 * This is called from a client with the command:
 *
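
The rewritten comment pins down the contract: one reply per object ID, sent immediately when the manager list is nonempty and deferred to the next RAY.OBJECT_TABLE_ADD otherwise. A toy in-memory Python model of that behavior (hypothetical class, not the Redis module itself):

    class ObjectTableModel:
        def __init__(self):
            self.managers = {}  # object_id -> list of plasma manager addresses
            self.pending = {}   # object_id -> callbacks awaiting a reply

        def request_notifications(self, object_ids, callback):
            for object_id in object_ids:
                if self.managers.get(object_id):
                    # Nonempty manager list: reply immediately.
                    callback(object_id, self.managers[object_id])
                else:
                    # Empty list: defer the reply until the next add.
                    self.pending.setdefault(object_id, []).append(callback)

        def add(self, object_id, manager):
            # Corresponds to RAY.OBJECT_TABLE_ADD.
            self.managers.setdefault(object_id, []).append(manager)
            for callback in self.pending.pop(object_id, []):
                callback(object_id, self.managers[object_id])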
42 changes: 23 additions & 19 deletions src/plasma/plasma_manager.c
@@ -726,6 +726,27 @@ void process_transfer_request(event_loop *loop,
                               const char *addr,
                               int port,
                               client_connection *conn) {
+  client_connection *manager_conn =
+      get_manager_connection(conn->manager_state, addr, port);
+
+  /* If there is already a request in the transfer queue with the same object
+   * ID, do not add the transfer request. */
+  plasma_request_buffer *pending;
+  LL_FOREACH(manager_conn->transfer_queue, pending) {
+    if (object_ids_equal(pending->object_id, obj_id) &&
+        (pending->type == MessageType_PlasmaDataReply)) {
+      return;
+    }
+  }
+
+  /* If we already have a connection to this manager and it's inactive,
+   * (re)register it with the event loop. */
+  if (manager_conn->transfer_queue == NULL) {
+    event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE,
+                        send_queued_request, manager_conn);
+  }
+
+  /* Allocate and append the request to the transfer queue. */
   uint8_t *data;
   int64_t data_size;
   uint8_t *metadata;
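
The ordering in this hunk is the memory-leak fix from the commit message: the duplicate check (and the event-loop registration) now run before the request buffer is allocated, so the early return no longer abandons an allocated buffer. The shape of the fix, as a hedged Python sketch with a hypothetical make_request helper:

    def enqueue_transfer(transfer_queue, object_id, make_request):
        # Check for a duplicate request first; in the old C code this check
        # ran after the allocation, so the early return leaked the buffer.
        if any(request["object_id"] == object_id for request in transfer_queue):
            return
        transfer_queue.append(make_request(object_id))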
@@ -761,23 +782,6 @@
   buf->data_size = obj_buffer.data_size;
   buf->metadata_size = obj_buffer.metadata_size;
 
-  client_connection *manager_conn =
-      get_manager_connection(conn->manager_state, addr, port);
-
-  if (manager_conn->transfer_queue == NULL) {
-    /* If we already have a connection to this manager and its inactive,
-     * (re)register it with the event loop again. */
-    event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE,
-                        send_queued_request, manager_conn);
-  }
-  /* Add this transfer request to this connection's transfer queue if there
-   * isn't already a request with the same object ID. */
-  plasma_request_buffer *pending;
-  LL_FOREACH(manager_conn->transfer_queue, pending) {
-    if (object_ids_equal(pending->object_id, buf->object_id)) {
-      return;
-    }
-  }
   LL_APPEND(manager_conn->transfer_queue, buf);
 }

@@ -1253,7 +1257,7 @@ void process_delete_object_notification(plasma_manager_state *state,
   retry_info retry = {
       .num_retries = NUM_RETRIES,
       .timeout = MANAGER_TIMEOUT,
-      .fail_callback = NULL,
+      .fail_callback = fatal_table_callback,
   };
   object_table_remove(state->db, obj_id, NULL, &retry, NULL, NULL);
 }
@@ -1280,7 +1284,7 @@ void process_add_object_notification(plasma_manager_state *state,
   retry_info retry = {
       .num_retries = NUM_RETRIES,
       .timeout = MANAGER_TIMEOUT,
-      .fail_callback = NULL,
+      .fail_callback = fatal_table_callback,
   };
   object_table_add(state->db, obj_id,
                    object_info.data_size + object_info.metadata_size,
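
Both notification handlers now install fatal_table_callback instead of NULL, so a table operation that exhausts its retries surfaces the failure instead of dropping it silently. A hedged Python sketch of that retry-then-fail pattern (hypothetical names; the real retry_info struct lives in the C common library):

    def with_retries(operation, num_retries, timeout, fail_callback):
        # Attempt the operation, retrying on timeout up to num_retries times.
        for _ in range(num_retries + 1):
            try:
                return operation(timeout)
            except TimeoutError:
                continue
        # All attempts timed out: report the failure instead of ignoring it.
        fail_callback()

    def fatal_table_callback():
        # Mirrors the C behavior: exhausted retries are a fatal error.
        raise SystemExit("object table operation failed after all retries")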
2 changes: 1 addition & 1 deletion test/stress_tests.py
@@ -154,7 +154,7 @@ def setUp(self):
     ray.worker._init(address_info=address_info, start_ray_local=True,
                      num_workers=self.num_local_schedulers,
                      num_local_schedulers=self.num_local_schedulers,
-                     num_cpus=100)
+                     num_cpus=[1] * self.num_local_schedulers)
 
   def tearDown(self):
     self.assertTrue(ray.services.all_processes_alive())
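
Passing a list rather than a single count gives each local scheduler its own CPU allotment (a reading inferred from this test, not from documented API): with two local schedulers, [1] * 2 evaluates to [1, 1], one CPU apiece, so the stress test runs under much tighter resource limits than a flat 100. A tiny illustration:

    num_local_schedulers = 2
    num_cpus = [1] * num_local_schedulers
    assert num_cpus == [1, 1]  # one CPU per local scheduler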
