Skip to content

Commit

Permalink
[core] Handle out-of-order actor table notifications (#9449)
Browse files Browse the repository at this point in the history
* Drop stale actor table notifications

* build

* Add num_restarts to disconnect handler

* Unit test and increment num_restarts on ALIVE, not RESTARTING

* Wait for pid to exit
  • Loading branch information
stephanie-wang authored Jul 15, 2020
1 parent ccc1133 commit 6d99aa3
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 59 deletions.
4 changes: 4 additions & 0 deletions python/ray/tests/test_actor_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def get_pid(self):
results = [actor.increase.remote() for _ in range(100)]
# Kill actor process, while the above task is still being executed.
os.kill(pid, SIGKILL)
wait_for_pid_to_exit(pid)
# Make sure that all tasks were executed in order before the actor's death.
res = results.pop(0)
i = 1
Expand Down Expand Up @@ -208,6 +209,7 @@ def get_pid(self):
results = [actor.increase.remote() for _ in range(100)]
pid = ray.get(actor.get_pid.remote())
os.kill(pid, SIGKILL)
wait_for_pid_to_exit(pid)
# The actor has exceeded max restarts, and this task should fail.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
Expand Down Expand Up @@ -244,6 +246,7 @@ def get_pid(self):
results = [actor.increase.remote() for _ in range(100)]
# Kill actor process, while the above task is still being executed.
os.kill(pid, SIGKILL)
wait_for_pid_to_exit(pid)
# Check that none of the tasks failed and the actor is restarted.
seq = list(range(1, 101))
results = ray.get(results)
Expand All @@ -264,6 +267,7 @@ def get_pid(self):
results = [actor.increase.remote() for _ in range(100)]
pid = ray.get(actor.get_pid.remote())
os.kill(pid, SIGKILL)
wait_for_pid_to_exit(pid)
# The actor has exceeded max restarts, and this task should fail.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.increase.remote())
Expand Down
26 changes: 14 additions & 12 deletions src/ray/core_worker/actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,27 +153,29 @@ void ActorManager::WaitForActorOutOfScope(

void ActorManager::HandleActorStateNotification(const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state());
RAY_LOG(INFO) << "received notification on actor, state: " << actor_state
<< ", actor_id: " << actor_id
<< ", ip address: " << actor_data.address().ip_address()
<< ", port: " << actor_data.address().port() << ", worker_id: "
<< WorkerID::FromBinary(actor_data.address().worker_id())
<< ", raylet_id: "
<< ClientID::FromBinary(actor_data.address().raylet_id())
<< ", num_restarts: " << actor_data.num_restarts();

if (actor_data.state() == gcs::ActorTableData::PENDING) {
// The actor is being created and not yet ready, just ignore!
} else if (actor_data.state() == gcs::ActorTableData::RESTARTING) {
direct_actor_submitter_->DisconnectActor(actor_id, false);
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), false);
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
direct_actor_submitter_->DisconnectActor(actor_id, true);
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), true);
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors. This also means we defer unsubscription,
// otherwise we crash when bulk unsubscribing all actor handles.
} else {
direct_actor_submitter_->ConnectActor(actor_id, actor_data.address());
direct_actor_submitter_->ConnectActor(actor_id, actor_data.address(),
actor_data.num_restarts());
}

const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state());
RAY_LOG(INFO) << "received notification on actor, state: " << actor_state
<< ", actor_id: " << actor_id
<< ", ip address: " << actor_data.address().ip_address()
<< ", port: " << actor_data.address().port() << ", worker_id: "
<< WorkerID::FromBinary(actor_data.address().worker_id())
<< ", raylet_id: "
<< ClientID::FromBinary(actor_data.address().raylet_id());
}

std::vector<ObjectID> ActorManager::GetActorHandleIDsFromHandles() {
Expand Down
26 changes: 14 additions & 12 deletions src/ray/core_worker/test/actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterf
MockDirectActorSubmitter() : CoreWorkerDirectActorTaskSubmitterInterface() {}

MOCK_METHOD1(AddActorQueueIfNotExists, void(const ActorID &actor_id));
MOCK_METHOD2(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address));
MOCK_METHOD2(DisconnectActor, void(const ActorID &actor_id, bool dead));
MOCK_METHOD3(ConnectActor, void(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts));
MOCK_METHOD3(DisconnectActor,
void(const ActorID &actor_id, int64_t num_restarts, bool dead));
MOCK_METHOD3(KillActor,
void(const ActorID &actor_id, bool force_kill, bool no_restart));

Expand Down Expand Up @@ -189,15 +191,15 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
ASSERT_TRUE(actor_handle_to_get->GetActorID() == actor_id);

// Check after the actor is created, if it is connected to an actor.
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(1);
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(1);
rpc::ActorTableData actor_table_data;
actor_table_data.set_actor_id(actor_id.Binary());
actor_table_data.set_state(
rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE);
actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data);

// Now actor state is updated to DEAD. Make sure it is diconnected.
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1);
actor_table_data.set_actor_id(actor_id.Binary());
actor_table_data.set_state(
rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD);
Expand Down Expand Up @@ -242,8 +244,8 @@ TEST_F(ActorManagerTest, RegisterActorHandles) {
TEST_F(ActorManagerTest, TestActorStateNotificationPending) {
ActorID actor_id = AddActorHandle();
// Nothing happens if state is pending.
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(0);
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(0);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(0);
rpc::ActorTableData actor_table_data;
actor_table_data.set_actor_id(actor_id.Binary());
actor_table_data.set_state(
Expand All @@ -255,8 +257,8 @@ TEST_F(ActorManagerTest, TestActorStateNotificationPending) {
TEST_F(ActorManagerTest, TestActorStateNotificationRestarting) {
ActorID actor_id = AddActorHandle();
// Should disconnect to an actor when actor is restarting.
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1);
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(0);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1);
rpc::ActorTableData actor_table_data;
actor_table_data.set_actor_id(actor_id.Binary());
actor_table_data.set_state(
Expand All @@ -268,8 +270,8 @@ TEST_F(ActorManagerTest, TestActorStateNotificationRestarting) {
TEST_F(ActorManagerTest, TestActorStateNotificationDead) {
ActorID actor_id = AddActorHandle();
// Should disconnect to an actor when actor is dead.
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(0);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(1);
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(0);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1);
rpc::ActorTableData actor_table_data;
actor_table_data.set_actor_id(actor_id.Binary());
actor_table_data.set_state(
Expand All @@ -281,8 +283,8 @@ TEST_F(ActorManagerTest, TestActorStateNotificationDead) {
TEST_F(ActorManagerTest, TestActorStateNotificationAlive) {
ActorID actor_id = AddActorHandle();
// Should connect to an actor when actor is alive.
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _)).Times(1);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _)).Times(0);
EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(1);
EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(0);
rpc::ActorTableData actor_table_data;
actor_table_data.set_actor_id(actor_id.Binary());
actor_table_data.set_state(
Expand Down
100 changes: 85 additions & 15 deletions src/ray/core_worker/test/direct_actor_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,14 @@ class DirectActorSubmitterTest : public ::testing::Test {
: worker_client_(std::shared_ptr<MockWorkerClient>(new MockWorkerClient())),
store_(std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore())),
task_finisher_(std::make_shared<MockTaskFinisher>()),
submitter_([&](const rpc::Address &addr) { return worker_client_; }, store_,
task_finisher_) {}

submitter_(
[&](const rpc::Address &addr) {
num_clients_connected_++;
return worker_client_;
},
store_, task_finisher_) {}

int num_clients_connected_ = 0;
std::shared_ptr<MockWorkerClient> worker_client_;
std::shared_ptr<CoreWorkerMemoryStore> store_;
std::shared_ptr<MockTaskFinisher> task_finisher_;
Expand All @@ -123,7 +128,7 @@ TEST_F(DirectActorSubmitterTest, TestSubmitTask) {
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 0);

submitter_.ConnectActor(actor_id, addr);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 1);

task = CreateActorTaskHelper(actor_id, worker_id, 1);
Expand All @@ -145,7 +150,7 @@ TEST_F(DirectActorSubmitterTest, TestDependencies) {
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
submitter_.ConnectActor(actor_id, addr);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);

// Create two tasks for the actor with different arguments.
Expand Down Expand Up @@ -179,7 +184,7 @@ TEST_F(DirectActorSubmitterTest, TestOutOfOrderDependencies) {
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
submitter_.ConnectActor(actor_id, addr);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);

// Create two tasks for the actor with different arguments.
Expand Down Expand Up @@ -215,7 +220,7 @@ TEST_F(DirectActorSubmitterTest, TestActorDead) {
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
submitter_.ConnectActor(actor_id, addr);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);

// Create two tasks for the actor. One depends on an object that is not yet available.
Expand All @@ -235,10 +240,10 @@ TEST_F(DirectActorSubmitterTest, TestActorDead) {
}

EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(0);
submitter_.DisconnectActor(actor_id, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 0, /*dead=*/false);
// Actor marked as dead. All queued tasks should get failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _)).Times(1);
submitter_.DisconnectActor(actor_id, /*dead=*/true);
submitter_.DisconnectActor(actor_id, 1, /*dead=*/true);
}

TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) {
Expand All @@ -248,7 +253,7 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) {
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
submitter_.ConnectActor(actor_id, addr);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);

// Create four tasks for the actor.
Expand All @@ -268,13 +273,13 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));

// Simulate the actor failing.
submitter_.DisconnectActor(actor_id, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 0, /*dead=*/false);
// Third task fails after the actor is disconnected. It should not get
// retried.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));

// Actor gets restarted.
submitter_.ConnectActor(actor_id, addr);
submitter_.ConnectActor(actor_id, addr, 1);
ASSERT_TRUE(submitter_.SubmitTask(task4).ok());
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
ASSERT_TRUE(worker_client_->callbacks.empty());
Expand All @@ -289,7 +294,7 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) {
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
submitter_.ConnectActor(actor_id, addr);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);

// Create four tasks for the actor.
Expand All @@ -313,12 +318,12 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));

// Simulate the actor failing.
submitter_.DisconnectActor(actor_id, /*dead=*/false);
submitter_.DisconnectActor(actor_id, 0, /*dead=*/false);
// Third task fails after the actor is disconnected.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));

// Actor gets restarted.
submitter_.ConnectActor(actor_id, addr);
submitter_.ConnectActor(actor_id, addr, 1);
// A new task is submitted.
ASSERT_TRUE(submitter_.SubmitTask(task4).ok());
// Tasks 2 and 3 get retried.
Expand All @@ -332,6 +337,71 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) {
ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1, 2, 2, 0, 1));
}

TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
ASSERT_EQ(num_clients_connected_, 1);

// Create four tasks for the actor.
auto task = CreateActorTaskHelper(actor_id, worker_id, 0);
// Submit a task.
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1);
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));

// Actor restarts, but we don't receive the disconnect message until later.
submitter_.ConnectActor(actor_id, addr, 1);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 1);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1);
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));

// We receive the RESTART message late. Nothing happens.
submitter_.DisconnectActor(actor_id, 0, /*dead=*/false);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 2);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(1);
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));

// The actor dies twice. We receive the last RESTART message first.
submitter_.DisconnectActor(actor_id, 2, /*dead=*/false);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 3);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
EXPECT_CALL(*task_finisher_, CompletePendingTask(task.TaskId(), _, _)).Times(0);
ASSERT_FALSE(worker_client_->ReplyPushTask(Status::OK()));

// We receive the late messages. Nothing happens.
submitter_.ConnectActor(actor_id, addr, 2);
submitter_.DisconnectActor(actor_id, 1, /*dead=*/false);
ASSERT_EQ(num_clients_connected_, 2);

// The actor dies permanently. All tasks are failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _)).Times(1);
submitter_.DisconnectActor(actor_id, 2, /*dead=*/true);
ASSERT_EQ(num_clients_connected_, 2);

// We receive more late messages. Nothing happens because the actor is dead.
submitter_.DisconnectActor(actor_id, 3, /*dead=*/false);
submitter_.ConnectActor(actor_id, addr, 3);
ASSERT_EQ(num_clients_connected_, 2);
// Submit a task.
task = CreateActorTaskHelper(actor_id, worker_id, 4);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task.TaskId(), _, _)).Times(1);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
}

class MockDependencyWaiter : public DependencyWaiter {
public:
MOCK_METHOD2(Wait, void(const std::vector<rpc::ObjectReference> &dependencies,
Expand Down
Loading

0 comments on commit 6d99aa3

Please sign in to comment.