Fix failure handling for actor death (#3359)
* Broadcast actor death, clean up dummy objects

* Reduce logging and clean up state when failing a task

* lint

* Make actor failure test nicer, reduce node timeout
stephanie-wang authored Nov 21, 2018
1 parent 1a926c9 commit 3e33f6f
Showing 9 changed files with 231 additions and 119 deletions.
9 changes: 9 additions & 0 deletions src/ray/gcs/format/gcs.fbs
@@ -164,6 +164,13 @@ table TaskTableTestAndUpdate {
table ClassTableData {
}

enum ActorState:int {
  // Actor is alive.
  ALIVE = 0,
  // Actor is already dead and won't be reconstructed.
  DEAD
}

table ActorTableData {
  // The ID of the actor that was created.
  actor_id: string;
@@ -175,6 +182,8 @@ table ActorTableData {
  driver_id: string;
  // The ID of the node manager that created the actor.
  node_manager_id: string;
  // Current state of this actor.
  state: ActorState;
}

table ErrorTableData {
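The new `state` field is what allows actor death to be broadcast through the GCS instead of being tracked only locally. A minimal standalone C++ sketch of that idea, using invented stand-in types rather than the FlatBuffers-generated `ActorTableDataT` API:

```cpp
// Minimal standalone sketch of the ActorState/ActorTableData idea above.
// These types are hypothetical stand-ins, not the FlatBuffers-generated API.
#include <iostream>
#include <string>

enum class ActorState { ALIVE = 0, DEAD };

struct ActorTableEntry {
  std::string actor_id;
  std::string node_manager_id;
  ActorState state;
};

// Produce the entry a node manager might publish to announce an actor's death:
// the same record, with only the state changed.
ActorTableEntry MakeDeathEntry(const ActorTableEntry &current) {
  ActorTableEntry dead = current;
  dead.state = ActorState::DEAD;
  return dead;
}

int main() {
  ActorTableEntry entry{"actor-123", "node-abc", ActorState::ALIVE};
  ActorTableEntry dead = MakeDeathEntry(entry);
  std::cout << "actor " << dead.actor_id << " is now "
            << (dead.state == ActorState::DEAD ? "DEAD" : "ALIVE") << std::endl;
  return 0;
}
```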
7 changes: 4 additions & 3 deletions src/ray/raylet/actor_registration.cc
@@ -37,11 +37,12 @@ void ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id,
  frontier_entry.task_counter++;
  frontier_entry.execution_dependency = execution_dependency;
  execution_dependency_ = execution_dependency;
  dummy_objects_.push_back(execution_dependency);
}

bool ActorRegistration::IsAlive() const { return alive_; }

void ActorRegistration::MarkDead() { alive_ = false; }
bool ActorRegistration::IsAlive() const {
  return actor_table_data_.state == ActorState::ALIVE;
}

std::string ActorRegistration::DebugString() const {
  std::stringstream result;
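`IsAlive` is now derived from the actor's table data rather than from a separate `alive_` flag toggled by `MarkDead`, so local liveness always matches the state that was published. A minimal sketch of this pattern with invented stand-in types (`UpdateTableData` is hypothetical):

```cpp
// Sketch: derive liveness from the stored table data instead of a separate
// boolean flag. Types are hypothetical stand-ins for the Ray ones.
#include <cassert>

enum class ActorState { ALIVE = 0, DEAD };

struct ActorTableDataSketch {
  ActorState state;
};

class ActorRegistrationSketch {
 public:
  explicit ActorRegistrationSketch(const ActorTableDataSketch &data)
      : actor_table_data_(data) {}

  // Liveness is a view over the table data; there is no MarkDead() that could
  // drift out of sync with what was published to the GCS.
  bool IsAlive() const { return actor_table_data_.state == ActorState::ALIVE; }

  // Applying a newer table entry (e.g. one broadcast on actor death) updates
  // liveness automatically.
  void UpdateTableData(const ActorTableDataSketch &data) { actor_table_data_ = data; }

 private:
  ActorTableDataSketch actor_table_data_;
};

int main() {
  ActorRegistrationSketch reg(ActorTableDataSketch{ActorState::ALIVE});
  assert(reg.IsAlive());
  reg.UpdateTableData(ActorTableDataSketch{ActorState::DEAD});
  assert(!reg.IsAlive());
  return 0;
}
```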
19 changes: 15 additions & 4 deletions src/ray/raylet/actor_registration.h
@@ -36,6 +36,16 @@ class ActorRegistration {
    ObjectID execution_dependency;
  };

  /// Get the actor table data.
  ///
  /// \return The actor table data.
  const ActorTableDataT &GetTableData() const { return actor_table_data_; }

  /// Get the actor's current state (ALIVE or DEAD).
  ///
  /// \return The actor's current state.
  const ActorState &GetState() const { return actor_table_data_.state; }

  /// Get the actor's node manager location.
  ///
  /// \return The actor's node manager location. All tasks for the actor should
@@ -66,6 +76,9 @@ class ActorRegistration {
  /// that handle.
  const std::unordered_map<ActorHandleID, FrontierLeaf> &GetFrontier() const;

  /// Get all the dummy objects of this actor's tasks.
  const std::vector<ObjectID> &GetDummyObjects() const { return dummy_objects_; }

  /// Extend the frontier of the actor by a single task. This should be called
  /// whenever the actor executes a task.
  ///
@@ -81,10 +94,6 @@
  /// \return True if the local actor is alive and false if it is dead.
  bool IsAlive() const;

  /// Mark the actor as dead.
  /// \return Void.
  void MarkDead();

  /// Returns debug string for class.
  ///
  /// \return string.
@@ -104,6 +113,8 @@
  /// executed so far and which tasks may execute next, based on execution
  /// dependencies. This is indexed by handle.
  std::unordered_map<ActorHandleID, FrontierLeaf> frontier_;
  /// All of the dummy object IDs from this actor's tasks.
  std::vector<ObjectID> dummy_objects_;
};

} // namespace raylet
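`GetDummyObjects`, together with the new `dummy_objects_` member, gives the node manager a way to find every dummy object an actor ever produced. A rough self-contained sketch of how that list might be consumed when the actor dies (ObjectID and the local object set are invented stand-ins, not Ray's types):

```cpp
// Sketch: per-task dummy objects are accumulated while the actor runs and can
// all be released when it dies. ObjectID and the local-object set are invented
// stand-ins, not Ray's types.
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

using ObjectID = std::string;

class ActorRegistrationSketch {
 public:
  // Called once per executed actor task with that task's dummy object.
  void ExtendFrontier(const ObjectID &execution_dependency) {
    dummy_objects_.push_back(execution_dependency);
  }
  const std::vector<ObjectID> &GetDummyObjects() const { return dummy_objects_; }

 private:
  std::vector<ObjectID> dummy_objects_;
};

int main() {
  // Local objects pinned on this node (stand-in for real object store state).
  std::unordered_set<ObjectID> local_objects = {"dummy-1", "dummy-2", "other"};

  ActorRegistrationSketch actor;
  actor.ExtendFrontier("dummy-1");
  actor.ExtendFrontier("dummy-2");

  // On actor death, release every dummy object the actor ever produced.
  for (const auto &id : actor.GetDummyObjects()) {
    local_objects.erase(id);
    std::cout << "released " << id << std::endl;
  }
  std::cout << local_objects.size() << " object(s) remain" << std::endl;
  return 0;
}
```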
250 changes: 160 additions & 90 deletions src/ray/raylet/node_manager.cc

Large diffs are not rendered by default.

30 changes: 17 additions & 13 deletions src/ray/raylet/node_manager.h
@@ -145,14 +145,17 @@ class NodeManager {
  /// \param task The task in question.
  /// \return Void.
  void EnqueuePlaceableTask(const Task &task);
  /// This will treat the task as if it had been executed and failed. This is
  /// done by looping over the task return IDs and for each ID storing an object
  /// that represents a failure in the object store. When clients retrieve these
  /// objects, they will raise application-level exceptions.
  /// This will treat a task removed from the local queue as if it had been
  /// executed and failed. This is done by looping over the task return IDs and
  /// for each ID storing an object that represents a failure in the object
  /// store. When clients retrieve these objects, they will raise
  /// application-level exceptions. State for the task will be cleaned up as if
  /// it were any other task that had been assigned, executed, and removed from
  /// the local queue.
  ///
  /// \param spec The specification of the task.
  /// \return Void.
  void TreatTaskAsFailed(const TaskSpecification &spec);
  void TreatTaskAsFailed(const Task &task);
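The comment above spells out the failure mechanism: store an error object under each of the task's return IDs so that clients retrieving them raise an exception instead of blocking forever. A rough standalone sketch of that loop; Task, ObjectID, and the object store below are invented stand-ins:

```cpp
// Sketch of "treat task as failed": store an error marker under every return
// ID of the task. ObjectID, TaskSketch, and the store are invented stand-ins.
#include <iostream>
#include <string>
#include <unordered_map>

using ObjectID = std::string;

struct TaskSketch {
  std::string task_id;
  int num_returns;
  // Stand-in for looking up the i-th return object ID of the task.
  ObjectID ReturnId(int i) const { return task_id + ":ret" + std::to_string(i); }
};

// Stand-in object store: maps object IDs to either a value or an error marker.
struct ObjectStoreSketch {
  std::unordered_map<ObjectID, std::string> entries;
  void PutError(const ObjectID &id) { entries[id] = "<TASK_EXECUTION_EXCEPTION>"; }
};

void TreatTaskAsFailed(const TaskSketch &task, ObjectStoreSketch &store) {
  // Storing an error object under each return ID means a client blocked on
  // one of these objects wakes up and raises an application-level exception
  // instead of waiting for an object that will never be created.
  for (int i = 0; i < task.num_returns; i++) {
    store.PutError(task.ReturnId(i));
  }
}

int main() {
  ObjectStoreSketch store;
  TreatTaskAsFailed(TaskSketch{"task-42", 2}, store);
  for (const auto &entry : store.entries) {
    std::cout << entry.first << " -> " << entry.second << std::endl;
  }
  return 0;
}
```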
  /// Handle specified task's submission to the local node manager.
  ///
  /// \param task The task being submitted.
@@ -258,20 +261,21 @@
  void KillWorker(std::shared_ptr<Worker> worker);

  /// Methods for actor scheduling.
  /// Handler for the creation of an actor, possibly on a remote node.
  /// Handler for an actor state transition, for a newly created actor or an
  /// actor that died. This method is idempotent and will ignore old state
  /// transitions.
  ///
  /// \param actor_id The actor ID of the actor that was created.
  /// \param data Data associated with the actor creation event.
  /// \param data Data associated with the actor state transition.
  /// \return Void.
  void HandleActorCreation(const ActorID &actor_id,
                           const std::vector<ActorTableDataT> &data);
  void HandleActorStateTransition(const ActorID &actor_id, const ActorTableDataT &data);

  /// When an actor dies, loop over all of the queued tasks for that actor and
  /// treat them as failed.
  /// Handler for an actor dying. The actor may be remote.
  ///
  /// \param actor_id The actor that died.
  /// \param actor_id The actor ID of the actor that died.
  /// \param was_local Whether the actor was local.
  /// \return Void.
  void CleanUpTasksForDeadActor(const ActorID &actor_id);
  void HandleDisconnectedActor(const ActorID &actor_id, bool was_local);

  /// When a driver dies, loop over all of the queued tasks for that driver and
  /// treat them as failed.
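`HandleActorStateTransition` is documented as idempotent, which matters because the same death may be observed more than once (for example via the GCS broadcast and via a local disconnect). A self-contained sketch of one way such an idempotent handler could look; every type and method here is an invented stand-in, not Ray's node manager:

```cpp
// Sketch of an idempotent actor state transition handler: repeated or stale
// notifications are ignored, and the death path runs at most once per actor.
// All names here are invented stand-ins, not Ray's internals.
#include <iostream>
#include <string>
#include <unordered_map>

using ActorID = std::string;
enum class ActorState { ALIVE = 0, DEAD };

class NodeManagerSketch {
 public:
  void HandleActorStateTransition(const ActorID &actor_id, ActorState new_state) {
    auto it = registry_.find(actor_id);
    if (it == registry_.end()) {
      // First time we hear about this actor.
      registry_[actor_id] = new_state;
    } else if (it->second == ActorState::DEAD) {
      // DEAD is terminal; ignore stale ALIVE updates and duplicate DEAD ones.
      return;
    } else if (new_state == it->second) {
      // Duplicate notification of the state we already know; nothing to do.
      return;
    } else {
      it->second = new_state;
    }
    if (new_state == ActorState::DEAD) {
      HandleDisconnectedActor(actor_id);
    }
  }

 private:
  void HandleDisconnectedActor(const ActorID &actor_id) {
    // In the real node manager, this is where the actor's queued tasks would
    // be treated as failed and its state cleaned up.
    std::cout << "actor " << actor_id << " disconnected; failing its queued tasks"
              << std::endl;
  }

  std::unordered_map<ActorID, ActorState> registry_;
};

int main() {
  NodeManagerSketch node_manager;
  node_manager.HandleActorStateTransition("actor-1", ActorState::ALIVE);
  node_manager.HandleActorStateTransition("actor-1", ActorState::DEAD);
  // Duplicate and stale updates are ignored; the death path runs only once.
  node_manager.HandleActorStateTransition("actor-1", ActorState::DEAD);
  node_manager.HandleActorStateTransition("actor-1", ActorState::ALIVE);
  return 0;
}
```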
8 changes: 6 additions & 2 deletions src/ray/raylet/task_dependency_manager.cc
@@ -170,10 +170,12 @@ bool TaskDependencyManager::SubscribeDependencies(
  return (task_entry.num_missing_dependencies == 0);
}

void TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) {
bool TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) {
  // Remove the task from the table of subscribed tasks.
  auto it = task_dependencies_.find(task_id);
  RAY_CHECK(it != task_dependencies_.end());
  if (it == task_dependencies_.end()) {
    return false;
  }

  const TaskDependencies task_entry = std::move(it->second);
  task_dependencies_.erase(it);
@@ -206,6 +208,8 @@ void TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) {
  for (const auto &object_id : task_entry.object_dependencies) {
    HandleRemoteDependencyCanceled(object_id);
  }

  return true;
}

std::vector<TaskID> TaskDependencyManager::GetPendingTasks() const {
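Returning a bool instead of `RAY_CHECK`-failing lets cleanup paths unsubscribe unconditionally, even for tasks that were never subscribed. A small sketch of that contract with stand-in types:

```cpp
// Sketch of the "return false instead of asserting" unsubscribe contract, so
// cleanup code can call it for tasks that may never have been subscribed.
// TaskID, ObjectID, and the dependency table are invented stand-ins.
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

using TaskID = std::string;
using ObjectID = std::string;

class TaskDependencyManagerSketch {
 public:
  void SubscribeDependencies(const TaskID &task_id, std::vector<ObjectID> deps) {
    task_dependencies_[task_id] = std::move(deps);
  }

  // Returns true if the task had been subscribed, false otherwise (no crash).
  bool UnsubscribeDependencies(const TaskID &task_id) {
    auto it = task_dependencies_.find(task_id);
    if (it == task_dependencies_.end()) {
      return false;
    }
    task_dependencies_.erase(it);
    return true;
  }

 private:
  std::unordered_map<TaskID, std::vector<ObjectID>> task_dependencies_;
};

int main() {
  TaskDependencyManagerSketch manager;
  manager.SubscribeDependencies("task-1", {"obj-a"});
  // Cleanup after a failure can be unconditional: unsubscribing a task that
  // was never subscribed is simply a no-op that reports false.
  std::cout << std::boolalpha << manager.UnsubscribeDependencies("task-1") << " "
            << manager.UnsubscribeDependencies("task-2") << std::endl;
  return 0;
}
```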
3 changes: 2 additions & 1 deletion src/ray/raylet/task_dependency_manager.h
@@ -61,7 +61,8 @@ class TaskDependencyManager {
  /// then they will be canceled.
  ///
  /// \param task_id The ID of the task whose dependencies to unsubscribe from.
  void UnsubscribeDependencies(const TaskID &task_id);
  /// \return Whether the task was subscribed before.
  bool UnsubscribeDependencies(const TaskID &task_id);

  /// Mark that the given task is pending execution. Any objects that it creates
  /// are now considered to be pending creation. If there are any subscribed
17 changes: 12 additions & 5 deletions test/actor_test.py
@@ -1260,7 +1260,14 @@ def blocking_method(self):


def test_exception_raised_when_actor_node_dies(shutdown_only):
    ray.worker._init(start_ray_local=True, num_local_schedulers=2, num_cpus=1)
    ray.worker._init(
        start_ray_local=True,
        num_local_schedulers=2,
        num_cpus=1,
        _internal_config=json.dumps({
            "initial_reconstruction_timeout_milliseconds": 200,
            "num_heartbeats_timeout": 10,
        }))

    @ray.remote
    class Counter(object):
@@ -1287,11 +1294,11 @@ def inc(self):
        ray.services.PROCESS_TYPE_PLASMA_STORE][1]
    process.kill()

    # Submit some new actor tasks.
    x_ids = [actor.inc.remote() for _ in range(5)]

    # Make sure that getting the result raises an exception.
    # Submit some new actor tasks both before and after the node failure is
    # detected. Make sure that getting the result raises an exception.
    for _ in range(10):
        # Submit some new actor tasks.
        x_ids = [actor.inc.remote() for _ in range(5)]
        for x_id in x_ids:
            with pytest.raises(ray.worker.RayGetError):
                # There is some small chance that ray.get will actually
7 changes: 6 additions & 1 deletion test/component_failures_test.py
@@ -3,6 +3,7 @@
from __future__ import print_function

import os
import json
import signal
import time

@@ -262,7 +263,11 @@ def _test_component_failed(component_type):
        num_local_schedulers=num_local_schedulers,
        start_ray_local=True,
        num_cpus=[num_workers_per_scheduler] * num_local_schedulers,
        redirect_output=True)
        redirect_output=True,
        _internal_config=json.dumps({
            "initial_reconstruction_timeout_milliseconds": 1000,
            "num_heartbeats_timeout": 10,
        }))

    # Submit many tasks with many dependencies.
    @ray.remote
