diff --git a/python/ray/actor.py b/python/ray/actor.py index ab0fbf5c58e5..eba1dd48cb57 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -7,8 +7,10 @@ import hashlib import inspect import json +import numpy as np import traceback +import pyarrow.plasma as plasma import ray.local_scheduler import ray.signature as signature import ray.worker @@ -40,12 +42,31 @@ def get_actor_method_function_id(attr): return ray.local_scheduler.ObjectID(function_id) -def get_actor_checkpoint(actor_id, worker): +def get_checkpoint_indices(worker, actor_id): + """Get the checkpoint indices associated with a given actor ID. + + Args: + worker: The worker to use to get the checkpoint indices. + actor_id: The actor ID of the actor to get the checkpoint indices for. + + Returns: + The indices of existing checkpoints as a list of integers. + """ + actor_key = b"Actor:" + actor_id + checkpoint_indices = [] + for key in worker.redis_client.hkeys(actor_key): + if key.startswith(b"checkpoint_"): + index = int(key[len(b"checkpoint_"):]) + checkpoint_indices.append(index) + return checkpoint_indices + + +def get_actor_checkpoint(worker, actor_id): """Get the most recent checkpoint associated with a given actor ID. Args: - actor_id: The actor ID of the actor to get the checkpoint for. worker: The worker to use to get the checkpoint. + actor_id: The actor ID of the actor to get the checkpoint for. Returns: If a checkpoint exists, this returns a tuple of the checkpoint index @@ -53,18 +74,103 @@ def get_actor_checkpoint(actor_id, worker): index is the actor counter of the last task that was executed on the actor before the checkpoint was made. """ - # Get all of the keys associated with checkpoints for this actor. - actor_key = b"Actor:" + actor_id - checkpoint_indices = [int(key[len(b"checkpoint_"):]) - for key in worker.redis_client.hkeys(actor_key) - if key.startswith(b"checkpoint_")] + checkpoint_indices = get_checkpoint_indices(worker, actor_id) if len(checkpoint_indices) == 0: return -1, None - most_recent_checkpoint_index = max(checkpoint_indices) - # Get the most recent checkpoint. - checkpoint = worker.redis_client.hget( - actor_key, "checkpoint_{}".format(most_recent_checkpoint_index)) - return most_recent_checkpoint_index, checkpoint + else: + actor_key = b"Actor:" + actor_id + checkpoint_index = max(checkpoint_indices) + checkpoint = worker.redis_client.hget( + actor_key, "checkpoint_{}".format(checkpoint_index)) + return checkpoint_index, checkpoint + + +def put_dummy_object(worker, dummy_object_id): + """Put a dummy actor object into the local object store. + + This registers a dummy object ID in the local store with an empty numpy + array as the value. The resulting object is pinned to the store by storing + it to the worker's state. + + For actors, dummy objects are used to store the stateful dependencies + between consecutive method calls. This function should be called for every + actor method execution that updates the actor's internal state. + + Args: + worker: The worker to use to perform the put. + dummy_object_id: The object ID of the dummy object. + """ + # Add the dummy output for actor tasks. TODO(swang): We use + # a numpy array as a hack to pin the object in the object + # store. Once we allow object pinning in the store, we may + # use `None`. + dummy_object = np.zeros(1) + worker.put_object(dummy_object_id, dummy_object) + # Keep the dummy output in scope for the lifetime of the + # actor, to prevent eviction from the object store. 
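+        # The object is then fetched back out of the store and kept in
+        # `actor_pinned_objects`, keyed by its object ID, so that a live
+        # reference exists for the lifetime of the actor process and so that
+        # checkpoint tasks can test whether the preceding method ran on this
+        # actor instance.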
+ dummy_object = worker.get_object([dummy_object_id]) + dummy_object = dummy_object[0] + worker.actor_pinned_objects[dummy_object_id] = dummy_object + + +def is_checkpoint_task(task_counter, checkpoint_interval): + if checkpoint_interval <= 0: + return False + return (task_counter % checkpoint_interval == 0) + + +def make_actor_method_executor(worker, method_name, method): + """Make an executor that wraps a user-defined actor method. + + The executor wraps the method to update the worker's internal state. If the + task is a success, the dummy object returned is added to the object store, + to signal that the following task can run, and the worker's task counter is + updated to match the executed task. Else, the executor reports failure to + the local scheduler so that the task counter does not get updated. + + Args: + worker (Worker): The worker that is executing the actor. + method_name (str): The name of the actor method. + method (instancemethod): The actor method to wrap. This should be a + method defined on the actor class and should therefore take an + instance of the actor as the first argument. + + Returns: + A function that executes the given actor method on the worker's stored + instance of the actor. The function also updates the worker's + internal state to record the executed method. + """ + + def actor_method_executor(dummy_return_id, task_counter, actor, + *args): + # An actor task's dependency on the previous task is represented by + # a dummy argument. Remove this argument before invocation. + args = args[:-1] + if method_name == "__ray_checkpoint__": + # Execute the checkpoint task. + actor_checkpoint_failed, error = method(actor, *args) + # If the checkpoint was successfully loaded, put the dummy object + # and update the actor's task counter, so that the task following + # the checkpoint can run. + if not actor_checkpoint_failed: + put_dummy_object(worker, dummy_return_id) + worker.actor_task_counter = task_counter + 1 + # Report to the local scheduler whether this task succeeded in + # loading the checkpoint. + worker.actor_checkpoint_failed = actor_checkpoint_failed + # If there was an exception during the checkpoint method, re-raise + # it after updating the actor's internal state. + if error is not None: + raise error + return None + else: + # Update the worker's internal state before executing the method in + # case the method throws an exception. + put_dummy_object(worker, dummy_return_id) + worker.actor_task_counter = task_counter + 1 + # Execute the actor method. + return method(actor, *args) + return actor_method_executor def fetch_and_register_actor(actor_class_key, worker): @@ -100,8 +206,11 @@ def temporary_actor_method(*xs): "cannot execute this method".format(actor_name)) for actor_method_name in actor_method_names: function_id = get_actor_method_function_id(actor_method_name).id() + temporary_executor = make_actor_method_executor(worker, + actor_method_name, + temporary_actor_method) worker.functions[driver_id][function_id] = (actor_method_name, - temporary_actor_method) + temporary_executor) worker.function_properties[driver_id][function_id] = ( FunctionProperties(num_return_vals=2, num_cpus=1, @@ -112,6 +221,7 @@ def temporary_actor_method(*xs): try: unpickled_class = pickle.loads(pickled_class) + worker.actor_class = unpickled_class except Exception: # If an exception was thrown when the actor was imported, we record the # traceback and notify the scheduler of the failure. 
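For illustration, the checkpoint bookkeeping that `get_checkpoint_indices` and `get_actor_checkpoint` above read (and that `__ray_checkpoint__` below writes and prunes) lives in a single Redis hash per actor. A minimal standalone sketch of that layout, assuming a redis-py client against a locally running Redis server; `actor_id` and `checkpoint_blob` are placeholder values:

    import redis

    # Placeholder values for illustration only.
    actor_id = b"\x00" * 20
    checkpoint_blob = b"pickled actor state"

    client = redis.StrictRedis()
    actor_key = b"Actor:" + actor_id

    # A checkpoint taken at task index 12 is stored as a hash field.
    client.hset(actor_key, "checkpoint_12", checkpoint_blob)

    # get_checkpoint_indices scans the hash fields for the "checkpoint_" prefix.
    indices = [int(key[len(b"checkpoint_"):])
               for key in client.hkeys(actor_key)
               if key.startswith(b"checkpoint_")]

    # After a newer checkpoint is saved, older entries are pruned.
    for index in indices:
        if index < 12:
            client.hdel(actor_key, "checkpoint_{}".format(index))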
@@ -126,11 +236,15 @@ def temporary_actor_method(*xs): # TODO(pcm): Why is the below line necessary? unpickled_class.__module__ = module worker.actors[actor_id_str] = unpickled_class.__new__(unpickled_class) - for (k, v) in inspect.getmembers( + actor_methods = inspect.getmembers( unpickled_class, predicate=(lambda x: (inspect.isfunction(x) or - inspect.ismethod(x)))): - function_id = get_actor_method_function_id(k).id() - worker.functions[driver_id][function_id] = (k, v) + inspect.ismethod(x)))) + for actor_method_name, actor_method in actor_methods: + function_id = get_actor_method_function_id(actor_method_name).id() + executor = make_actor_method_executor(worker, actor_method_name, + actor_method) + worker.functions[driver_id][function_id] = (actor_method_name, + executor) # We do not set worker.function_properties[driver_id][function_id] # because we currently do need the actor worker to submit new tasks # for the actor. @@ -214,6 +328,10 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): + # Add one to the checkpoint interval since we will insert a mock task for + # every checkpoint. + checkpoint_interval += 1 + # Modify the class to have an additional method that will be used for # terminating the worker. class Class(cls): @@ -254,9 +372,101 @@ def __ray_restore_from_checkpoint__(cls, pickled_checkpoint): # TODO(rkn): It's possible that this will cause problems. When # you unpickle the same object twice, the two objects will not # have the same class. - actor_object = pickle.loads(checkpoint) + actor_object = checkpoint return actor_object + def __ray_checkpoint__(self, task_counter, previous_object_id): + """Save or resume a stored checkpoint. + + This task checkpoints the current state of the actor. If the actor + has not yet executed to `task_counter`, then the task instead + attempts to resume from a saved checkpoint that matches + `task_counter`. If the most recently saved checkpoint is earlier + than `task_counter`, the task requests reconstruction of the tasks + that executed since the previous checkpoint and before + `task_counter`. + + Args: + self: An instance of the actor class. + task_counter: The index assigned to this checkpoint method. + previous_object_id: The dummy object returned by the task that + immediately precedes this checkpoint. + + Returns: + A bool representing whether the checkpoint was successfully + loaded (whether the actor can safely execute the next task) + and an Exception instance, if one was thrown. + """ + worker = ray.worker.global_worker + previous_object_id = previous_object_id[0] + plasma_id = plasma.ObjectID(previous_object_id.id()) + + # Initialize the return values. `actor_checkpoint_failed` will be + # set to True if we fail to load the checkpoint. `error` will be + # set to the Exception, if one is thrown. + actor_checkpoint_failed = False + error_to_return = None + + # Save or resume the checkpoint. + if previous_object_id in worker.actor_pinned_objects: + # The preceding task executed on this actor instance. Save the + # checkpoint. + print("Saving actor checkpoint. actor_counter = {}." + .format(task_counter)) + actor_key = b"Actor:" + worker.actor_id + + try: + checkpoint = worker.actors[ + worker.actor_id].__ray_save_checkpoint__() + # Save the checkpoint in Redis. TODO(rkn): Checkpoints + # should not be stored in Redis. Fix this. 
+ worker.redis_client.hset( + actor_key, + "checkpoint_{}".format(task_counter), + checkpoint) + # Remove the previous checkpoints if there is one. + checkpoint_indices = get_checkpoint_indices( + worker, worker.actor_id) + for index in checkpoint_indices: + if index < task_counter: + worker.redis_client.hdel( + actor_key, "checkpoint_{}".format(index)) + # An exception was thrown. Save the error. + except Exception as error: + # Checkpoint saves should not block execution on the actor, + # so we still consider the task successful. + error_to_return = error + else: + # The preceding task has not yet executed on this actor + # instance. Try to resume from the most recent checkpoint. + checkpoint_index, checkpoint = get_actor_checkpoint( + worker, worker.actor_id) + if checkpoint_index == task_counter: + # The checkpoint matches ours. Resume the actor instance. + try: + actor = (worker.actor_class. + __ray_restore_from_checkpoint__(checkpoint)) + worker.actors[worker.actor_id] = actor + # An exception was thrown. Save the error. + except Exception as error: + # We could not resume the checkpoint, so count the task + # as failed. + actor_checkpoint_failed = True + error_to_return = error + else: + # We cannot resume a mismatching checkpoint, so count the + # task as failed. + actor_checkpoint_failed = True + + # Fall back to lineage reconstruction if we were unable to load the + # checkpoint. + if actor_checkpoint_failed: + worker.local_scheduler_client.reconstruct_object( + plasma_id.binary()) + worker.local_scheduler_client.notify_unblocked() + + return actor_checkpoint_failed, error_to_return + Class.__module__ = cls.__module__ Class.__name__ = cls.__name__ @@ -270,10 +480,9 @@ def __ray_restore_from_checkpoint__(cls, pickled_checkpoint): # Create objects to wrap method invocations. This is done so that we can # invoke methods with actor.method.remote() instead of actor.method(). class ActorMethod(object): - def __init__(self, actor, method_name, method_signature): + def __init__(self, actor, method_name): self.actor = actor self.method_name = method_name - self.method_signature = method_signature def __call__(self, *args, **kwargs): raise Exception("Actor methods cannot be called directly. Instead " @@ -282,9 +491,20 @@ def __call__(self, *args, **kwargs): .format(self.method_name, self.method_name)) def remote(self, *args, **kwargs): - return self.actor._actor_method_call(self.method_name, - self.method_signature, *args, - **kwargs) + return self.actor._actor_method_call( + self.method_name, args=args, kwargs=kwargs, + dependency=self.actor._ray_actor_cursor) + + # Checkpoint methods do not take in the state of the previous actor method + # as an explicit data dependency. + class CheckpointMethod(ActorMethod): + def remote(self): + # A checkpoint's arguments are the current task counter and the + # object ID of the preceding task. The latter is an implicit data + # dependency, since the checkpoint method can run at any time. + args = [self.actor._ray_actor_counter, + [self.actor._ray_actor_cursor]] + return self.actor._actor_method_call(self.method_name, args=args) class ActorHandle(object): def __init__(self, *args, **kwargs): @@ -307,10 +527,12 @@ def _manual_init(self, *args, **kwargs): # the current cursor should be added as a dependency, and then # updated to reflect the new invocation. 
self._ray_actor_cursor = None - self._ray_actor_methods = { - k: v for (k, v) in inspect.getmembers( - Class, predicate=(lambda x: (inspect.isfunction(x) or - inspect.ismethod(x))))} + ray_actor_methods = inspect.getmembers( + Class, predicate=(lambda x: (inspect.isfunction(x) or + inspect.ismethod(x)))) + self._ray_actor_methods = {} + for actor_method_name, actor_method in ray_actor_methods: + self._ray_actor_methods[actor_method_name] = actor_method # Extract the signatures of each of the methods. This will be used # to catch some errors if the methods are called with inappropriate # arguments. @@ -346,18 +568,41 @@ def _manual_init(self, *args, **kwargs): # Call __init__ as a remote function. if "__init__" in self._ray_actor_methods.keys(): - self._actor_method_call( - "__init__", self._ray_method_signatures["__init__"], *args, - **kwargs) + self._actor_method_call("__init__", args=args, kwargs=kwargs) else: print("WARNING: this object has no __init__ method.") - # The function actor_method_call gets called if somebody tries to call - # a method on their local actor stub object. - def _actor_method_call(self, attr, function_signature, *args, - **kwargs): + def _actor_method_call(self, method_name, args=None, kwargs=None, + dependency=None): + """Method execution stub for an actor handle. + + This is the function that executes when + `actor.method_name.remote(*args, **kwargs)` is called. Instead of + executing locally, the method is packaged as a task and scheduled + to the remote actor instance. + + Args: + self: The local actor handle. + method_name: The name of the actor method to execute. + args: A list of arguments for the actor method. + kwargs: A dictionary of keyword arguments for the actor method. + dependency: The object ID that this method is dependent on. + Defaults to None, for no dependencies. Most tasks should + pass in the dummy object returned by the preceding task. + Some tasks, such as checkpoint and terminate methods, have + no dependencies. + + Returns: + object_ids: A list of object IDs returned by the remote actor + method. + """ ray.worker.check_connected() ray.worker.check_main_thread() + function_signature = self._ray_method_signatures[method_name] + if args is None: + args = [] + if kwargs is None: + kwargs = {} args = signature.extend_args(function_signature, args, kwargs) # Execute functions locally if Ray is run in PYTHON_MODE @@ -365,23 +610,33 @@ def _actor_method_call(self, attr, function_signature, *args, if ray.worker.global_worker.mode == ray.PYTHON_MODE: return getattr( ray.worker.global_worker.actors[self._ray_actor_id], - attr)(*copy.deepcopy(args)) + method_name)(*copy.deepcopy(args)) - # Add the current actor cursor, a dummy object returned by the most - # recent method invocation, as a dependency for the next method - # invocation. - if self._ray_actor_cursor is not None: - args.append(self._ray_actor_cursor) + # Add the dummy argument that represents dependency on a preceding + # task. + args.append(dependency) + + actor_counter = self._ray_actor_counter + # Mark checkpoint methods with a negative task counter. 
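+            # The local scheduler recovers the index with abs() in
+            # TaskSpec_actor_counter and uses the sign alone to detect
+            # checkpoint tasks via TaskSpec_actor_is_checkpoint_method.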
+ if is_checkpoint_task(actor_counter, checkpoint_interval): + actor_counter = self._ray_actor_counter * -1 - function_id = get_actor_method_function_id(attr) + function_id = get_actor_method_function_id(method_name) object_ids = ray.worker.global_worker.submit_task( function_id, args, actor_id=self._ray_actor_id, - actor_counter=self._ray_actor_counter) + actor_counter=actor_counter) # Update the actor counter and cursor to reflect the most recent # invocation. self._ray_actor_counter += 1 self._ray_actor_cursor = object_ids.pop() + # Submit a checkpoint task if necessary. + if is_checkpoint_task(self._ray_actor_counter, + checkpoint_interval): + self.__ray_checkpoint__.remote() + + # The last object returned is the dummy object that should be + # passed in to the next actor method. Do not return it to the user. if len(object_ids) == 1: return object_ids[0] elif len(object_ids) > 1: @@ -405,8 +660,11 @@ def __getattribute__(self, attr): # ActorMethod has a reference to the ActorHandle and this was # causing cyclic references which were prevent object # deallocation from behaving in a predictable manner. - return ActorMethod(self, attr, - self._ray_method_signatures[attr]) + if attr == "__ray_checkpoint__": + actor_method_cls = CheckpointMethod + else: + actor_method_cls = ActorMethod + return actor_method_cls(self, attr) else: # There is no method with this name, so raise an exception. raise AttributeError("'{}' Actor object has no attribute '{}'" @@ -421,10 +679,8 @@ def __reduce__(self): def __del__(self): """Kill the worker that is running this actor.""" if ray.worker.global_worker.connected: - self._actor_method_call( - "__ray_terminate__", - self._ray_method_signatures["__ray_terminate__"], - self._ray_actor_id.id()) + self._actor_method_call("__ray_terminate__", + args=[self._ray_actor_id.id()]) return ActorHandle diff --git a/python/ray/worker.py b/python/ray/worker.py index ded6fada6f72..d18df9beaf19 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -226,6 +226,11 @@ def __init__(self): self.fetch_and_register_actor = None self.make_actor = None self.actors = {} + self.actor_task_counter = 0 + # This field is used to report actor checkpoint failure for the last + # task assigned. Workers are not assigned a task on startup, so we + # initialize to False. + self.actor_checkpoint_failed = False # TODO(swang): This is a hack to prevent the object store from evicting # dummy objects. Once we allow object pinning in the store, we may # remove this variable. @@ -691,7 +696,7 @@ def _process_task(self, task): args = task.arguments() return_object_ids = task.returns() if task.actor_id().id() != NIL_ACTOR_ID: - return_object_ids.pop() + dummy_return_id = return_object_ids.pop() function_name, function_executor = (self.functions [self.task_driver_id.id()] [function_id.id()]) @@ -717,14 +722,10 @@ def _process_task(self, task): if task.actor_id().id() == NIL_ACTOR_ID: outputs = function_executor.executor(arguments) else: - # If this is any actor task other than the first, which has - # no dependencies, the last argument is a dummy argument - # that represents the dependency on the previous actor - # task. Remove this argument for invocation. - if task.actor_counter() > 0: - arguments = arguments[:-1] outputs = function_executor( - self.actors[task.actor_id().id()], *arguments) + dummy_return_id, task.actor_counter(), + self.actors[task.actor_id().id()], + *arguments) except Exception as e: # Determine whether the exception occured during a task, not an # actor method. 
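Two pieces cooperate on the checkpoint counter encoding: `_actor_method_call` above negates the counter of checkpoint tasks and `make_actor` widens the user-specified `checkpoint_interval` by one to account for the inserted checkpoint task, while `TaskSpec_actor_counter` and `TaskSpec_actor_is_checkpoint_method` below undo the encoding with `abs()` and a sign test. A small standalone sketch of that round trip in plain Python (no Ray internals):

    def is_checkpoint_task(task_counter, checkpoint_interval):
        if checkpoint_interval <= 0:
            return False
        return task_counter % checkpoint_interval == 0

    def encode_counter(counter, checkpoint_interval):
        # Mirror of _actor_method_call: checkpoint tasks carry a negated counter.
        return -counter if is_checkpoint_task(counter, checkpoint_interval) else counter

    def decode_counter(encoded):
        # Mirror of TaskSpec_actor_counter / TaskSpec_actor_is_checkpoint_method.
        return abs(encoded), encoded < 0

    # A user-facing checkpoint_interval of 5 becomes 6 internally, so every
    # sixth submission (five method calls plus the checkpoint task itself) is
    # a checkpoint.
    internal_interval = 5 + 1
    for counter in range(1, 14):
        index, is_checkpoint = decode_counter(
            encode_counter(counter, internal_interval))
        if is_checkpoint:
            print("task counter {} is a checkpoint task".format(index))  # 6 and 12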
@@ -764,35 +765,6 @@ def _handle_process_task_failure(self, function_id, return_object_ids, data={"function_id": function_id.id(), "function_name": function_name}) - def _checkpoint_actor_state(self, actor_counter): - """Checkpoint the actor state. - - This currently saves the checkpoint to Redis, but the checkpoint really - needs to go somewhere else. - - Args: - actor_counter: The index of the most recent task that ran on this - actor. - """ - print("Saving actor checkpoint. actor_counter = {}." - .format(actor_counter)) - actor_key = b"Actor:" + self.actor_id - checkpoint = self.actors[self.actor_id].__ray_save_checkpoint__() - # Save the checkpoint in Redis. TODO(rkn): Checkpoints should not - # be stored in Redis. Fix this. - self.redis_client.hset( - actor_key, - "checkpoint_{}".format(actor_counter), - checkpoint) - # Remove the previous checkpoints if there is one. - checkpoint_indices = [int(key[len(b"checkpoint_"):]) - for key in self.redis_client.hkeys(actor_key) - if key.startswith(b"checkpoint_")] - for index in checkpoint_indices: - if index < actor_counter: - self.redis_client.hdel(actor_key, - "checkpoint_{}".format(index)) - def _wait_for_and_process_task(self, task): """Wait for a task to be ready and process the task. @@ -824,19 +796,6 @@ def _wait_for_and_process_task(self, task): with log_span("ray:task", contents=contents, worker=self): self._process_task(task) - # Add the dummy output for actor tasks. TODO(swang): We use a - # numpy array as a hack to pin the object in the object store. - # Once we allow object pinning in the store, we may use `None`. - if task.actor_id().id() != NIL_ACTOR_ID: - dummy_object_id = task.returns().pop() - dummy_object = np.zeros(1) - self.put_object(dummy_object_id, dummy_object) - - # Keep the dummy output in scope for the lifetime of the actor, - # to prevent eviction from the object store. - dummy_object = self.get_object([dummy_object_id]) - self.actor_pinned_objects.append(dummy_object[0]) - # Push all of the log events to the global state store. flush_log() @@ -853,13 +812,6 @@ def _wait_for_and_process_task(self, task): ray.worker.global_worker.local_scheduler_client.disconnect() os._exit(0) - # Checkpoint the actor state if it is the right time to do so. - actor_counter = task.actor_counter() - if (self.actor_id != NIL_ACTOR_ID and - self.actor_checkpoint_interval != -1 and - actor_counter % self.actor_checkpoint_interval == 0): - self._checkpoint_actor_state(actor_counter) - def _get_next_task_from_local_scheduler(self): """Get the next task from the local scheduler. @@ -867,7 +819,12 @@ def _get_next_task_from_local_scheduler(self): A task from the local scheduler. """ with log_span("ray:get_task", worker=self): - task = self.local_scheduler_client.get_task() + task = self.local_scheduler_client.get_task( + self.actor_checkpoint_failed) + # We assume that the task is not a checkpoint, or that if it is, + # that the task will succeed. The checkpoint task executor is + # responsible for reporting task failure to the local scheduler. + self.actor_checkpoint_failed = False # Automatically restrict the GPUs available to this task. os.environ["CUDA_VISIBLE_DEVICES"] = ",".join( @@ -1892,7 +1849,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.class_id = class_id # Store a list of the dummy outputs produced by actor tasks, to pin the # dummy outputs in the object store. - worker.actor_pinned_objects = [] + worker.actor_pinned_objects = {} # Initialize the serialization library. 
This registers some classes, and so # it must be run before we export all of the cached remote functions. diff --git a/src/common/task.cc b/src/common/task.cc index f2fd3e1bd05a..c8d8f7d13c39 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -217,7 +217,14 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) { int64_t TaskSpec_actor_counter(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); - return message->actor_counter(); + return std::abs(message->actor_counter()); +} + +bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec) { + CHECK(spec); + auto message = flatbuffers::GetRoot(spec); + int64_t actor_counter = message->actor_counter(); + return actor_counter < 0; } UniqueID TaskSpec_driver_id(TaskSpec *spec) { diff --git a/src/common/task.h b/src/common/task.h index 13f4faa6564a..db1b36899b84 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -135,6 +135,8 @@ UniqueID TaskSpec_actor_id(TaskSpec *spec); */ int64_t TaskSpec_actor_counter(TaskSpec *spec); +bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec); + /** * Return the driver ID of the task. * diff --git a/src/local_scheduler/format/local_scheduler.fbs b/src/local_scheduler/format/local_scheduler.fbs index 656235963176..18dd65e5bab2 100644 --- a/src/local_scheduler/format/local_scheduler.fbs +++ b/src/local_scheduler/format/local_scheduler.fbs @@ -36,6 +36,14 @@ enum MessageType:int { PutObject } +// This message is sent from a worker to a local scheduler. +table GetTaskRequest { + // Whether the previously assigned task was a checkpoint task that failed. + // If true, then the local scheduler will not update the actor's task + // counter to match the assigned checkpoint index. + actor_checkpoint_failed: bool; +} + // This message is sent from the local scheduler to a worker. table GetTaskReply { // A string of bytes representing the task specification. diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index fcb2e677d723..745a670626df 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -568,7 +568,9 @@ void assign_task_to_worker(LocalSchedulerState *state, } } -void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { +void finish_task(LocalSchedulerState *state, + LocalSchedulerClient *worker, + bool actor_checkpoint_failed) { if (worker->task_in_progress != NULL) { TaskSpec *spec = Task_task_spec(worker->task_in_progress); /* Return dynamic resources back for the task in progress. */ @@ -589,8 +591,11 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { } /* If we're connected to Redis, update tables. */ if (state->db != NULL) { - /* Update control state tables. */ - Task_set_state(worker->task_in_progress, TASK_STATUS_DONE); + /* Update control state tables. If there was an error while executing a * + * checkpoint task, report the task as lost. Else, the task succeeded. */ + int task_state = + actor_checkpoint_failed ? TASK_STATUS_LOST : TASK_STATUS_DONE; + Task_set_state(worker->task_in_progress, task_state); task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL); /* The call to task_table_update takes ownership of the * task_in_progress, so we set the pointer to NULL so it is not used. */ @@ -734,18 +739,18 @@ void reconstruct_object_lookup_callback( * object table entry is up-to-date. */ LocalSchedulerState *state = (LocalSchedulerState *) user_context; /* Look up the task that created the object in the result table. 
*/ - if (!never_created && manager_vector.size() == 0) { - /* If the object was created and later evicted, we reconstruct the object - * if and only if there are no other instances of the task running. */ - result_table_lookup(state->db, reconstruct_object_id, NULL, - reconstruct_evicted_result_lookup_callback, - (void *) state); - } else if (never_created) { + if (never_created) { /* If the object has not been created yet, we reconstruct the object if and * only if the task that created the object failed to complete. */ result_table_lookup(state->db, reconstruct_object_id, NULL, reconstruct_failed_result_lookup_callback, (void *) state); + } else if (manager_vector.size() == 0) { + /* If the object was created and later evicted, we reconstruct the object + * if and only if there are no other instances of the task running. */ + result_table_lookup(state->db, reconstruct_object_id, NULL, + reconstruct_evicted_result_lookup_callback, + (void *) state); } } @@ -951,7 +956,7 @@ void process_message(event_loop *loop, case MessageType_TaskDone: { } break; case MessageType_DisconnectClient: { - finish_task(state, worker); + finish_task(state, worker, false); CHECK(!worker->disconnected); worker->disconnected = true; /* If the disconnected worker was not an actor, start a new worker to make @@ -977,13 +982,16 @@ void process_message(event_loop *loop, } break; case MessageType_GetTask: { /* If this worker reports a completed task, account for resources. */ - finish_task(state, worker); + auto message = flatbuffers::GetRoot(input); + bool actor_checkpoint_failed = message->actor_checkpoint_failed(); + finish_task(state, worker, actor_checkpoint_failed); /* Let the scheduling algorithm process the fact that there is an available * worker. */ if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { handle_worker_available(state, state->algorithm_state, worker); } else { - handle_actor_worker_available(state, state->algorithm_state, worker); + handle_actor_worker_available(state, state->algorithm_state, worker, + actor_checkpoint_failed); } } break; case MessageType_ReconstructObject: { diff --git a/src/local_scheduler/local_scheduler.h b/src/local_scheduler/local_scheduler.h index 9ed2563b6546..dfddf7bca753 100644 --- a/src/local_scheduler/local_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -56,9 +56,13 @@ void assign_task_to_worker(LocalSchedulerState *state, * * @param state The local scheduler state. * @param worker The worker that finished the task. + * @param actor_checkpoint_failed If the last task assigned was a checkpoint + * task that failed. * @return Void. */ -void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker); +void finish_task(LocalSchedulerState *state, + LocalSchedulerClient *worker, + bool actor_checkpoint_failed); /** * This is the callback that is used to process a notification from the Plasma diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 9f7adac4131b..9cfbc08c5b92 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -53,6 +53,10 @@ typedef struct { * restrict the submission of tasks on actors to the process that created the * actor. */ int64_t task_counter; + /** The index of the task assigned to this actor. Set to -1 if no task is + * currently assigned. If the actor process reports back success for the + * assigned task execution, task_counter should be set to this value. 
*/ + int64_t assigned_task_counter; /** A queue of tasks to be executed on this actor. The tasks will be sorted by * the order of their actor counters. */ std::list *task_queue; @@ -234,6 +238,7 @@ void create_actor(SchedulingAlgorithmState *algorithm_state, LocalSchedulerClient *worker) { LocalActorInfo entry; entry.task_counter = 0; + entry.assigned_task_counter = -1; entry.task_queue = new std::list(); entry.worker = worker; entry.worker_available = false; @@ -309,39 +314,56 @@ bool dispatch_actor_task(LocalSchedulerState *state, /* There should be some queued tasks for this actor. */ CHECK(!entry.task_queue->empty()); - - TaskQueueEntry first_task = entry.task_queue->front(); - int64_t next_task_counter = TaskSpec_actor_counter(first_task.spec); - if (next_task_counter != entry.task_counter) { - /* We cannot execute the next task on this actor without violating the - * in-order execution guarantee for actor tasks. */ - CHECK(next_task_counter > entry.task_counter); - return false; - } /* If the worker is not available, we cannot assign a task to it. */ if (!entry.worker_available) { return false; } + + /* Find the first task that either matches the task counter or that is a + * checkpoint method. Remove any tasks that we have already executed past + * (e.g., by executing a more recent checkpoint method). */ + auto task = entry.task_queue->begin(); + int64_t next_task_counter = TaskSpec_actor_counter(task->spec); + while (next_task_counter != entry.task_counter) { + if (next_task_counter < entry.task_counter) { + /* A task that we have already executed past. Remove it. */ + task = entry.task_queue->erase(task); + /* If there are no more tasks in the queue, wait. */ + if (task == entry.task_queue->end()) { + algorithm_state->actors_with_pending_tasks.erase(actor_id); + return false; + } + /* Move on to the next task. */ + next_task_counter = TaskSpec_actor_counter(task->spec); + } else if (TaskSpec_actor_is_checkpoint_method(task->spec)) { + /* A later task that is a checkpoint method. Checkpoint methods can + * always be executed. */ + break; + } else { + /* A later task that is not a checkpoint. Wait for the preceding tasks to + * execute. */ + return false; + } + } + /* If there are not enough resources available, we cannot assign the task. */ - CHECK(0 == - TaskSpec_get_required_resource(first_task.spec, ResourceIndex_GPU)); + CHECK(0 == TaskSpec_get_required_resource(task->spec, ResourceIndex_GPU)); if (!check_dynamic_resources( - state, - TaskSpec_get_required_resource(first_task.spec, ResourceIndex_CPU), 0, - TaskSpec_get_required_resource(first_task.spec, - ResourceIndex_CustomResource))) { + state, TaskSpec_get_required_resource(task->spec, ResourceIndex_CPU), + 0, TaskSpec_get_required_resource(task->spec, + ResourceIndex_CustomResource))) { return false; } + /* Assign the first task in the task queue to the worker and mark the worker * as unavailable. */ - entry.task_counter += 1; - assign_task_to_worker(state, first_task.spec, first_task.task_spec_size, - entry.worker); + assign_task_to_worker(state, task->spec, task->task_spec_size, entry.worker); + entry.assigned_task_counter = next_task_counter; entry.worker_available = false; /* Free the task queue entry. */ - TaskQueueEntry_free(&first_task); + TaskQueueEntry_free(&(*task)); /* Remove the task from the actor's task queue. */ - entry.task_queue->pop_front(); + entry.task_queue->erase(task); /* If there are no more tasks in the queue, then indicate that the actor has * no tasks. 
*/ @@ -414,12 +436,11 @@ void add_task_to_actor_queue(LocalSchedulerState *state, * guaranteeing in-order execution of the tasks on the actor). TODO(rkn): This * check will fail if the fault-tolerance mechanism resubmits a task on an * actor. */ - bool task_is_redundant = false; if (task_counter < entry.task_counter) { LOG_INFO( "A task that has already been executed has been resubmitted, so we " "are ignoring it. This should only happen during reconstruction."); - task_is_redundant = true; + return; } /* Create a new task queue entry. */ @@ -437,32 +458,51 @@ void add_task_to_actor_queue(LocalSchedulerState *state, if (it != entry.task_queue->end() && task_counter == TaskSpec_actor_counter(it->spec)) { LOG_INFO( - "A task that has already been executed has been resubmitted, so we " - "are ignoring it. This should only happen during reconstruction."); - task_is_redundant = true; + "A task was resubmitted, so we are ignoring it. This should only " + "happen during reconstruction."); + return; } - if (!task_is_redundant) { - entry.task_queue->insert(it, elt); - - /* Update the task table. */ - if (state->db != NULL) { - Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_QUEUED, - get_db_client_id(state->db)); - if (from_global_scheduler) { - /* If the task is from the global scheduler, it's already been added to - * the task table, so just update the entry. */ - task_table_update(state->db, task, NULL, NULL, NULL); - } else { - /* Otherwise, this is the first time the task has been seen in the - * system (unless it's a resubmission of a previous task), so add the - * entry. */ - task_table_add_task(state->db, task, NULL, NULL, NULL); - } + /* The task has a counter that has not been executed or submitted before. Add + * it to the actor queue. */ + entry.task_queue->insert(it, elt); + + /* Update the task table. */ + if (state->db != NULL) { + Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_QUEUED, + get_db_client_id(state->db)); + if (from_global_scheduler) { + /* If the task is from the global scheduler, it's already been added to + * the task table, so just update the entry. */ + task_table_update(state->db, task, NULL, NULL, NULL); + } else { + /* Otherwise, this is the first time the task has been seen in the + * system (unless it's a resubmission of a previous task), so add the + * entry. */ + task_table_add_task(state->db, task, NULL, NULL, NULL); } + } - /* Record the fact that this actor has a task waiting to execute. */ - algorithm_state->actors_with_pending_tasks.insert(actor_id); + /* Record the fact that this actor has a task waiting to execute. */ + algorithm_state->actors_with_pending_tasks.insert(actor_id); + + /* Register a missing dependency on the preceding task. TODO(swang): Unify + * with `fetch_missing_dependencies` for non-actor tasks. */ + if (entry.task_counter != task_counter) { + int64_t num_args = TaskSpec_num_args(spec); + /* The last argument represents dependency on a preceding task. If it is by + * reference, then it is an explicit dependency. */ + if (TaskSpec_arg_by_ref(spec, num_args - 1)) { + ObjectID dummy_object_id = TaskSpec_arg_id(spec, num_args - 1); + if (algorithm_state->local_objects.count(dummy_object_id) == 0) { + ObjectEntry entry; + /* TODO(swang): Objects in `remote_objects` will get fetched from + * remote plasma managers. Do not fetch actor dummy objects. Otherwise, + * if the plasma manager associated with the dead local scheduler is + * still alive, reconstruction will never complete. 
*/ + state->algorithm_state->remote_objects[dummy_object_id] = entry; + } + } } } @@ -1202,7 +1242,8 @@ void handle_actor_worker_disconnect(LocalSchedulerState *state, void handle_actor_worker_available(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - LocalSchedulerClient *worker) { + LocalSchedulerClient *worker, + bool actor_checkpoint_failed) { ActorID actor_id = worker->actor_id; CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); /* Get the actor info for this worker. */ @@ -1212,6 +1253,13 @@ void handle_actor_worker_available(LocalSchedulerState *state, CHECK(worker == entry.worker); CHECK(!entry.worker_available); + /* If the assigned task was not a checkpoint task, or if it was but it + * loaded the checkpoint successfully, then we update the actor's counter + * to the assigned counter. */ + if (!actor_checkpoint_failed) { + entry.task_counter = entry.assigned_task_counter + 1; + } + entry.assigned_task_counter = -1; entry.worker_available = true; /* Assign new tasks if possible. */ dispatch_all_tasks(state, algorithm_state); diff --git a/src/local_scheduler/local_scheduler_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h index 914260ff7b29..0c0772beabd9 100644 --- a/src/local_scheduler/local_scheduler_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -178,12 +178,15 @@ void handle_worker_removed(LocalSchedulerState *state, * * @param state The state of the local scheduler. * @param algorithm_state State maintained by the scheduling algorithm. - * @param wi Information about the worker that is available. + * @param worker The worker that is available. + * @param actor_checkpoint_failed If the last task assigned was a checkpoint + * task that failed. * @return Void. */ void handle_actor_worker_available(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - LocalSchedulerClient *worker); + LocalSchedulerClient *worker, + bool actor_checkpoint_failed); /** * Handle the fact that a new worker is available for running an actor. diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 2e10510ebe97..10886e7d2c53 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -95,14 +95,19 @@ void local_scheduler_submit(LocalSchedulerConnection *conn, } TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, - int64_t *task_size) { - write_message(conn->conn, MessageType_GetTask, 0, NULL); + int64_t *task_size, + bool actor_checkpoint_failed) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreateGetTaskRequest(fbb, actor_checkpoint_failed); + fbb.Finish(message); + write_message(conn->conn, MessageType_GetTask, fbb.GetSize(), + fbb.GetBufferPointer()); int64_t type; - int64_t message_size; - uint8_t *message; + int64_t reply_size; + uint8_t *reply; /* Receive a task from the local scheduler. This will block until the local * scheduler gives this client a task. */ - read_message(conn->conn, &type, &message_size, &message); + read_message(conn->conn, &type, &reply_size, &reply); if (type == DISCONNECT_CLIENT) { LOG_WARN("Exiting because local scheduler closed connection."); exit(1); @@ -110,7 +115,7 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, CHECK(type == MessageType_ExecuteTask); /* Parse the flatbuffer object. */ - auto reply_message = flatbuffers::GetRoot(message); + auto reply_message = flatbuffers::GetRoot(reply); /* Set the GPU IDs for this task. 
We only do this for non-actor tasks because * for actors the GPUs are associated with the actor itself and not with the @@ -127,7 +132,7 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, TaskSpec *data = (TaskSpec *) reply_message->task_spec()->data(); TaskSpec *spec = TaskSpec_copy(data, *task_size); /* Free the original message from the local scheduler. */ - free(message); + free(reply); /* Return the copy of the task spec and pass ownership to the caller. */ return spec; } diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index 5ac6f6af3cf7..00ce0812fa4d 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -94,10 +94,14 @@ void local_scheduler_log_event(LocalSchedulerConnection *conn, * @todo When does this actually get freed? * * @param conn The connection information. + * @param task_size A pointer to fill out with the task size. + * @param actor_checkpoint_failed If the last task assigned was a checkpoint + * task that failed. * @return The address of the assigned task. */ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, - int64_t *task_size); + int64_t *task_size, + bool actor_checkpoint_failed); /** * Tell the local scheduler that the client has finished executing a task. diff --git a/src/local_scheduler/local_scheduler_extension.cc b/src/local_scheduler/local_scheduler_extension.cc index 67bb1dc5e17c..3c5b252c49fa 100644 --- a/src/local_scheduler/local_scheduler_extension.cc +++ b/src/local_scheduler/local_scheduler_extension.cc @@ -59,14 +59,25 @@ static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) { } // clang-format off -static PyObject *PyLocalSchedulerClient_get_task(PyObject *self) { +static PyObject *PyLocalSchedulerClient_get_task(PyObject *self, PyObject *args) { + PyObject *py_actor_checkpoint_failed = NULL; + if (!PyArg_ParseTuple(args, "|O", &py_actor_checkpoint_failed)) { + return NULL; + } TaskSpec *task_spec; + int64_t task_size; + /* If no argument for actor_checkpoint_failed was provided, default to false, + * since we assume that there was no previous task. */ + bool actor_checkpoint_failed = false; + if (py_actor_checkpoint_failed != NULL) { + actor_checkpoint_failed = (bool) PyObject_IsTrue(py_actor_checkpoint_failed); + } /* Drop the global interpreter lock while we get a task because * local_scheduler_get_task may block for a long time. 
*/ - int64_t task_size; Py_BEGIN_ALLOW_THREADS task_spec = local_scheduler_get_task( - ((PyLocalSchedulerClient *) self)->local_scheduler_connection, &task_size); + ((PyLocalSchedulerClient *) self)->local_scheduler_connection, + &task_size, actor_checkpoint_failed); Py_END_ALLOW_THREADS return PyTask_make(task_spec, task_size); } @@ -138,7 +149,7 @@ static PyMethodDef PyLocalSchedulerClient_methods[] = { "Notify the local scheduler that this client is exiting gracefully."}, {"submit", (PyCFunction) PyLocalSchedulerClient_submit, METH_VARARGS, "Submit a task to the local scheduler."}, - {"get_task", (PyCFunction) PyLocalSchedulerClient_get_task, METH_NOARGS, + {"get_task", (PyCFunction) PyLocalSchedulerClient_get_task, METH_VARARGS, "Get a task from the local scheduler."}, {"reconstruct_object", (PyCFunction) PyLocalSchedulerClient_reconstruct_object, METH_VARARGS, diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index 7f8c508fc331..0bedbd3e53bc 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -210,12 +210,12 @@ TEST object_reconstruction_test(void) { int64_t task_assigned_size; local_scheduler_submit(worker, spec, task_size); TaskSpec *task_assigned = - local_scheduler_get_task(worker, &task_assigned_size); + local_scheduler_get_task(worker, &task_assigned_size, true); ASSERT_EQ(memcmp(task_assigned, spec, task_size), 0); ASSERT_EQ(task_assigned_size, task_size); int64_t reconstruct_task_size; TaskSpec *reconstruct_task = - local_scheduler_get_task(worker, &reconstruct_task_size); + local_scheduler_get_task(worker, &reconstruct_task_size, true); ASSERT_EQ(memcmp(reconstruct_task, spec, task_size), 0); ASSERT_EQ(reconstruct_task_size, task_size); /* Clean up. */ @@ -315,7 +315,8 @@ TEST object_reconstruction_recursive_test(void) { /* Make sure we receive each task from the initial submission. */ for (int i = 0; i < NUM_TASKS; ++i) { int64_t task_size; - TaskSpec *task_assigned = local_scheduler_get_task(worker, &task_size); + TaskSpec *task_assigned = + local_scheduler_get_task(worker, &task_size, true); ASSERT_EQ(memcmp(task_assigned, specs[i], task_sizes[i]), 0); ASSERT_EQ(task_size, task_sizes[i]); free(task_assigned); @@ -325,7 +326,7 @@ TEST object_reconstruction_recursive_test(void) { for (int i = 0; i < NUM_TASKS; ++i) { int64_t task_assigned_size; TaskSpec *task_assigned = - local_scheduler_get_task(worker, &task_assigned_size); + local_scheduler_get_task(worker, &task_assigned_size, true); bool found = false; for (int j = 0; j < NUM_TASKS; ++j) { if (specs[j] == NULL) { @@ -410,7 +411,7 @@ TEST object_reconstruction_suppression_test(void) { * object_table_add callback completes. */ int64_t task_assigned_size; TaskSpec *task_assigned = - local_scheduler_get_task(worker, &task_assigned_size); + local_scheduler_get_task(worker, &task_assigned_size, true); ASSERT_EQ(memcmp(task_assigned, object_reconstruction_suppression_spec, object_reconstruction_suppression_size), 0); diff --git a/test/actor_test.py b/test/actor_test.py index 4d6003e7e72c..459089773977 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1191,12 +1191,8 @@ def inc(self): # Wait for the last task to finish running. ray.get(ids[-1]) - # Kill the second local scheduler. - process = ray.services.all_processes[ - ray.services.PROCESS_TYPE_LOCAL_SCHEDULER][1] - process.kill() - process.wait() - # Kill the corresponding plasma store to get rid of the cached objects. 
+ # Kill the second plasma store to get rid of the cached objects and + # trigger the corresponding local scheduler to exit. process = ray.services.all_processes[ ray.services.PROCESS_TYPE_PLASMA_STORE][1] process.kill() @@ -1253,14 +1249,10 @@ def inc(self, duration): for _ in range(num_function_calls_at_a_time): result_ids[actor].append( actor.inc.remote(j ** 2 * 0.000001)) - # Kill a local scheduler. Don't kill the first local scheduler - # since that is the one that the driver is connected to. - process = ray.services.all_processes[ - ray.services.PROCESS_TYPE_LOCAL_SCHEDULER][i + 1] - process.kill() - process.wait() - # Kill the corresponding plasma store to get rid of the cached - # objects. + # Kill a plasma store to get rid of the cached objects and trigger + # exit of the corresponding local scheduler. Don't kill the first + # local scheduler since that is the one that the driver is + # connected to. process = ray.services.all_processes[ ray.services.PROCESS_TYPE_PLASMA_STORE][i + 1] process.kill() @@ -1280,19 +1272,21 @@ def inc(self, duration): ray.worker.cleanup() - @unittest.skip("Skipping until checkpointing is integrated with object " - "lineage.") - def testCheckpointing(self): + def setup_test_checkpointing(self, save_exception=False, + resume_exception=False): ray.worker._init(start_ray_local=True, num_local_schedulers=2, num_workers=0, redirect_output=True) @ray.remote(checkpoint_interval=5) class Counter(object): - def __init__(self): + _resume_exception = resume_exception + + def __init__(self, save_exception): self.x = 0 # The number of times that inc has been called. We won't bother # restoring this in the checkpoint self.num_inc_calls = 0 + self.save_exception = save_exception def local_plasma(self): return ray.worker.global_worker.plasma_client.store_socket_name @@ -1310,9 +1304,13 @@ def test_restore(self): return self.y def __ray_save__(self): + if self.save_exception: + raise Exception("Exception raised in checkpoint save") return self.x, -1 def __ray_restore__(self, checkpoint): + if self._resume_exception: + raise Exception("Exception raised in checkpoint resume") self.x, val = checkpoint self.num_inc_calls = 0 # Test that __ray_save__ has been run. @@ -1322,21 +1320,20 @@ def __ray_restore__(self, checkpoint): local_plasma = ray.worker.global_worker.plasma_client.store_socket_name # Create an actor that is not on the local scheduler. - actor = Counter.remote() + actor = Counter.remote(save_exception) while ray.get(actor.local_plasma.remote()) == local_plasma: - actor = Counter.remote() + actor = Counter.remote(save_exception) args = [ray.put(0) for _ in range(100)] ids = [actor.inc.remote(*args[i:]) for i in range(100)] + return actor, ids + + def testCheckpointing(self): + actor, ids = self.setup_test_checkpointing() # Wait for the last task to finish running. ray.get(ids[-1]) - # Kill the second local scheduler. - process = ray.services.all_processes[ - ray.services.PROCESS_TYPE_LOCAL_SCHEDULER][1] - process.kill() - process.wait() # Kill the corresponding plasma store to get rid of the cached objects. process = ray.services.all_processes[ ray.services.PROCESS_TYPE_PLASMA_STORE][1] @@ -1355,6 +1352,93 @@ def __ray_restore__(self, checkpoint): ray.worker.cleanup() + def testLostCheckpoint(self): + actor, ids = self.setup_test_checkpointing() + # Wait for the first fraction of tasks to finish running. 
+ ray.get(ids[len(ids) // 10]) + + actor_key = b"Actor:" + actor._ray_actor_id.id() + for index in ray.actor.get_checkpoint_indices( + ray.worker.global_worker, actor._ray_actor_id.id()): + ray.worker.global_worker.redis_client.hdel( + actor_key, "checkpoint_{}".format(index)) + + # Kill the corresponding plasma store to get rid of the cached objects. + process = ray.services.all_processes[ + ray.services.PROCESS_TYPE_PLASMA_STORE][1] + process.kill() + process.wait() + + self.assertEqual(ray.get(actor.inc.remote()), 101) + + # Each inc method has been reexecuted once on the new actor. + self.assertEqual(ray.get(actor.get_num_inc_calls.remote()), 101) + # Get all of the results that were previously lost. Because the + # checkpoints were lost, all methods should be reconstructed. + results = ray.get(ids) + self.assertEqual(results, list(range(1, 1 + len(results)))) + + ray.worker.cleanup() + + def testCheckpointException(self): + actor, ids = self.setup_test_checkpointing(save_exception=True) + # Wait for the last task to finish running. + ray.get(ids[-1]) + + # Kill the corresponding plasma store to get rid of the cached objects. + process = ray.services.all_processes[ + ray.services.PROCESS_TYPE_PLASMA_STORE][1] + process.kill() + process.wait() + + self.assertEqual(ray.get(actor.inc.remote()), 101) + # Each inc method has been reexecuted once on the new actor, since all + # checkpoint saves failed. + self.assertEqual(ray.get(actor.get_num_inc_calls.remote()), 101) + # Get all of the results that were previously lost. Because the + # checkpoints were lost, all methods should be reconstructed. + results = ray.get(ids) + self.assertEqual(results, list(range(1, 1 + len(results)))) + + errors = ray.error_info() + # We submitted 101 tasks with a checkpoint interval of 5. + num_checkpoints = 101 // 5 + # Each checkpoint task throws an exception when saving during initial + # execution, and then again during re-execution. + self.assertEqual(len([error for error in errors if error[b"type"] == + b"task"]), num_checkpoints * 2) + + ray.worker.cleanup() + + def testCheckpointResumeException(self): + actor, ids = self.setup_test_checkpointing(resume_exception=True) + # Wait for the last task to finish running. + ray.get(ids[-1]) + + # Kill the corresponding plasma store to get rid of the cached objects. + process = ray.services.all_processes[ + ray.services.PROCESS_TYPE_PLASMA_STORE][1] + process.kill() + process.wait() + + self.assertEqual(ray.get(actor.inc.remote()), 101) + # Each inc method has been reexecuted once on the new actor, since all + # checkpoint resumes failed. + self.assertEqual(ray.get(actor.get_num_inc_calls.remote()), 101) + # Get all of the results that were previously lost. Because the + # checkpoints were lost, all methods should be reconstructed. + results = ray.get(ids) + self.assertEqual(results, list(range(1, 1 + len(results)))) + + errors = ray.error_info() + # The most recently executed checkpoint task should throw an exception + # when trying to resume. All other checkpoint tasks should reconstruct + # the previous task but throw no errors. + self.assertEqual(len([error for error in errors if error[b"type"] == + b"task"]), 1) + + ray.worker.cleanup() + if __name__ == "__main__": unittest.main(verbosity=2)
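Taken together, the checkpoint task added in `actor.py` makes a three-way decision: save a checkpoint when the preceding task ran on this actor instance, restore when a stored checkpoint matches the task counter, and otherwise report failure so the local scheduler replays the missing tasks from lineage. A condensed sketch of that control flow, using a plain dictionary in place of Redis and plasma; the `Counter` class and helper are placeholders for illustration:

    class Counter(object):
        def __init__(self, x=0):
            self.x = x

        def __ray_save_checkpoint__(self):
            return self.x

        @classmethod
        def __ray_restore_from_checkpoint__(cls, checkpoint):
            return cls(checkpoint)


    def checkpoint_task(actor, task_counter, previous_ran_locally, saved):
        """Return (actor, checkpoint_failed), mirroring __ray_checkpoint__."""
        if previous_ran_locally:
            # Case 1: the preceding task executed here; save and prune older
            # checkpoints.
            saved[task_counter] = actor.__ray_save_checkpoint__()
            for index in [i for i in saved if i < task_counter]:
                del saved[index]
            return actor, False
        if task_counter in saved:
            # Case 2: the actor was lost but a matching checkpoint exists;
            # resume from it.
            restored = type(actor).__ray_restore_from_checkpoint__(
                saved[task_counter])
            return restored, False
        # Case 3: no usable checkpoint; report failure so the scheduler
        # reconstructs the intervening tasks from lineage.
        return actor, True


    saved = {}
    actor, failed = checkpoint_task(Counter(5), 6, True, saved)   # saves x == 5
    actor, failed = checkpoint_task(Counter(), 6, False, saved)   # restores x == 5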