Skip to content

Commit

Permalink
Drop stale actor table notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanie-wang committed Jul 11, 2020
1 parent ee60de7 commit 193c5d2
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 20 deletions.
22 changes: 12 additions & 10 deletions src/ray/core_worker/actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ void ActorManager::WaitForActorOutOfScope(

void ActorManager::HandleActorStateNotification(const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state());
RAY_LOG(INFO) << "received notification on actor, state: " << actor_state
<< ", actor_id: " << actor_id
<< ", ip address: " << actor_data.address().ip_address()
<< ", port: " << actor_data.address().port() << ", worker_id: "
<< WorkerID::FromBinary(actor_data.address().worker_id())
<< ", raylet_id: "
<< ClientID::FromBinary(actor_data.address().raylet_id())
<< ", num_restarts: " << actor_data.num_restarts();

if (actor_data.state() == gcs::ActorTableData::PENDING) {
// The actor is being created and not yet ready, just ignore!
} else if (actor_data.state() == gcs::ActorTableData::RESTARTING) {
Expand All @@ -163,17 +173,9 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id,
// submit tasks to dead actors. This also means we defer unsubscription,
// otherwise we crash when bulk unsubscribing all actor handles.
} else {
direct_actor_submitter_->ConnectActor(actor_id, actor_data.address());
direct_actor_submitter_->ConnectActor(actor_id, actor_data.address(),
actor_data.num_restarts());
}

const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state());
RAY_LOG(INFO) << "received notification on actor, state: " << actor_state
<< ", actor_id: " << actor_id
<< ", ip address: " << actor_data.address().ip_address()
<< ", port: " << actor_data.address().port() << ", worker_id: "
<< WorkerID::FromBinary(actor_data.address().worker_id())
<< ", raylet_id: "
<< ClientID::FromBinary(actor_data.address().raylet_id());
}

std::vector<ObjectID> ActorManager::GetActorHandleIDsFromHandles() {
Expand Down
27 changes: 19 additions & 8 deletions src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,31 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
return Status::OK();
}

void CoreWorkerDirectActorTaskSubmitter::DisconnectRpcClient(ClientQueue &queue) {
queue.rpc_client = nullptr;
queue.worker_id.clear();
queue.pending_force_kill.reset();
}

void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
const rpc::Address &address) {
const rpc::Address &address,
int64_t num_restarts) {
RAY_LOG(DEBUG) << "Connecting to actor " << actor_id << " at worker "
<< WorkerID::FromBinary(address.worker_id());
absl::MutexLock lock(&mu_);

auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
if (queue->second.rpc_client) {
// Skip reconnection if we already have a client to this actor.
// NOTE(swang): This seems to only trigger in multithreaded Java tests.
RAY_CHECK(queue->second.worker_id == address.worker_id());
if (num_restarts <= queue->second.num_restarts) {
// This message is about an old version of the actor. Skip the connection.
return;
}

if (queue->second.rpc_client) {
// Clear the client to the old version of the actor.
DisconnectRpcClient(queue->second);
}

queue->second.state = rpc::ActorTableData::ALIVE;
// Update the mapping so new RPCs go out with the right intended worker id.
queue->second.worker_id = address.worker_id();
Expand All @@ -143,6 +155,7 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,

void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id,
bool dead) {
RAY_LOG(DEBUG) << "Disconnecting from actor " << actor_id;
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
Expand All @@ -156,9 +169,7 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id
// The actor failed, so erase the client for now. Either the actor is
// permanently dead or the new client will be inserted once the actor is
// restarted.
queue->second.rpc_client = nullptr;
queue->second.worker_id.clear();
queue->second.pending_force_kill.reset();
DisconnectRpcClient(queue->second);

// If there are pending requests, treat the pending tasks as failed.
if (dead) {
Expand Down
16 changes: 14 additions & 2 deletions src/ray/core_worker/transport/direct_actor_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const int kMaxReorderWaitSeconds = 30;
class CoreWorkerDirectActorTaskSubmitterInterface {
public:
virtual void AddActorQueueIfNotExists(const ActorID &actor_id) = 0;
virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address) = 0;
virtual void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts) = 0;
virtual void DisconnectActor(const ActorID &actor_id, bool dead = false) = 0;
virtual void KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) = 0;

Expand Down Expand Up @@ -95,7 +96,11 @@ class CoreWorkerDirectActorTaskSubmitter
///
/// \param[in] actor_id Actor ID.
/// \param[in] address The new address of the actor.
void ConnectActor(const ActorID &actor_id, const rpc::Address &address);
/// \param[in] num_restarts How many times this actor was alive
/// before. If we've already seen a later incarnation of the actor,
/// we will ignore the command to connect.
void ConnectActor(const ActorID &actor_id, const rpc::Address &address,
int64_t num_restarts);

/// Disconnect from a failed actor.
///
Expand All @@ -111,6 +116,10 @@ class CoreWorkerDirectActorTaskSubmitter
/// an RPC client to the actor. If this is DEAD, then all tasks in the
/// queue will be marked failed and all other ClientQueue state is ignored.
rpc::ActorTableData::ActorState state = rpc::ActorTableData::PENDING;
/// How many times this actor has restarted so far. Starts at -1 to
/// indicate that the actor is not yet created. This is used to drop stale
/// messages from the GCS.
int64_t num_restarts = -1;
/// The RPC client. We use shared_ptr to enable shared_from_this for
/// pending client callbacks.
std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client = nullptr;
Expand Down Expand Up @@ -193,6 +202,9 @@ class CoreWorkerDirectActorTaskSubmitter
/// \return Void.
void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);

/// Disconnect the RPC client for an actor.
void DisconnectRpcClient(ClientQueue &queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);

/// Whether the specified actor is alive.
///
/// \param[in] actor_id The actor ID.
Expand Down

0 comments on commit 193c5d2

Please sign in to comment.