[xray] Implement fetch (ray-project#2195)
pcmoritz authored and robertnishihara committed Jun 10, 2018
1 parent 125fe1c commit 4ec5bea
Showing 11 changed files with 121 additions and 80 deletions.
14 changes: 11 additions & 3 deletions python/ray/worker.py
@@ -449,7 +449,9 @@ def get_object(self, object_ids):
                     self.plasma_client.fetch(plain_object_ids[i:(
                         i + ray._config.worker_fetch_request_size())])
                 else:
-                    print("plasma_client.fetch has not been implemented yet")
+                    self.local_scheduler_client.reconstruct_objects(
+                        object_ids[i:(
+                            i + ray._config.worker_fetch_request_size())], True)
 
         # Get the objects. We initially try to get the objects immediately.
         final_results = self.retrieve_and_deserialize(plain_object_ids, 0)
@@ -466,20 +468,26 @@ def get_object(self, object_ids):
             # repeat.
             while len(unready_ids) > 0:
                 for unready_id in unready_ids:
-                    self.local_scheduler_client.reconstruct_object(unready_id)
+                    self.local_scheduler_client.reconstruct_objects(
+                        [ray.ObjectID(unready_id)], False)
                 # Do another fetch for objects that aren't available locally yet,
                 # in case they were evicted since the last fetch. We divide the
                 # fetch into smaller fetches so as to not block the manager for a
                 # prolonged period of time in a single call.
                 object_ids_to_fetch = list(
                     map(plasma.ObjectID, unready_ids.keys()))
+                ray_object_ids_to_fetch = list(
+                    map(ray.ObjectID, unready_ids.keys()))
                 for i in range(0, len(object_ids_to_fetch),
                                ray._config.worker_fetch_request_size()):
                     if not self.use_raylet:
                         self.plasma_client.fetch(object_ids_to_fetch[i:(
                             i + ray._config.worker_fetch_request_size())])
                     else:
-                        print("plasma_client.fetch has not been implemented yet")
+                        self.local_scheduler_client.reconstruct_objects(
+                            ray_object_ids_to_fetch[i:(
+                                i + ray._config.worker_fetch_request_size())],
+                            True)
                 results = self.retrieve_and_deserialize(
                     object_ids_to_fetch,
                     max([
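The worker-side change above batches its requests: objects are requested in chunks so that no single reconstruct_objects call blocks the scheduler for a prolonged period, first with fetch_only=True and then, for objects that still have not appeared, with one blocking reconstruction request per object. A minimal sketch of the batching pattern, assuming only a client object that exposes the reconstruct_objects(object_ids, fetch_only) method added in this commit:

    def batched_fetch(client, object_ids, batch_size):
        # Issue fetch-only requests in chunks of at most batch_size IDs.
        # fetch_only=True asks the local scheduler to pull objects that
        # already exist elsewhere in the cluster, without resubmitting the
        # tasks that created them.
        for i in range(0, len(object_ids), batch_size):
            client.reconstruct_objects(object_ids[i:i + batch_size], True)

In the worker itself the chunk size comes from ray._config.worker_fetch_request_size().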
14 changes: 8 additions & 6 deletions src/local_scheduler/format/local_scheduler.fbs
@@ -26,9 +26,9 @@ enum MessageType:int {
   // Tell a worker to execute a task. This is sent from a local scheduler to a
   // worker.
   ExecuteTask,
-  // Reconstruct a possibly lost object. This is sent from a worker to a local
-  // scheduler.
-  ReconstructObject,
+  // Reconstruct or fetch possibly lost objects. This is sent from a worker to
+  // a local scheduler.
+  ReconstructObjects,
   // For a worker that was blocked on some object(s), tell the local scheduler
   // that the worker is now unblocked. This is sent from a worker to a local
   // scheduler.
@@ -84,9 +84,11 @@ table RegisterClientRequest {
 table DisconnectClient {
 }
 
-table ReconstructObject {
-  // Object ID of the object that needs to be reconstructed.
-  object_id: string;
+table ReconstructObjects {
+  // List of object IDs of the objects that we want to reconstruct or fetch.
+  object_ids: [string];
+  // Do we only want to fetch the objects or also reconstruct them?
+  fetch_only: bool;
 }
 
 table PutObject {
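To make the new message concrete: a request now carries a list of IDs plus a flag selecting between the two behaviors. The dataclass below is a hypothetical Python stand-in for the flatbuffers table above, illustrative only; the real wire format is the ReconstructObjects table serialized with flatbuffers:

    from dataclasses import dataclass
    from typing import List

    @dataclass
    class ReconstructObjectsRequest:
        object_ids: List[bytes]  # 20-byte object IDs, the schema's [string]
        fetch_only: bool  # True: fetch only; False: fetch and reconstruct

    # A blocked worker first sends fetch_only=True for batches of missing
    # IDs, then falls back to fetch_only=False per object that still has
    # not arrived.
    request = ReconstructObjectsRequest([b"\x00" * 20], fetch_only=True)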
@@ -188,7 +188,7 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient__1reconstruct_1object(
   // objectId);
   UniqueIdFromJByteArray o(env, oid);
   auto client = reinterpret_cast<LocalSchedulerConnection *>(c);
-  local_scheduler_reconstruct_object(client, *o.PID);
+  local_scheduler_reconstruct_objects(client, {*o.PID});
 }
 
 /*
30 changes: 22 additions & 8 deletions src/local_scheduler/lib/python/local_scheduler_extension.cc
@@ -81,14 +81,28 @@ static PyObject *PyLocalSchedulerClient_get_task(PyObject *self) {
 }
 // clang-format on
 
-static PyObject *PyLocalSchedulerClient_reconstruct_object(PyObject *self,
-                                                           PyObject *args) {
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&", PyStringToUniqueID, &object_id)) {
+static PyObject *PyLocalSchedulerClient_reconstruct_objects(PyObject *self,
+                                                            PyObject *args) {
+  PyObject *py_object_ids;
+  PyObject *py_fetch_only;
+  std::vector<ObjectID> object_ids;
+  if (!PyArg_ParseTuple(args, "OO", &py_object_ids, &py_fetch_only)) {
     return NULL;
   }
-  local_scheduler_reconstruct_object(
-      ((PyLocalSchedulerClient *) self)->local_scheduler_connection, object_id);
+  bool fetch_only = PyObject_IsTrue(py_fetch_only);
+  Py_ssize_t n = PyList_Size(py_object_ids);
+  for (int64_t i = 0; i < n; ++i) {
+    ObjectID object_id;
+    PyObject *py_object_id = PyList_GetItem(py_object_ids, i);
+    if (!PyObjectToUniqueID(py_object_id, &object_id)) {
+      return NULL;
+    }
+    object_ids.push_back(object_id);
+  }
+  local_scheduler_reconstruct_objects(
+      reinterpret_cast<PyLocalSchedulerClient *>(self)
+          ->local_scheduler_connection,
+      object_ids, fetch_only);
   Py_RETURN_NONE;
 }
 
@@ -238,8 +252,8 @@ static PyMethodDef PyLocalSchedulerClient_methods[] = {
      "Submit a task to the local scheduler."},
     {"get_task", (PyCFunction) PyLocalSchedulerClient_get_task, METH_NOARGS,
      "Get a task from the local scheduler."},
-    {"reconstruct_object",
-     (PyCFunction) PyLocalSchedulerClient_reconstruct_object, METH_VARARGS,
+    {"reconstruct_objects",
+     (PyCFunction) PyLocalSchedulerClient_reconstruct_objects, METH_VARARGS,
      "Ask the local scheduler to reconstruct an object."},
    {"log_event", (PyCFunction) PyLocalSchedulerClient_log_event, METH_VARARGS,
     "Log an event to the event log through the local scheduler."},
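A hedged usage sketch of the new Python binding follows. It assumes a started worker on the raylet (use_raylet) code path, since the legacy scheduler rejects fetch-only requests; the use_raylet keyword and the all-zero 20-byte ID are assumptions made for illustration:

    import ray

    ray.init(use_raylet=True)  # the use_raylet flag is an assumption here
    client = ray.worker.global_worker.local_scheduler_client
    # First argument is a list of ray.ObjectIDs, second is fetch_only.
    client.reconstruct_objects([ray.ObjectID(20 * b"\x00")], True)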
12 changes: 7 additions & 5 deletions src/local_scheduler/local_scheduler.cc
@@ -1163,10 +1163,10 @@ void process_message(event_loop *loop,
       handle_actor_worker_available(state, state->algorithm_state, worker);
     }
   } break;
-  case static_cast<int64_t>(MessageType::ReconstructObject): {
-    auto message =
-        flatbuffers::GetRoot<ray::local_scheduler::protocol::ReconstructObject>(
-            input);
+  case static_cast<int64_t>(MessageType::ReconstructObjects): {
+    auto message = flatbuffers::GetRoot<
+        ray::local_scheduler::protocol::ReconstructObjects>(input);
+    RAY_CHECK(!message->fetch_only());
     if (worker->task_in_progress != NULL && !worker->is_blocked) {
       /* If the worker was executing a task (i.e. non-driver) and it wasn't
        * already blocked on an object that's not locally available, update its
@@ -1190,7 +1190,9 @@ void process_message(event_loop *loop,
       }
       print_worker_info("Reconstructing", state->algorithm_state);
     }
-    reconstruct_object(state, from_flatbuf(*message->object_id()));
+    RAY_CHECK(message->object_ids()->size() == 1);
+    ObjectID object_id = from_flatbuf(*message->object_ids()->Get(0));
+    reconstruct_object(state, object_id);
   } break;
   case static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT): {
     RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock;
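Note that the legacy (non-raylet) local scheduler accepts the renamed message but, as the two RAY_CHECKs above enforce, still implements only the old semantics. A small Python sketch of those constraints, where reconstruct_one is a hypothetical stand-in for the C++ reconstruct_object call:

    def handle_reconstruct_objects(object_ids, fetch_only, reconstruct_one):
        # Fetch-only requests are handled only on the raylet (xray) path.
        assert not fetch_only
        # The legacy path still reconstructs exactly one object per message.
        assert len(object_ids) == 1
        reconstruct_one(object_ids[0])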
13 changes: 8 additions & 5 deletions src/local_scheduler/local_scheduler_client.cc
@@ -138,14 +138,17 @@ void local_scheduler_task_done(LocalSchedulerConnection *conn) {
                 NULL);
 }
 
-void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn,
-                                        ObjectID object_id) {
+void local_scheduler_reconstruct_objects(
+    LocalSchedulerConnection *conn,
+    const std::vector<ObjectID> &object_ids,
+    bool fetch_only) {
   flatbuffers::FlatBufferBuilder fbb;
-  auto message = ray::local_scheduler::protocol::CreateReconstructObject(
-      fbb, to_flatbuf(fbb, object_id));
+  auto object_ids_message = to_flatbuf(fbb, object_ids);
+  auto message = ray::local_scheduler::protocol::CreateReconstructObjects(
+      fbb, object_ids_message, fetch_only);
   fbb.Finish(message);
   write_message(conn->conn,
-                static_cast<int64_t>(MessageType::ReconstructObject),
+                static_cast<int64_t>(MessageType::ReconstructObjects),
                 fbb.GetSize(), fbb.GetBufferPointer());
   /* TODO(swang): Propagate the error. */
 }
11 changes: 7 additions & 4 deletions src/local_scheduler/local_scheduler_client.h
@@ -111,14 +111,17 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
 void local_scheduler_task_done(LocalSchedulerConnection *conn);
 
 /**
- * Tell the local scheduler to reconstruct an object.
+ * Tell the local scheduler to reconstruct or fetch objects.
  *
  * @param conn The connection information.
- * @param object_id The ID of the object to reconstruct.
+ * @param object_ids The IDs of the objects to reconstruct.
+ * @param fetch_only Only fetch objects, do not reconstruct them.
  * @return Void.
  */
-void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn,
-                                        ObjectID object_id);
+void local_scheduler_reconstruct_objects(
+    LocalSchedulerConnection *conn,
+    const std::vector<ObjectID> &object_ids,
+    bool fetch_only = false);
 
 /**
  * Send a log message to the local scheduler.
9 changes: 6 additions & 3 deletions src/local_scheduler/test/local_scheduler_tests.cc
@@ -244,7 +244,8 @@ TEST object_reconstruction_test(void) {
 
   /* Trigger reconstruction, and run the event loop again. */
   ObjectID return_id = TaskSpec_return(spec, 0);
-  local_scheduler_reconstruct_object(worker, return_id);
+  local_scheduler_reconstruct_objects(worker,
+                                      std::vector<ObjectID>({return_id}));
   event_loop_add_timer(local_scheduler->loop, 500,
                        (event_loop_timer_handler) timeout_handler, NULL);
   event_loop_run(local_scheduler->loop);
@@ -369,7 +370,8 @@ TEST object_reconstruction_recursive_test(void) {
   }
   /* Trigger reconstruction for the last object. */
   ObjectID return_id = TaskSpec_return(specs[NUM_TASKS - 1].Spec(), 0);
-  local_scheduler_reconstruct_object(worker, return_id);
+  local_scheduler_reconstruct_objects(worker,
+                                      std::vector<ObjectID>({return_id}));
   /* Run the event loop again. All tasks should be resubmitted. */
   event_loop_add_timer(local_scheduler->loop, 500,
                        (event_loop_timer_handler) timeout_handler, NULL);
@@ -437,7 +439,8 @@ TEST object_reconstruction_suppression_test(void) {
                              0);
   /* Trigger a reconstruction. We will check that no tasks get queued as a
    * result of this line in the event loop process. */
-  local_scheduler_reconstruct_object(worker, return_id);
+  local_scheduler_reconstruct_objects(worker,
+                                      std::vector<ObjectID>({return_id}));
   /* Clean up. */
   free(task_assigned);
   LocalSchedulerMock_free(local_scheduler);
14 changes: 8 additions & 6 deletions src/ray/raylet/format/node_manager.fbs
@@ -29,9 +29,9 @@ enum MessageType:int {
   // Tell a worker to execute a task. This is sent from a local scheduler to a
   // worker.
   ExecuteTask,
-  // Reconstruct a possibly lost object. This is sent from a worker to a local
-  // scheduler.
-  ReconstructObject,
+  // Reconstruct or fetch possibly lost objects. This is sent from a worker to
+  // a local scheduler.
+  ReconstructObjects,
   // For a worker that was blocked on some object(s), tell the local scheduler
   // that the worker is now unblocked. This is sent from a worker to a local
   // scheduler.
@@ -118,9 +118,11 @@ table ForwardTaskRequest {
   uncommitted_tasks: [Task];
 }
 
-table ReconstructObject {
-  // Object ID of the object that needs to be reconstructed.
-  object_id: string;
+table ReconstructObjects {
+  // List of object IDs of the objects that we want to reconstruct or fetch.
+  object_ids: [string];
+  // Do we only want to fetch the objects or also reconstruct them?
+  fetch_only: bool;
 }
 
 table WaitRequest {
78 changes: 41 additions & 37 deletions src/ray/raylet/node_manager.cc
@@ -28,8 +28,8 @@ RAY_CHECK_ENUM(protocol::MessageType::GetTask,
                local_scheduler_protocol::MessageType::GetTask);
 RAY_CHECK_ENUM(protocol::MessageType::ExecuteTask,
                local_scheduler_protocol::MessageType::ExecuteTask);
-RAY_CHECK_ENUM(protocol::MessageType::ReconstructObject,
-               local_scheduler_protocol::MessageType::ReconstructObject);
+RAY_CHECK_ENUM(protocol::MessageType::ReconstructObjects,
+               local_scheduler_protocol::MessageType::ReconstructObjects);
 RAY_CHECK_ENUM(protocol::MessageType::NotifyUnblocked,
                local_scheduler_protocol::MessageType::NotifyUnblocked);
 RAY_CHECK_ENUM(protocol::MessageType::PutObject,
@@ -391,43 +391,47 @@ void NodeManager::ProcessClientMessage(
     // locally, there is no uncommitted lineage.
     SubmitTask(task, Lineage());
   } break;
-  case protocol::MessageType::ReconstructObject: {
-    auto message = flatbuffers::GetRoot<protocol::ReconstructObject>(message_data);
-    ObjectID object_id = from_flatbuf(*message->object_id());
-    RAY_LOG(DEBUG) << "reconstructing object " << object_id;
-    if (!task_dependency_manager_.CheckObjectLocal(object_id)) {
-      // TODO(swang): Instead of calling Pull on the object directly, record the
-      // fact that the blocked task is dependent on this object_id in the task
-      // dependency manager.
-      RAY_CHECK_OK(object_manager_.Pull(object_id));
-    }
-
-    // If the blocked client is a worker, and the worker isn't already blocked,
-    // then release any CPU resources that it acquired for its assigned task
-    // while it is blocked. The resources will be acquired again once the
-    // worker is unblocked.
-    std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
-    if (worker && !worker->IsBlocked()) {
-      RAY_CHECK(!worker->GetAssignedTaskId().is_nil());
-      auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
-      const auto &task = tasks.front();
-      // Get the CPU resources required by the running task.
-      const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
-      double required_cpus = required_resources.GetNumCpus();
-      const std::unordered_map<std::string, double> cpu_resources = {
-          {kCPU_ResourceLabel, required_cpus}};
-      // Release the CPU resources.
-      RAY_CHECK(
-          cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
-              ResourceSet(cpu_resources)));
-      // Mark the task as blocked.
-      local_queues_.QueueBlockedTasks(tasks);
-      worker->MarkBlocked();
-
-      // Try to dispatch more tasks since the blocked worker released some
-      // resources.
-      DispatchTasks();
-    }
+  case protocol::MessageType::ReconstructObjects: {
+    // TODO(hme): handle multiple object ids.
+    auto message = flatbuffers::GetRoot<protocol::ReconstructObjects>(message_data);
+    for (size_t i = 0; i < message->object_ids()->size(); ++i) {
+      ObjectID object_id = from_flatbuf(*message->object_ids()->Get(i));
+      RAY_LOG(DEBUG) << "reconstructing object " << object_id;
+      if (!task_dependency_manager_.CheckObjectLocal(object_id)) {
+        // TODO(swang): Instead of calling Pull on the object directly, record the
+        // fact that the blocked task is dependent on this object_id in the task
+        // dependency manager.
+        RAY_CHECK_OK(object_manager_.Pull(object_id));
+      }
+
+      if (!message->fetch_only()) {
+        // If the blocked client is a worker, and the worker isn't already blocked,
+        // then release any CPU resources that it acquired for its assigned task
+        // while it is blocked. The resources will be acquired again once the
+        // worker is unblocked.
+        std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
+        if (worker && !worker->IsBlocked()) {
+          RAY_CHECK(!worker->GetAssignedTaskId().is_nil());
+          auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
+          const auto &task = tasks.front();
+          // Get the CPU resources required by the running task.
+          const auto required_resources =
+              task.GetTaskSpecification().GetRequiredResources();
+          double required_cpus = required_resources.GetNumCpus();
+          const std::unordered_map<std::string, double> cpu_resources = {
+              {kCPU_ResourceLabel, required_cpus}};
+          // Release the CPU resources.
+          RAY_CHECK(cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()]
+                        .Release(ResourceSet(cpu_resources)));
+          // Mark the task as blocked.
+          local_queues_.QueueBlockedTasks(tasks);
+          worker->MarkBlocked();
+
+          // Try to dispatch more tasks since the blocked worker released some
+          // resources.
+          DispatchTasks();
+        }
+      }
+    }
   } break;
   case protocol::MessageType::NotifyUnblocked: {
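In summary, the raylet handler above pulls each requested object that is not already local and, only when fetch_only is false, marks the calling worker blocked and releases its CPU slots. A Python pseudocode rendering of that control flow (names mirror the C++ diff; none of this is a real Ray API):

    def handle_reconstruct_objects(message, client, node):
        for object_id in message.object_ids:
            if not node.task_dependency_manager.check_object_local(object_id):
                # Ask the object manager to pull the object from a remote node.
                node.object_manager.pull(object_id)
            if not message.fetch_only:
                worker = node.worker_pool.get_registered_worker(client)
                if worker is not None and not worker.is_blocked():
                    # Free the CPUs held by the worker's running task...
                    node.release_cpu_resources(worker)
                    worker.mark_blocked()
                    # ...and try to schedule queued tasks onto them.
                    node.dispatch_tasks()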
4 changes: 2 additions & 2 deletions test/stress_tests.py
@@ -492,8 +492,8 @@ def single_dependency(i, arg):
     # were evicted and whose originating tasks are still running, this
     # for-loop should hang on its first iteration and push an error to the
     # driver.
-    ray.worker.global_worker.local_scheduler_client.reconstruct_object(
-        args[0].id())
+    ray.worker.global_worker.local_scheduler_client.reconstruct_objects(
+        [args[0]], False)
 
     def error_check(errors):
         return len(errors) > 1
