Add common preprocessing for each request in node manager. #5296
Test FAILed. |
Test FAILed. |
Thanks! I left a few comments.
if (expiry_delay > expiry_delay_limit) {
  heartbeat_timer_.cancel();
  Heartbeat();
}
We need to check the timer in HandleForwardTaskRequest as well. Maybe move this to a CheckHeartbeatTimer function.
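A minimal sketch of what such a shared check could look like, assuming a steady clock and a callback in place of the real timer and GCS client. `HeartbeatChecker`, `Arm`, and the constructor parameters are hypothetical names for illustration; only `CheckHeartbeatTimer` and the `expiry_delay` comparison come from this thread.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <utility>

// Hypothetical stand-in for the node manager's heartbeat state; not Ray's actual class.
class HeartbeatChecker {
 public:
  using Clock = std::chrono::steady_clock;

  HeartbeatChecker(std::chrono::milliseconds period,
                   std::chrono::milliseconds expiry_delay_limit,
                   std::function<void()> send_heartbeat)
      : period_(period),
        expiry_delay_limit_(expiry_delay_limit),
        send_heartbeat_(std::move(send_heartbeat)) {
    assert(send_heartbeat_);  // A heartbeat callback is required.
  }

  // Records when the next heartbeat is due.
  void Arm(Clock::time_point now) { expiry_ = now + period_; }

  // Intended to be called at the start of every request handler.
  // If the heartbeat is overdue by more than the limit, sends it immediately
  // and re-arms the deadline. Returns true if a heartbeat was forced.
  bool CheckHeartbeatTimer(Clock::time_point now) {
    const auto expiry_delay = now - expiry_;
    if (expiry_delay > expiry_delay_limit_) {
      send_heartbeat_();
      Arm(now);
      return true;
    }
    return false;
  }

 private:
  std::chrono::milliseconds period_;
  std::chrono::milliseconds expiry_delay_limit_;
  std::function<void()> send_heartbeat_;
  Clock::time_point expiry_{};
};
```

Each handler (for example HandleForwardTaskRequest) would call `CheckHeartbeatTimer` once before doing its own work, so the check lives in one place instead of being copied into every handler.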
Theoretically, shouldn't we add the check for all messages in the raylet: node manager, object manager, and worker-to-raylet?
Yes, except for object manager requests, which are handled in background threads.
Why do we also need to check the timer in HandleForwardTaskRequest? The heartbeats are only for workers, not for remote node/object managers, right?
@stephanie-wang Because if the raylet suddenly receives a huge number of ForwardTask requests, the raylet-to-GCS heartbeat will time out as well.
Also, can you explain that the main purpose of this PR is to solve the issue that |
Test FAILed. |
Test FAILed. |
Test PASSed. |
Test FAILed. |
Test PASSed. |
Looks good overall. Just a few small comments.
src/ray/raylet/node_manager.cc
@@ -735,6 +739,83 @@ void NodeManager::DispatchTasks(
  local_queues_.MoveTasks(assigned_task_ids, TaskState::READY, TaskState::RUNNING);
}

std::pair<std::shared_ptr<Worker>, bool> NodeManager::GetWorker(
    const WorkerID &worker_id) {
  bool is_worker = true;
Instead of returning a pair here, I think it's clearer to put is_worker in the Worker class.
Agreed. Also, this should not be a method of NodeManager.
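One way to read the two suggestions above, as a rough sketch: the flag lives on the worker object, and the lookup lives in a separate registry rather than in NodeManager. `WorkerRegistry`, its methods, and the string-based `WorkerID` alias are hypothetical and used only for illustration; they are not the classes in this PR.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the real ID type.
using WorkerID = std::string;

// The worker itself records whether it is a worker or a driver,
// so callers no longer need a (pointer, bool) pair.
class Worker {
 public:
  Worker(WorkerID id, bool is_worker) : id_(std::move(id)), is_worker_(is_worker) {}
  const WorkerID &Id() const { return id_; }
  bool IsWorker() const { return is_worker_; }

 private:
  WorkerID id_;
  bool is_worker_;
};

// Hypothetical registry that owns the lookup instead of NodeManager.
class WorkerRegistry {
 public:
  void Register(std::shared_ptr<Worker> worker) {
    assert(worker != nullptr);
    const WorkerID id = worker->Id();  // Copy the key before moving the pointer.
    workers_[id] = std::move(worker);
  }

  // Returns nullptr if no worker or driver is registered under this ID.
  std::shared_ptr<Worker> Get(const WorkerID &worker_id) const {
    const auto it = workers_.find(worker_id);
    return it == workers_.end() ? nullptr : it->second;
  }

 private:
  std::unordered_map<WorkerID, std::shared_ptr<Worker>> workers_;
};
```

A caller would then write `auto worker = registry.Get(worker_id);` and branch on `worker->IsWorker()` only where the distinction matters.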
This seems like a pretty brittle and at-best temporary solution, but I guess it's okay for now. How feasible would it be to move the heartbeat handling to a separate thread? Is it necessary to use heartbeats for worker-raylet communication?
I also think moving the heartbeats to a different thread is the right solution here. However, we do need to make sure that it is not possible for the main thread to get stuck while the heartbeat thread keeps sending heartbeats.
@stephanie-wang @pcmoritz The problem with moving the heartbeat to a different thread is that we need to use the GCS client to send heartbeats. However, the GCS client is bound to the io_service in the main thread.
I think the actual problem is that asio doesn't prioritize timers over post handlers (AFAIK, some other event loops, e.g., Python's event loop, do). Otherwise, it's totally fine to put the timer in the event loop.
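To illustrate the starvation argument, here is a toy single-threaded loop with a simulated clock. It is not asio and makes no claim about asio's internals; `SimulateTimerDelay` and its parameters are hypothetical. Each queued handler costs one tick, and the function returns how many ticks late the timer callback runs.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Toy model of an event loop with one timer and a backlog of posted handlers.
// Returns the number of ticks between the timer's deadline and its callback.
long SimulateTimerDelay(int queued_handlers, long timer_due, bool prioritize_timers) {
  assert(queued_handlers >= 0);
  std::deque<std::function<void()>> queue;
  long now = 0;
  long timer_ran_at = -1;
  bool timer_pending = true;

  // Backlog of requests; each one takes one tick of simulated time.
  for (int i = 0; i < queued_handlers; ++i) {
    queue.push_back([&now] { ++now; });
  }

  while (timer_ran_at < 0) {
    if (timer_pending && now >= timer_due) {
      timer_pending = false;
      auto on_timer = [&now, &timer_ran_at] { timer_ran_at = now; };
      if (prioritize_timers) {
        on_timer();  // Timer runs ahead of the backlog.
      } else {
        queue.push_back(on_timer);  // Timer waits behind the backlog (FIFO).
      }
      continue;
    }
    if (queue.empty()) {
      now = timer_due;  // Idle loop: jump to the deadline.
      continue;
    }
    auto handler = std::move(queue.front());
    queue.pop_front();
    handler();
  }
  return timer_ran_at - timer_due;
}
```

In this model, with 100 queued handlers and a timer due at tick 5, the FIFO policy runs the callback 95 ticks late, while the prioritizing policy runs it on time. A burst of requests delays the heartbeat in the same way under the FIFO policy.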
@pcmoritz @stephanie-wang Since it's not easy to move the heartbeat timer to another thread, let's use this PR to unblock the issue for now?
LGTM. just one typo
src/ray/raylet/node_manager.h
/// Preprocess request from raylet client. This takes care of common processing for each
/// request, such as logging, checking heartbeat and whether worker is being killed.
/// Preprocess request from raylet client. We will check whether the worker is being
/// killed due to exitting of driver. Also we will send the heartbeat to monitor
Typo: "exitting" should be "exiting".
I wonder how much fixed overhead this preprocessing will add. Can we quickly check how much slower it makes things?
Test PASSed. |
Test PASSed. |
@simon-mo Checking the timer should be very cheap. My local test with your script shows it's even slightly faster than master.
Test PASSed. |
After some discussion with @pcmoritz and @simon-mo, let's not merge this PR unless it is really necessary for stability even after #5341 has been merged. For raylet-to-redis heartbeats, the issue has been known for a while and the solution needs to be more comprehensive than this PR. For worker heartbeats, we think the long-term solution is to remove them completely, since it is possible for the raylet to detect worker failures without heartbeats. We're concerned that merging this code will encourage further short-term fixes when really we should concentrate on fixing the core issues.
At first, this PR was only intended to print logs and check whether the job is stopped, which were missing from PR #5120. We found the heartbeat problem in some tests and tried to solve it here. In fact, the root cause comes down to a misuse of gRPC, and I think #5341 is a better solution to the heartbeat problem.
I think we can revert the heartbeat change and merge the other changes in this PR.
@stephanie-wang The heartbeat timer change is reverted. I think this PR can be merged. Please let me know if you have other comments.
Test PASSed. |
Thanks!
Test FAILed. |
Test FAILed. |