
Actor checkpointing with object lineage reconstruction #1004


Merged: 24 commits, Oct 12, 2017
Commits
6973c38
Worker reports error in previous task, actor task counter is incremen…
stephanie-wang Sep 21, 2017
c58c437
Refactor actor task execution
stephanie-wang Sep 21, 2017
8b3eb5c
Manually invoked checkpoint method
stephanie-wang Sep 21, 2017
2cfa585
Scheduling for actor checkpoint methods
stephanie-wang Sep 22, 2017
80d3887
Fix python bugs in checkpointing
stephanie-wang Oct 6, 2017
af008d7
Return task success from worker to local scheduler instead of actor c…
stephanie-wang Oct 6, 2017
a5acac4
Kill local schedulers halfway through actor execution instead of wait…
stephanie-wang Oct 6, 2017
f485a8e
Remove redundant actor tasks during dispatch, reconstruct missing dep…
stephanie-wang Oct 6, 2017
d314421
Make executor for temporary actor methods
stephanie-wang Oct 6, 2017
ea61ff5
doc
stephanie-wang Oct 6, 2017
6f6e352
Set default argument for whether the previous task was a success
stephanie-wang Oct 6, 2017
f15e87c
Refactor actor method call
stephanie-wang Oct 6, 2017
cdba049
Simplify checkpoint task submission
stephanie-wang Oct 6, 2017
2123a27
lint
stephanie-wang Oct 6, 2017
71ff282
fix philipp's comments
stephanie-wang Oct 9, 2017
fe13536
Add missing line
stephanie-wang Oct 10, 2017
7ad3c3a
Make actor reconstruction tests run faster
stephanie-wang Oct 10, 2017
d27d046
Unimportant whitespace.
robertnishihara Oct 11, 2017
88db173
Unimportant whitespace.
robertnishihara Oct 12, 2017
4306d87
Update checkpoint method signature
stephanie-wang Oct 11, 2017
b282b45
Documentation and handle exceptions during checkpoint save/resume
stephanie-wang Oct 12, 2017
84d8ab5
Rename get_task message field to actor_checkpoint_failed
stephanie-wang Oct 12, 2017
947d4b4
Fix bug.
robertnishihara Oct 12, 2017
379718f
Remove debugging check, redirect test output
stephanie-wang Oct 12, 2017
352 changes: 304 additions & 48 deletions python/ray/actor.py

Large diffs are not rendered by default.

75 changes: 16 additions & 59 deletions python/ray/worker.py
@@ -226,6 +226,11 @@ def __init__(self):
self.fetch_and_register_actor = None
self.make_actor = None
self.actors = {}
self.actor_task_counter = 0
# This field is used to report actor checkpoint failure for the last
# task assigned. Workers are not assigned a task on startup, so we
# initialize to False.
self.actor_checkpoint_failed = False
# TODO(swang): This is a hack to prevent the object store from evicting
# dummy objects. Once we allow object pinning in the store, we may
# remove this variable.
@@ -691,7 +696,7 @@ def _process_task(self, task):
args = task.arguments()
return_object_ids = task.returns()
if task.actor_id().id() != NIL_ACTOR_ID:
return_object_ids.pop()
dummy_return_id = return_object_ids.pop()
function_name, function_executor = (self.functions
[self.task_driver_id.id()]
[function_id.id()])
@@ -717,14 +722,10 @@
if task.actor_id().id() == NIL_ACTOR_ID:
outputs = function_executor.executor(arguments)
else:
# If this is any actor task other than the first, which has
# no dependencies, the last argument is a dummy argument
# that represents the dependency on the previous actor
# task. Remove this argument for invocation.
if task.actor_counter() > 0:
arguments = arguments[:-1]
outputs = function_executor(
self.actors[task.actor_id().id()], *arguments)
dummy_return_id, task.actor_counter(),
self.actors[task.actor_id().id()],
*arguments)
except Exception as e:
# Determine whether the exception occured during a task, not an
# actor method.
@@ -764,35 +765,6 @@ def _handle_process_task_failure(self, function_id, return_object_ids,
data={"function_id": function_id.id(),
"function_name": function_name})

def _checkpoint_actor_state(self, actor_counter):
"""Checkpoint the actor state.

This currently saves the checkpoint to Redis, but the checkpoint really
needs to go somewhere else.

Args:
actor_counter: The index of the most recent task that ran on this
actor.
"""
print("Saving actor checkpoint. actor_counter = {}."
.format(actor_counter))
actor_key = b"Actor:" + self.actor_id
checkpoint = self.actors[self.actor_id].__ray_save_checkpoint__()
# Save the checkpoint in Redis. TODO(rkn): Checkpoints should not
# be stored in Redis. Fix this.
self.redis_client.hset(
actor_key,
"checkpoint_{}".format(actor_counter),
checkpoint)
# Remove the previous checkpoints if there is one.
checkpoint_indices = [int(key[len(b"checkpoint_"):])
for key in self.redis_client.hkeys(actor_key)
if key.startswith(b"checkpoint_")]
for index in checkpoint_indices:
if index < actor_counter:
self.redis_client.hdel(actor_key,
"checkpoint_{}".format(index))

def _wait_for_and_process_task(self, task):
"""Wait for a task to be ready and process the task.

@@ -824,19 +796,6 @@ def _wait_for_and_process_task(self, task):
with log_span("ray:task", contents=contents, worker=self):
self._process_task(task)

# Add the dummy output for actor tasks. TODO(swang): We use a
# numpy array as a hack to pin the object in the object store.
# Once we allow object pinning in the store, we may use `None`.
if task.actor_id().id() != NIL_ACTOR_ID:
dummy_object_id = task.returns().pop()
dummy_object = np.zeros(1)
self.put_object(dummy_object_id, dummy_object)

# Keep the dummy output in scope for the lifetime of the actor,
# to prevent eviction from the object store.
dummy_object = self.get_object([dummy_object_id])
self.actor_pinned_objects.append(dummy_object[0])

# Push all of the log events to the global state store.
flush_log()

@@ -853,21 +812,19 @@ def _wait_for_and_process_task(self, task):
ray.worker.global_worker.local_scheduler_client.disconnect()
os._exit(0)

# Checkpoint the actor state if it is the right time to do so.
actor_counter = task.actor_counter()
if (self.actor_id != NIL_ACTOR_ID and
self.actor_checkpoint_interval != -1 and
actor_counter % self.actor_checkpoint_interval == 0):
self._checkpoint_actor_state(actor_counter)

def _get_next_task_from_local_scheduler(self):
"""Get the next task from the local scheduler.

Returns:
A task from the local scheduler.
"""
with log_span("ray:get_task", worker=self):
task = self.local_scheduler_client.get_task()
task = self.local_scheduler_client.get_task(
self.actor_checkpoint_failed)
# We assume that the task is not a checkpoint, or that if it is,
# that the task will succeed. The checkpoint task executor is
# responsible for reporting task failure to the local scheduler.
self.actor_checkpoint_failed = False

# Automatically restrict the GPUs available to this task.
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
@@ -1892,7 +1849,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.class_id = class_id
# Store a list of the dummy outputs produced by actor tasks, to pin the
# dummy outputs in the object store.
worker.actor_pinned_objects = []
worker.actor_pinned_objects = {}

# Initialize the serialization library. This registers some classes, and so
# it must be run before we export all of the cached remote functions.
9 changes: 8 additions & 1 deletion src/common/task.cc
@@ -217,7 +217,14 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) {
int64_t TaskSpec_actor_counter(TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
return message->actor_counter();
return std::abs(message->actor_counter());
Collaborator:

Why std::abs? Is this negative sometimes? And if so, what does that mean?

Contributor Author:

I set the counter to be negative for checkpoint tasks. It was mostly to avoid adding another field to the task spec. Let me know if you prefer to just add a field.

Collaborator:

Instead of negative numbers, I actually think adding a bool to the TaskSpec saying whether it is a checkpoint task or not is cleaner. What do you think?

}

bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec) {
CHECK(spec);
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
int64_t actor_counter = message->actor_counter();
return actor_counter < 0;
}

UniqueID TaskSpec_driver_id(TaskSpec *spec) {
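For readers following the review thread above: the convention these accessors decode is that a checkpoint task is submitted with a negated actor counter, so the magnitude recovers the task index and the sign marks a checkpoint method. The C++ sketch below only illustrates that convention; the helper names are hypothetical and do not appear in the PR.

#include <cstdint>
#include <cstdlib>

// Illustrative helpers (not part of the PR) for the sign convention that
// TaskSpec_actor_counter and TaskSpec_actor_is_checkpoint_method decode.
int64_t encode_actor_counter(int64_t counter, bool checkpoint) {
  // Plain negation cannot mark counter == 0 as a checkpoint, which is one
  // argument for the reviewer's suggested explicit boolean field.
  return checkpoint ? -counter : counter;
}

int64_t decode_actor_counter(int64_t stored) {
  return std::abs(stored);  // mirrors TaskSpec_actor_counter
}

bool decode_is_checkpoint_method(int64_t stored) {
  return stored < 0;  // mirrors TaskSpec_actor_is_checkpoint_method
}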
2 changes: 2 additions & 0 deletions src/common/task.h
@@ -135,6 +135,8 @@ UniqueID TaskSpec_actor_id(TaskSpec *spec);
*/
int64_t TaskSpec_actor_counter(TaskSpec *spec);

bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec);

/**
* Return the driver ID of the task.
*
8 changes: 8 additions & 0 deletions src/local_scheduler/format/local_scheduler.fbs
@@ -36,6 +36,14 @@ enum MessageType:int {
PutObject
}

// This message is sent from a worker to a local scheduler.
table GetTaskRequest {
// Whether the previously assigned task was a checkpoint task that failed.
// If true, then the local scheduler will not update the actor's task
// counter to match the assigned checkpoint index.
actor_checkpoint_failed: bool;
}

// This message is sent from the local scheduler to a worker.
table GetTaskReply {
// A string of bytes representing the task specification.
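To illustrate how a worker-side client might serialize the new GetTaskRequest table, the sketch below uses the standard flatbuffers C++ API. The function name, include paths, and buffer handling are assumptions for illustration, not code from this PR; only the table name, the field name, and MessageType_GetTask come from the schema and the scheduler code below.

#include <cstdint>
#include <vector>

#include "flatbuffers/flatbuffers.h"
// Assumed include: the header flatc would generate from local_scheduler.fbs.
#include "local_scheduler_generated.h"

// Sketch: serialize the worker's report of whether the previously assigned
// checkpoint task failed; the buffer would be sent as a MessageType_GetTask
// message to the local scheduler.
std::vector<uint8_t> BuildGetTaskRequest(bool actor_checkpoint_failed) {
  flatbuffers::FlatBufferBuilder fbb;
  auto request = CreateGetTaskRequest(fbb, actor_checkpoint_failed);
  fbb.Finish(request);
  return std::vector<uint8_t>(fbb.GetBufferPointer(),
                              fbb.GetBufferPointer() + fbb.GetSize());
}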
34 changes: 21 additions & 13 deletions src/local_scheduler/local_scheduler.cc
@@ -568,7 +572,9 @@ void assign_task_to_worker(LocalSchedulerState *state,
}
}

void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
void finish_task(LocalSchedulerState *state,
LocalSchedulerClient *worker,
bool actor_checkpoint_failed) {
if (worker->task_in_progress != NULL) {
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
/* Return dynamic resources back for the task in progress. */
@@ -589,8 +591,11 @@
}
/* If we're connected to Redis, update tables. */
if (state->db != NULL) {
/* Update control state tables. */
Task_set_state(worker->task_in_progress, TASK_STATUS_DONE);
/* Update control state tables. If there was an error while executing a *
* checkpoint task, report the task as lost. Else, the task succeeded. */
int task_state =
actor_checkpoint_failed ? TASK_STATUS_LOST : TASK_STATUS_DONE;
Task_set_state(worker->task_in_progress, task_state);
task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL);
/* The call to task_table_update takes ownership of the
* task_in_progress, so we set the pointer to NULL so it is not used. */
@@ -734,18 +739,18 @@ void reconstruct_object_lookup_callback(
* object table entry is up-to-date. */
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
/* Look up the task that created the object in the result table. */
if (!never_created && manager_vector.size() == 0) {
/* If the object was created and later evicted, we reconstruct the object
* if and only if there are no other instances of the task running. */
result_table_lookup(state->db, reconstruct_object_id, NULL,
reconstruct_evicted_result_lookup_callback,
(void *) state);
} else if (never_created) {
if (never_created) {
/* If the object has not been created yet, we reconstruct the object if and
* only if the task that created the object failed to complete. */
result_table_lookup(state->db, reconstruct_object_id, NULL,
reconstruct_failed_result_lookup_callback,
(void *) state);
} else if (manager_vector.size() == 0) {
/* If the object was created and later evicted, we reconstruct the object
* if and only if there are no other instances of the task running. */
result_table_lookup(state->db, reconstruct_object_id, NULL,
reconstruct_evicted_result_lookup_callback,
(void *) state);
}
}

@@ -951,7 +956,7 @@ void process_message(event_loop *loop,
case MessageType_TaskDone: {
} break;
case MessageType_DisconnectClient: {
finish_task(state, worker);
finish_task(state, worker, false);
CHECK(!worker->disconnected);
worker->disconnected = true;
/* If the disconnected worker was not an actor, start a new worker to make
@@ -977,13 +982,16 @@
} break;
case MessageType_GetTask: {
/* If this worker reports a completed task, account for resources. */
finish_task(state, worker);
auto message = flatbuffers::GetRoot<GetTaskRequest>(input);
bool actor_checkpoint_failed = message->actor_checkpoint_failed();
finish_task(state, worker, actor_checkpoint_failed);
/* Let the scheduling algorithm process the fact that there is an available
* worker. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
handle_worker_available(state, state->algorithm_state, worker);
} else {
handle_actor_worker_available(state, state->algorithm_state, worker);
handle_actor_worker_available(state, state->algorithm_state, worker,
actor_checkpoint_failed);
}
} break;
case MessageType_ReconstructObject: {
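The reordering in reconstruct_object_lookup_callback above amounts to a small decision table: which result-table follow-up to run depends on whether the object was ever created and whether any plasma manager still holds a copy. The condensed restatement below is a sketch with hypothetical names, not code from the PR.

#include <cstddef>

// Which follow-up lookup the reconstruction callback should perform.
enum class ReconstructionCheck {
  kNone,            // Some manager still holds the object; nothing to do.
  kTaskFailed,      // Never created: reconstruct only if the creating task
                    // failed to complete.
  kTaskNotRunning,  // Created then evicted: reconstruct only if no other
                    // instance of the creating task is running.
};

ReconstructionCheck ChooseReconstructionCheck(bool never_created,
                                              std::size_t num_managers) {
  if (never_created) {
    return ReconstructionCheck::kTaskFailed;
  }
  if (num_managers == 0) {
    return ReconstructionCheck::kTaskNotRunning;
  }
  return ReconstructionCheck::kNone;
}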
6 changes: 5 additions & 1 deletion src/local_scheduler/local_scheduler.h
@@ -56,9 +56,13 @@ void assign_task_to_worker(LocalSchedulerState *state,
*
* @param state The local scheduler state.
* @param worker The worker that finished the task.
* @param actor_checkpoint_failed If the last task assigned was a checkpoint
* task that failed.
* @return Void.
*/
void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker);
void finish_task(LocalSchedulerState *state,
LocalSchedulerClient *worker,
bool actor_checkpoint_failed);

/**
* This is the callback that is used to process a notification from the Plasma