Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add common preprocessing for each request in node manager. #5296

Merged
merged 23 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ray/protobuf/raylet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package ray.rpc;
import "src/ray/protobuf/common.proto";
import "src/ray/protobuf/gcs.proto";

/// NOTE(Joey Jiang) Every request defined in this file should add worker_id field, which
/// helps the raylet to filter messages from a dead worker.
/// NOTE(Joey Jiang) Every request defined in this file should have a `worker_id` field, which
/// will be used in `NodeManager::PreprocessRequest`.

/// Service request and reply messages.
message RegisterClientRequest {
Expand Down
77 changes: 29 additions & 48 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ namespace {
#define RAY_CHECK_ENUM(x, y) \
static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")

/// Macro to handle early return for preprocessing.
/// An early return will take place if the worker is being killed due to the exitting of
/// driver, or the worker is not registered yet.
#define PREPROCESS_REQUEST(REQUEST_TYPE, REQUEST, SEND_REPLY) \
WorkerID worker_id = WorkerID::FromBinary(REQUEST.worker_id()); \
jiangzihao2009 marked this conversation as resolved.
Show resolved Hide resolved
do { \
if (!PreprocessRequest(worker_id, #REQUEST_TYPE)) { \
jiangzihao2009 marked this conversation as resolved.
Show resolved Hide resolved
SEND_REPLY( \
Status::Invalid("Discard this request due to failure of preprocessing."), \
nullptr, nullptr); \
return; \
} \
} while (0)

/// A helper function to return the expected actor counter for a given actor
/// and actor handle, according to the given actor registry. If a task's
/// counter is less than the returned value, then the task is a duplicate. If
Expand Down Expand Up @@ -739,77 +753,44 @@ void NodeManager::DispatchTasks(
local_queues_.MoveTasks(assigned_task_ids, TaskState::READY, TaskState::RUNNING);
}

std::pair<std::shared_ptr<Worker>, bool> NodeManager::GetWorker(
const WorkerID &worker_id) {
bool is_worker = true;
auto worker = worker_pool_.GetRegisteredWorker(worker_id);
if (!worker) {
is_worker = false;
worker = worker_pool_.GetRegisteredDriver(worker_id);
if (!worker) {
return std::make_pair(nullptr, true);
}
}
return std::make_pair(worker, is_worker);
}

#define PREPROCESS_REQUEST(REQUEST_TYPE, REQUEST, SEND_REPLY) \
WorkerID worker_id = WorkerID::FromBinary(REQUEST.worker_id()); \
do { \
if (!PreprocessRequest<rpc::REQUEST_TYPE>(worker_id, #REQUEST_TYPE)) { \
SEND_REPLY( \
Status::Invalid("Discard this request due to failure of preprocessing."), \
nullptr, nullptr); \
return; \
} \
} while (0)

template <typename Request>
bool NodeManager::PreprocessRequest(const WorkerID &worker_id,
const std::string &request_name) {
std::ostringstream oss;
if (RAY_LOG_ENABLED(DEBUG)) {
oss << "Received a " << request_name << " request. Worker id " << worker_id << ".";
}
// NOTE(Joey Jiang): We check heartbeat actively because the heartbeats' callback won't
// be invoked until all the posted requests in the io_service have been finished.
// NOTE(Joey Jiang): We check heartbeat proactively because the heartbeats' callback
// won't be invoked until all the posted requests in the io_service have been finished.
// The raylet might be marked as dead due to missing heartbeats when too many requests
// are sent to this raylet.
int64_t expiry_delay =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - heartbeat_timer_.expires_at())
.count();
int64_t expiry_delay_limit = RayConfig::instance().heartbeat_timeout_milliseconds() *
RayConfig::instance().num_heartbeats_timeout() / 2;

// Timer's callback hasn't been called on the desired time.
if (expiry_delay > expiry_delay_limit) {
if (expiry_delay > RayConfig::instance().heartbeat_timeout_milliseconds()) {
heartbeat_timer_.cancel();
RAY_LOG(WARNING)
<< "Timer has been cancelled. Maybe the timer was delayed due to "
"lots of post events and we have reset the heartbeat. Expiry delay "
<< expiry_delay << ", delay limit " << expiry_delay_limit;
Heartbeat();
jiangzihao2009 marked this conversation as resolved.
Show resolved Hide resolved
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check the timer in HandleForwardTaskRequest as well. maybe move this to a CheckHeartbeatTimer function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically we should add the check for all the messages in raylet? node manager, object manager, and from worker to raylet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but except object manager requests, which are handled in background threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we also need to check the timer in HandleForwardTaskRequest? The heartbeats are only for workers, not for remote node/object managers, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephanie-wang because if raylet suddenly receives a huge number of ForwardTask requests, raylet-to-gcs heartbeat will time out as well.

jiangzihao2009 marked this conversation as resolved.
Show resolved Hide resolved

auto rt = GetWorker(worker_id);
auto worker = rt.first;
auto worker = worker_pool_.GetWorker(worker_id);
// Worker process has been killed, we should discard this request.
if (!worker) {
RAY_LOG(WARNING) << "Worker " << worker_id
<< " is not found in worker pool, request will be discarded.";
RAY_LOG(WARNING) << "Worker " << worker_id << " is not found in worker pool, request "
<< request_name << " will be discarded.";
return false;
}
if (RAY_LOG_ENABLED(DEBUG)) {
oss << " Is worker: " << (rt.second ? "true" : "false") << ". Worker pid "
oss << " Is worker: " << (worker->IsWorker() ? "true" : "false") << ". Worker pid "
<< std::to_string(worker->Pid()) << ".";
RAY_LOG(DEBUG) << oss.str();
}

// The worker process is being killing, we should discard this request.
if (worker->IsBeingKilled()) {
RAY_LOG(INFO) << "Worker " << worker_id
<< " is being killed, request will be discarded.";
RAY_LOG(INFO) << "Worker " << worker_id << " is being killed, request "
<< request_name << " will be discarded.";
return false;
}

Expand All @@ -821,15 +802,16 @@ void NodeManager::HandleRegisterClientRequest(
rpc::SendReplyCallback send_reply_callback) {
// Client id in register client is treated as worker id.
const WorkerID worker_id = WorkerID::FromBinary(request.worker_id());
bool is_worker = request.is_worker();
auto worker =
std::make_shared<Worker>(worker_id, request.worker_pid(), request.language(),
request.port(), client_call_manager_);
request.port(), client_call_manager_, is_worker);

RAY_LOG(DEBUG) << "Received a RegisterClientRequest. Worker id: " << worker_id
<< ". Is worker: " << request.is_worker() << ". Worker pid "
<< ". Is worker: " << is_worker << ". Worker pid "
<< request.worker_pid();

if (request.is_worker()) {
if (is_worker) {
// Register the new worker.
bool use_push_task = worker->UsePush();
worker_pool_.RegisterWorker(worker_id, std::move(worker));
Expand Down Expand Up @@ -928,14 +910,13 @@ void NodeManager::HandleDisconnectClientRequest(

void NodeManager::ProcessDisconnectClientMessage(const WorkerID &worker_id,
bool intentional_disconnect) {
auto rt = GetWorker(worker_id);
auto worker = rt.first;
auto worker = worker_pool_.GetWorker(worker_id);
if (!worker) {
RAY_LOG(INFO) << "Ignoring client disconnect because the client has already "
<< "been disconnected.";
return;
}
bool is_worker = rt.second;
bool is_worker = worker->IsWorker();

// If the client has any blocked tasks, mark them as unblocked. In
// particular, we are no longer waiting for their dependencies.
Expand Down
17 changes: 5 additions & 12 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// Get the port of the node manager rpc server.
int GetServerPort() const { return node_manager_server_.GetPort(); }

/// Preprocess request from raylet client. This takes care of common processing for each
/// request, such as logging, checking heartbeat and whether worker is being killed.
/// Preprocess request from raylet client. We will check whether the worker is being
/// killed due to exitting of driver. Also we will send the heartbeat to monitor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo exiting

/// proactively if the callback of timer is not trigger at the desired time.
///
/// \param Request Request template type.
/// \param worker_id The worker id.
/// \param request_name The request name.
/// \param request The request message.
template <typename Request>
/// \return False if there is no need to process this request.
bool PreprocessRequest(const WorkerID &worker_id, const std::string &request_name);
jiangzihao2009 marked this conversation as resolved.
Show resolved Hide resolved

/// Implementation of node manager grpc service.

/// Handle a `ForwardTask` request.
Expand Down Expand Up @@ -551,13 +551,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
void HandleDisconnectedActor(const ActorID &actor_id, bool was_local,
bool intentional_disconnect);

/// Get the pointer to a worker object.
///
/// \param worker_id The worker id.
/// \return The pair contains the pointer to the worker and the flat that indicates
/// whether it's a driver.
std::pair<std::shared_ptr<Worker>, bool> GetWorker(const WorkerID &worker_id);

// GCS client ID for this node.
ClientID client_id_;
boost::asio::io_service &io_service_;
Expand Down
7 changes: 5 additions & 2 deletions src/ray/raylet/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ namespace raylet {

/// A constructor responsible for initializing the state of a worker.
Worker::Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port,
rpc::ClientCallManager &client_call_manager)
rpc::ClientCallManager &client_call_manager, bool is_worker)
: worker_id_(worker_id),
pid_(pid),
port_(port),
language_(language),
blocked_(false),
num_missed_heartbeats_(0),
is_being_killed_(false),
client_call_manager_(client_call_manager) {
client_call_manager_(client_call_manager),
is_worker_(is_worker) {
if (port_ > 0) {
rpc_client_ = std::unique_ptr<rpc::WorkerTaskClient>(
new rpc::WorkerTaskClient("127.0.0.1", port_, client_call_manager_));
Expand All @@ -29,6 +30,8 @@ void Worker::MarkAsBeingKilled() { is_being_killed_ = true; }

bool Worker::IsBeingKilled() const { return is_being_killed_; }

bool Worker::IsWorker() const { return is_worker_; }

void Worker::MarkBlocked() { blocked_ = true; }

void Worker::MarkUnblocked() { blocked_ = false; }
Expand Down
5 changes: 4 additions & 1 deletion src/ray/raylet/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ class Worker {
public:
/// A constructor that initializes a worker object.
Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port,
rpc::ClientCallManager &client_call_manager);
rpc::ClientCallManager &client_call_manager, bool is_worker = true);
/// A destructor responsible for freeing all worker state.
~Worker() {}
void MarkAsBeingKilled();
bool IsBeingKilled() const;
bool IsWorker() const;
void MarkBlocked();
void MarkUnblocked();
bool IsBlocked() const;
Expand Down Expand Up @@ -105,6 +106,8 @@ class Worker {
/// The `ClientCallManager` object that is shared by `WorkerTaskClient` from all
/// workers.
rpc::ClientCallManager &client_call_manager_;
/// Indicates whether this is a worker or a driver.
bool is_worker_;
/// The rpc client to send tasks to this worker.
std::unique_ptr<rpc::WorkerTaskClient> rpc_client_;
/// Reply of the `GetTask` request.
Expand Down
11 changes: 11 additions & 0 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,17 @@ void WorkerPool::TickHeartbeatTimer(int max_missed_heartbeats,
}
}

std::shared_ptr<Worker> WorkerPool::GetWorker(const WorkerID &worker_id) {
auto worker = GetRegisteredWorker(worker_id);
if (!worker) {
worker = GetRegisteredDriver(worker_id);
if (!worker) {
return nullptr;
}
}
return worker;
}

} // namespace raylet

} // namespace ray
5 changes: 5 additions & 0 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class WorkerPool {
void TickHeartbeatTimer(int max_missed_heartbeats,
std::vector<std::shared_ptr<Worker>> *dead_workers);

/// Return the pointer to the worker according to the worker id.
///
/// \param worker_id The worker id.
std::shared_ptr<Worker> GetWorker(const WorkerID &worker_id);

protected:
/// Asynchronously start a new worker process. Once the worker process has
/// registered with an external server, the process should create and
Expand Down