Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/local_scheduler/format/local_scheduler.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ enum MessageType:int {
// Send a reply confirming the successful registration of a worker or driver.
// This is sent from the local scheduler to a worker or driver.
RegisterClientReply,
// Notify the local scheduler that this client is disconnecting gracefully.
// Notify the local scheduler that this client disconnected unexpectedly.
// This is sent from a worker to a local scheduler.
DisconnectClient,
// Notify the local scheduler that this client is disconnecting gracefully.
// This is sent from a worker to a local scheduler.
IntentionalDisconnectClient,
// Get a new task from the local scheduler. This is sent from a worker to a
// local scheduler.
GetTask,
Expand Down
11 changes: 9 additions & 2 deletions src/local_scheduler/local_scheduler_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,15 @@ void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::local_scheduler::protocol::CreateDisconnectClient(fbb);
fbb.Finish(message);
write_message(conn->conn, static_cast<int64_t>(MessageType::DisconnectClient),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
if (conn->use_raylet) {
write_message(conn->conn, static_cast<int64_t>(
MessageType::IntentionalDisconnectClient),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
} else {
write_message(conn->conn,
static_cast<int64_t>(MessageType::DisconnectClient),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
}
}

void local_scheduler_log_event(LocalSchedulerConnection *conn,
Expand Down
7 changes: 5 additions & 2 deletions src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ enum MessageType:int {
// Send a reply confirming the successful registration of a worker or driver.
// This is sent from the local scheduler to a worker or driver.
RegisterClientReply,
// Notify the local scheduler that this client is disconnecting gracefully.
// Notify the local scheduler that this client is disconnecting unexpectedly.
// This is sent from a worker to a local scheduler.
DisconnectClient,
// Notify the local scheduler that this client is disconnecting gracefully.
// This is sent from a worker to a local scheduler.
IntentionalDisconnectClient,
// Get a new task from the local scheduler. This is sent from a worker to a
// local scheduler.
GetTask,
Expand Down Expand Up @@ -183,7 +186,7 @@ table PushErrorRequest {
}

table FreeObjectsRequest {
// Whether to keep this request with the local object store
// Whether to keep this request with the local object store
// or send it to all the object stores.
local_only: bool;
// List of object ids we'll delete from object store.
Expand Down
34 changes: 23 additions & 11 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ RAY_CHECK_ENUM(protocol::MessageType::RegisterClientReply,
local_scheduler_protocol::MessageType::RegisterClientReply);
RAY_CHECK_ENUM(protocol::MessageType::DisconnectClient,
local_scheduler_protocol::MessageType::DisconnectClient);
RAY_CHECK_ENUM(protocol::MessageType::IntentionalDisconnectClient,
local_scheduler_protocol::MessageType::IntentionalDisconnectClient);
RAY_CHECK_ENUM(protocol::MessageType::GetTask,
local_scheduler_protocol::MessageType::GetTask);
RAY_CHECK_ENUM(protocol::MessageType::ExecuteTask,
Expand Down Expand Up @@ -539,18 +541,19 @@ void NodeManager::ProcessClientMessage(
RAY_LOG(DEBUG) << "Message of type " << message_type;

auto registered_worker = worker_pool_.GetRegisteredWorker(client);
auto message_type_value = static_cast<protocol::MessageType>(message_type);
if (registered_worker && registered_worker->IsDead()) {
// For a worker that is marked as dead (because the driver has died already),
// all the messages are ignored except DisconnectClient and
// IntentionalDisconnectClient.
if (static_cast<protocol::MessageType>(message_type) !=
protocol::MessageType::DisconnectClient) {
if ((message_type_value != protocol::MessageType::DisconnectClient) &&
(message_type_value != protocol::MessageType::IntentionalDisconnectClient)) {
// Listen for more messages.
client->ProcessMessages();
return;
}
}

switch (static_cast<protocol::MessageType>(message_type)) {
switch (message_type_value) {
case protocol::MessageType::RegisterClientRequest: {
ProcessRegisterClientRequestMessage(client, message_data);
} break;
Expand All @@ -563,6 +566,12 @@ void NodeManager::ProcessClientMessage(
// because it's already disconnected.
return;
} break;
case protocol::MessageType::IntentionalDisconnectClient: {
ProcessDisconnectClientMessage(client, /* push_warning = */ false);
// We don't need to receive future messages from this client,
// because it's already disconnected.
return;
} break;
case protocol::MessageType::SubmitTask: {
ProcessSubmitTaskMessage(message_data);
} break;
Expand Down Expand Up @@ -638,7 +647,7 @@ void NodeManager::ProcessGetTaskMessage(
}

void NodeManager::ProcessDisconnectClientMessage(
const std::shared_ptr<LocalClientConnection> &client) {
const std::shared_ptr<LocalClientConnection> &client, bool push_warning) {
const std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);

This comment was marked as resolved.

const std::shared_ptr<Worker> driver = worker_pool_.GetRegisteredDriver(client);
// This client can't be a worker and a driver.
Expand Down Expand Up @@ -678,13 +687,16 @@ void NodeManager::ProcessDisconnectClientMessage(
TreatTaskAsFailed(spec);

const JobID &job_id = worker->GetAssignedDriverId();
// TODO(rkn): Define this constant somewhere else.
std::string type = "worker_died";
std::ostringstream error_message;
error_message << "A worker died or was killed while executing task " << task_id
<< ".";
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
job_id, type, error_message.str(), current_time_ms()));

if (push_warning) {
// TODO(rkn): Define this constant somewhere else.
std::string type = "worker_died";
std::ostringstream error_message;
error_message << "A worker died or was killed while executing task " << task_id
<< ".";
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
job_id, type, error_message.str(), current_time_ms()));
}
}

worker_pool_.DisconnectWorker(worker);
Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ class NodeManager {
/// client.
///
/// \param client The client that sent the message.
/// \param push_warning If true, propagate a worker-death error message to the driver.
/// \return Void.
void ProcessDisconnectClientMessage(
const std::shared_ptr<LocalClientConnection> &client);
const std::shared_ptr<LocalClientConnection> &client, bool push_warning = true);

/// Process client message of SubmitTask
///
Expand Down
13 changes: 13 additions & 0 deletions test/failure_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,19 @@ def getpid(self):
ray.get(task2)


def test_actor_scope_or_intentionally_killed_message(ray_start_regular):
    """Check that no error is reported for actors that exit on purpose.

    Two actors are created: the handle to the first is dropped by rebinding
    the local name, and the second is asked to terminate explicitly. After a
    short wait, ``ray.error_info()`` must be empty.
    """

    @ray.remote
    class Actor(object):
        pass

    # Rebinding the name releases the only handle to the first actor
    # (presumably this is the "out of scope" case from the test name).
    actor = Actor.remote()
    actor = Actor.remote()

    # Ask the second actor to shut itself down intentionally.
    actor.__ray_terminate__.remote()

    # Wait before inspecting the error table, so that any error caused by
    # the two exits above has had time to show up.
    time.sleep(1)

    error_count = len(ray.error_info())
    assert error_count == 0, (
        "Should not have propogated an error - {}".format(ray.error_info()))


@pytest.fixture
def ray_start_object_store_memory():
# Start the Ray processes.
Expand Down