Actor checkpointing with object lineage reconstruction (#1004)
* Worker reports errors from the previous task; the actor task counter is incremented only after a task succeeds

* Refactor actor task execution

- Return new task counter in GetTaskRequest
- Update worker state for actor tasks inside of the actor method
  executor

* Manually invoked checkpoint method

* Scheduling for actor checkpoint methods

* Fix python bugs in checkpointing

* Return task success from worker to local scheduler instead of actor counter

* Kill local schedulers halfway through actor execution instead of waiting for all tasks to execute once

* Remove redundant actor tasks during dispatch, reconstruct missing dependencies for actor tasks

* Make executor for temporary actor methods

* Documentation

* Set default argument for whether the previous task was a success

* Refactor actor method call

* Simplify checkpoint task submission

* Lint fixes

* Address Philipp's review comments

* Add missing line

* Make actor reconstruction tests run faster

* Unimportant whitespace.

* Unimportant whitespace.

* Update checkpoint method signature

* Documentation and handle exceptions during checkpoint save/resume

* Rename get_task message field to actor_checkpoint_failed

* Fix bug.

* Remove debugging check, redirect test output
stephanie-wang authored and robertnishihara committed Oct 12, 2017
1 parent b585001 commit 3764f2f
Showing 14 changed files with 609 additions and 211 deletions.
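Since the python/ray/actor.py diff is not rendered below, here is a rough sketch of what a checkpointable actor might look like from the user's side. The __ray_save_checkpoint__ hook and the checkpoint interval setting appear in the worker.py diff on this page; the checkpoint_interval decorator argument and the restore hook name are assumptions, not confirmed by this commit.

import pickle

import ray


@ray.remote(checkpoint_interval=10)  # assumed decorator argument
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def __ray_save_checkpoint__(self):
        # Return a byte string capturing everything needed to rebuild the
        # actor; the checkpointing machinery stores it (currently in Redis).
        return pickle.dumps(self.value)

    def __ray_restore_from_checkpoint__(self, checkpoint):
        # Hypothetical restore hook: reload saved state instead of replaying
        # every actor task from the beginning.
        self.value = pickle.loads(checkpoint)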
352 changes: 304 additions & 48 deletions python/ray/actor.py

Large diffs are not rendered by default.

75 changes: 16 additions & 59 deletions python/ray/worker.py
@@ -226,6 +226,11 @@ def __init__(self):
self.fetch_and_register_actor = None
self.make_actor = None
self.actors = {}
self.actor_task_counter = 0
# This field is used to report actor checkpoint failure for the last
# task assigned. Workers are not assigned a task on startup, so we
# initialize to False.
self.actor_checkpoint_failed = False
# TODO(swang): This is a hack to prevent the object store from evicting
# dummy objects. Once we allow object pinning in the store, we may
# remove this variable.
@@ -691,7 +696,7 @@ def _process_task(self, task):
args = task.arguments()
return_object_ids = task.returns()
if task.actor_id().id() != NIL_ACTOR_ID:
return_object_ids.pop()
dummy_return_id = return_object_ids.pop()
function_name, function_executor = (self.functions
[self.task_driver_id.id()]
[function_id.id()])
@@ -717,14 +722,10 @@
if task.actor_id().id() == NIL_ACTOR_ID:
outputs = function_executor.executor(arguments)
else:
# If this is any actor task other than the first, which has
# no dependencies, the last argument is a dummy argument
# that represents the dependency on the previous actor
# task. Remove this argument for invocation.
if task.actor_counter() > 0:
arguments = arguments[:-1]
outputs = function_executor(
self.actors[task.actor_id().id()], *arguments)
dummy_return_id, task.actor_counter(),
self.actors[task.actor_id().id()],
*arguments)
except Exception as e:
# Determine whether the exception occurred during a task, not an
# actor method.
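The call above now hands the actor-method executor the dummy return ID and the task counter. Since the actor.py diff is not rendered on this page, the following is only a sketch of what that executor might do, pieced together from the commit message and the dummy-output code removed from _wait_for_and_process_task further down; keying actor_pinned_objects by the dummy ObjectID and the helper name are assumptions.

import numpy as np


def make_actor_method_executor(worker, method):
    # Hypothetical sketch; the real executor lives in python/ray/actor.py.
    def actor_method_executor(dummy_return_id, task_counter, actor, *args):
        # Run the actor method first; the counter only advances after the
        # method succeeds, so a failed task can be replayed.
        result = method(actor, *args)
        worker.actor_task_counter = task_counter + 1

        # Produce the dummy output that the next actor task depends on, and
        # keep a reference so the object store cannot evict this lineage
        # marker (np.zeros(1) is the same pinning hack worker.py used).
        dummy_object = np.zeros(1)
        worker.put_object(dummy_return_id, dummy_object)
        worker.actor_pinned_objects[dummy_return_id] = worker.get_object(
            [dummy_return_id])[0]
        return result

    return actor_method_executor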
@@ -764,35 +765,6 @@ def _handle_process_task_failure(self, function_id, return_object_ids,
data={"function_id": function_id.id(),
"function_name": function_name})

def _checkpoint_actor_state(self, actor_counter):
"""Checkpoint the actor state.
This currently saves the checkpoint to Redis, but the checkpoint really
needs to go somewhere else.
Args:
actor_counter: The index of the most recent task that ran on this
actor.
"""
print("Saving actor checkpoint. actor_counter = {}."
.format(actor_counter))
actor_key = b"Actor:" + self.actor_id
checkpoint = self.actors[self.actor_id].__ray_save_checkpoint__()
# Save the checkpoint in Redis. TODO(rkn): Checkpoints should not
# be stored in Redis. Fix this.
self.redis_client.hset(
actor_key,
"checkpoint_{}".format(actor_counter),
checkpoint)
# Remove the previous checkpoints if there is one.
checkpoint_indices = [int(key[len(b"checkpoint_"):])
for key in self.redis_client.hkeys(actor_key)
if key.startswith(b"checkpoint_")]
for index in checkpoint_indices:
if index < actor_counter:
self.redis_client.hdel(actor_key,
"checkpoint_{}".format(index))

def _wait_for_and_process_task(self, task):
"""Wait for a task to be ready and process the task.
@@ -824,19 +796,6 @@ def _wait_for_and_process_task(self, task):
with log_span("ray:task", contents=contents, worker=self):
self._process_task(task)

# Add the dummy output for actor tasks. TODO(swang): We use a
# numpy array as a hack to pin the object in the object store.
# Once we allow object pinning in the store, we may use `None`.
if task.actor_id().id() != NIL_ACTOR_ID:
dummy_object_id = task.returns().pop()
dummy_object = np.zeros(1)
self.put_object(dummy_object_id, dummy_object)

# Keep the dummy output in scope for the lifetime of the actor,
# to prevent eviction from the object store.
dummy_object = self.get_object([dummy_object_id])
self.actor_pinned_objects.append(dummy_object[0])

# Push all of the log events to the global state store.
flush_log()

@@ -853,21 +812,19 @@ def _wait_for_and_process_task(self, task):
ray.worker.global_worker.local_scheduler_client.disconnect()
os._exit(0)

# Checkpoint the actor state if it is the right time to do so.
actor_counter = task.actor_counter()
if (self.actor_id != NIL_ACTOR_ID and
self.actor_checkpoint_interval != -1 and
actor_counter % self.actor_checkpoint_interval == 0):
self._checkpoint_actor_state(actor_counter)
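The interval-based trigger above was removed from worker.py, so an equivalent check presumably now lives alongside the actor-method executor in actor.py. A minimal restatement of that condition, reusing this file's attribute names (its new location is an assumption):

def should_checkpoint(worker, task_counter):
    # Mirror of the removed condition above; NIL_ACTOR_ID and
    # actor_checkpoint_interval are used as elsewhere in worker.py.
    return (worker.actor_id != NIL_ACTOR_ID and
            worker.actor_checkpoint_interval != -1 and
            task_counter % worker.actor_checkpoint_interval == 0)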

def _get_next_task_from_local_scheduler(self):
"""Get the next task from the local scheduler.
Returns:
A task from the local scheduler.
"""
with log_span("ray:get_task", worker=self):
task = self.local_scheduler_client.get_task()
task = self.local_scheduler_client.get_task(
self.actor_checkpoint_failed)
# We assume that the task is not a checkpoint, or that if it is,
# that the task will succeed. The checkpoint task executor is
# responsible for reporting task failure to the local scheduler.
self.actor_checkpoint_failed = False

# Automatically restrict the GPUs available to this task.
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
@@ -1892,7 +1849,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.class_id = class_id
# Store a list of the dummy outputs produced by actor tasks, to pin the
# dummy outputs in the object store.
worker.actor_pinned_objects = []
worker.actor_pinned_objects = {}

# Initialize the serialization library. This registers some classes, and so
# it must be run before we export all of the cached remote functions.
9 changes: 8 additions & 1 deletion src/common/task.cc
@@ -217,7 +217,14 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) {
int64_t TaskSpec_actor_counter(TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return message->actor_counter();
return std::abs(message->actor_counter());
}

bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
int64_t actor_counter = message->actor_counter();
return actor_counter < 0;
}

UniqueID TaskSpec_driver_id(TaskSpec *spec) {
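The two accessors added above pack an extra bit into the existing actor_counter field: checkpoint methods are submitted with a negated counter, and std::abs recovers the real value. A small Python mirror of that encoding (helper names are hypothetical; it assumes a checkpoint is never submitted at counter 0, where the sign would be lost):

def encode_actor_counter(counter, is_checkpoint):
    # Checkpoint tasks reuse the counter field with a negative sign.
    return -counter if is_checkpoint else counter


def decode_actor_counter(encoded):
    # Returns (counter, is_checkpoint), matching TaskSpec_actor_counter and
    # TaskSpec_actor_is_checkpoint_method above.
    return abs(encoded), encoded < 0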
2 changes: 2 additions & 0 deletions src/common/task.h
@@ -135,6 +135,8 @@ UniqueID TaskSpec_actor_id(TaskSpec *spec);
*/
int64_t TaskSpec_actor_counter(TaskSpec *spec);

bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec);

/**
* Return the driver ID of the task.
*
8 changes: 8 additions & 0 deletions src/local_scheduler/format/local_scheduler.fbs
@@ -36,6 +36,14 @@ enum MessageType:int {
PutObject
}

// This message is sent from a worker to a local scheduler.
table GetTaskRequest {
// Whether the previously assigned task was a checkpoint task that failed.
// If true, then the local scheduler will not update the actor's task
// counter to match the assigned checkpoint index.
actor_checkpoint_failed: bool;
}

// This message is sent from the local scheduler to a worker.
table GetTaskReply {
// A string of bytes representing the task specification.
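Combined with the worker.py changes above, the new GetTaskRequest field gives a simple handshake: the flag always describes the previously assigned task and is cleared as soon as it has been reported. A sketch of the worker side of that loop (the helper function and exception type are illustrative only):

class CheckpointError(Exception):
    """Illustrative only: saving or resuming a checkpoint failed."""


def worker_loop(worker):
    while True:
        # Report on the previously assigned task; if it was a checkpoint
        # task that failed, the local scheduler will not advance the actor's
        # task counter to the checkpoint's index (see the GetTaskRequest
        # comment above).
        task = worker.local_scheduler_client.get_task(
            worker.actor_checkpoint_failed)
        worker.actor_checkpoint_failed = False
        try:
            execute_task(worker, task)  # illustrative helper, not a real API
        except CheckpointError:
            # The checkpoint executor reports failure on the next
            # GetTaskRequest instead of raising to the scheduler directly.
            worker.actor_checkpoint_failed = True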
34 changes: 21 additions & 13 deletions src/local_scheduler/local_scheduler.cc
@@ -568,7 +568,9 @@ void assign_task_to_worker(LocalSchedulerState *state,
}
}

void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
void finish_task(LocalSchedulerState *state,
LocalSchedulerClient *worker,
bool actor_checkpoint_failed) {
if (worker->task_in_progress != NULL) {
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
/* Return dynamic resources back for the task in progress. */
@@ -589,8 +591,11 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
}
/* If we're connected to Redis, update tables. */
if (state->db != NULL) {
/* Update control state tables. */
Task_set_state(worker->task_in_progress, TASK_STATUS_DONE);
/* Update control state tables. If there was an error while executing a *
* checkpoint task, report the task as lost. Else, the task succeeded. */
int task_state =
actor_checkpoint_failed ? TASK_STATUS_LOST : TASK_STATUS_DONE;
Task_set_state(worker->task_in_progress, task_state);
task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL);
/* The call to task_table_update takes ownership of the
* task_in_progress, so we set the pointer to NULL so it is not used. */
@@ -734,18 +739,18 @@ void reconstruct_object_lookup_callback(
* object table entry is up-to-date. */
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
/* Look up the task that created the object in the result table. */
if (!never_created && manager_vector.size() == 0) {
/* If the object was created and later evicted, we reconstruct the object
* if and only if there are no other instances of the task running. */
result_table_lookup(state->db, reconstruct_object_id, NULL,
reconstruct_evicted_result_lookup_callback,
(void *) state);
} else if (never_created) {
if (never_created) {
/* If the object has not been created yet, we reconstruct the object if and
* only if the task that created the object failed to complete. */
result_table_lookup(state->db, reconstruct_object_id, NULL,
reconstruct_failed_result_lookup_callback,
(void *) state);
} else if (manager_vector.size() == 0) {
/* If the object was created and later evicted, we reconstruct the object
* if and only if there are no other instances of the task running. */
result_table_lookup(state->db, reconstruct_object_id, NULL,
reconstruct_evicted_result_lookup_callback,
(void *) state);
}
}
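Restated briefly, the reordered branches implement the following decision; this is an illustrative condensation of the control flow above, not the real API:

def choose_reconstruction_check(never_created, num_known_managers):
    # Mirrors reconstruct_object_lookup_callback above.
    if never_created:
        # Not created yet: reconstruct only if the task that creates the
        # object failed to complete.
        return "reconstruct_failed_result_lookup_callback"
    if num_known_managers == 0:
        # Created but since evicted: reconstruct only if no other instance
        # of the creating task is currently running.
        return "reconstruct_evicted_result_lookup_callback"
    # Some object manager still has the object; no reconstruction needed.
    return None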

@@ -951,7 +956,7 @@ void process_message(event_loop *loop,
case MessageType_TaskDone: {
} break;
case MessageType_DisconnectClient: {
finish_task(state, worker);
finish_task(state, worker, false);
CHECK(!worker->disconnected);
worker->disconnected = true;
/* If the disconnected worker was not an actor, start a new worker to make
@@ -977,13 +982,16 @@
} break;
case MessageType_GetTask: {
/* If this worker reports a completed task, account for resources. */
finish_task(state, worker);
auto message = flatbuffers::GetRoot<GetTaskRequest>(input);
bool actor_checkpoint_failed = message->actor_checkpoint_failed();
finish_task(state, worker, actor_checkpoint_failed);
/* Let the scheduling algorithm process the fact that there is an available
* worker. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
handle_worker_available(state, state->algorithm_state, worker);
} else {
handle_actor_worker_available(state, state->algorithm_state, worker);
handle_actor_worker_available(state, state->algorithm_state, worker,
actor_checkpoint_failed);
}
} break;
case MessageType_ReconstructObject: {
6 changes: 5 additions & 1 deletion src/local_scheduler/local_scheduler.h
@@ -56,9 +56,13 @@ void assign_task_to_worker(LocalSchedulerState *state,
*
* @param state The local scheduler state.
* @param worker The worker that finished the task.
* @param actor_checkpoint_failed If the last task assigned was a checkpoint
* task that failed.
* @return Void.
*/
void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker);
void finish_task(LocalSchedulerState *state,
LocalSchedulerClient *worker,
bool actor_checkpoint_failed);

/**
* This is the callback that is used to process a notification from the Plasma