Revert raylet to worker GRPC communication back to asio #5450
Conversation
LGTM once all tests pass
@@ -39,9 +39,9 @@
     private long client = 0;

     // TODO(qwang): JobId parameter can be removed once we embed jobId in driverId.
-    public RayletClientImpl(String schedulerSockName, UniqueId workerId,
+    public RayletClientImpl(String schedulerSockName, UniqueId clientId,
I don't think we should revert the `clientId` -> `workerId` change unless this causes problems.
python/ray/__init__.py (outdated)
    # initialization of grpc if we import pyarrow at first.
    # NOTE(JoeyJiang): See https://github.com/ray-project/ray/issues/5219 for more
    # details.
    import ray._raylet
We should leave this in
Hmmm, I prefer not to. It should be fixed properly, once grpc is brought back into the client. This kind of stuff is just a ticking time bomb.
src/ray/raylet/main.cc (outdated)
@@ -171,7 +171,6 @@ int main(int argc, char *argv[]) {
     server.reset();
     gcs_client->Disconnect();
     main_service.stop();
-    RAY_LOG(INFO) << "Raylet server received SIGTERM message, shutting down...";
shouldn't revert
BUILD.bazel (outdated)
@@ -102,7 +102,7 @@ cc_proto_library(

 # === Begin of rpc definitions ===

-# gRPC common lib.
+# GRPC common lib.
don't revert
Test FAILed.
Thanks for doing this. It's probably very difficult to preserve all the grpc changes behind a feature flag, but it sounds like it's simple to preserve the worker-to-raylet proto and rpc client/server files, so that it would be easier to integrate grpc back for worker-to-raylet communication once the ordering issue is resolved.

I don't think we should keep those proto definitions/implementations in master if they aren't being used. It will be just as easy to add them back later from the old commits, since they're standalone files, and we should generally avoid dead code as much as possible.
I'm afraid that it's not a good idea to simply revert the whole PR, because:
- It'd be very hard to test and verify whether gRPC can work in the future. We are likely to have to keep both asio and grpc in the code base for the long term.
- Besides changing the communication lib, [gRPC] Migrate raylet client implementation to grpc #5120 also includes some nice optimizations, e.g., consolidating duplicated message definitions, changing worker lookup to use `worker_id`, fixing the `SWAP` queue hack, etc. (see my in-line comments).

I'm still concerned that doing the revert could waste a lot of time. Fixing the issue should be much easier. For example:
- Make `FetchOrReconstruct` sync (`FetchOrReconstruct` is only called when the worker is blocked waiting on objects, so making it sync won't actually hurt perf).
- Make the requests idempotent, as @ericl suggested (but I don't fully understand this solution yet. @ericl could you please elaborate? Thanks).
- Use [WIP] Handle request order for gRPC #5453.

Any of these 3 fixes looks easier than reverting. However, if you insist on reverting, could you please only change the underlying communication lib, and keep other things (e.g., message definitions in protobuf)? Thanks in advance.
@@ -65,20 +65,6 @@ cc_proto_library(
     deps = [":object_manager_proto"],
 )

-proto_library(
-    name = "raylet_proto",
Can we still define the messages in protobuf (but use asio for communication)? Because some message types are shared by other modules as well. And we could get rid of the flatbuffers dependency as well.
-  repeated ResourceIdSetInfo resource_ids = 2;
+  // TODO(zhijunfu): `resource_ids` is represented as
+  // flatbuffers-serialized bytes, will be moved to protobuf later.
+  bytes resource_ids = 2;
We should keep the `ResourceIdSetInfo` definition in protobuf.
     if (is_worker) {
void NodeManager::ProcessRegisterClientRequestMessage(
    const std::shared_ptr<LocalClientConnection> &client, const uint8_t *message_data) {
Could you please keep the code structure, e.g. `HandleFooRequest`? Because it'd be easier to test and bring back grpc. You can just parse the requests from asio and dispatch them to these functions.
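The dispatch pattern suggested here could look something like the following minimal sketch (all names are illustrative, not Ray's actual API): the asio read loop parses a message type and payload, then forwards it to a transport-agnostic `HandleFooRequest`-style handler, so the handlers can later be wired straight to gRPC service methods.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <string>

// Hypothetical message types; the real raylet uses flatbuffers MessageType.
enum class MessageType : int64_t { RegisterClientRequest = 1, WaitRequest = 2 };

class NodeManagerSketch {
 public:
  NodeManagerSketch() {
    // The asio read loop only needs this table; the handlers themselves stay
    // transport-agnostic, so bringing grpc back means re-pointing the table
    // entries rather than rewriting the handling logic.
    dispatch_[MessageType::RegisterClientRequest] =
        [this](const std::string &data) { HandleRegisterClientRequest(data); };
  }

  // Called by the asio message loop after framing/parsing a message.
  void ProcessClientMessage(MessageType type, const std::string &message_data) {
    auto it = dispatch_.find(type);
    if (it != dispatch_.end()) it->second(message_data);
  }

  int handled = 0;  // exposed only so the sketch is easy to observe

 private:
  void HandleRegisterClientRequest(const std::string & /*message_data*/) {
    ++handled;  // real code would parse the payload and register the worker
  }

  std::map<MessageType, std::function<void(const std::string &)>> dispatch_;
};
```

With this split, testing a handler doesn't require a live socket at all; tests can call the handler functions directly.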
-    if (use_push_task) {
-      // only call `HandleWorkerAvailable` when push mode is used.
-      HandleWorkerAvailable(worker_id);
+    HandleWorkerAvailable(connection);
I think this optimization is still useful. Using `connection` as the key, looking up a worker takes O(n) complexity, while using `worker_id`, it's O(1).
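The complexity difference can be made concrete with a small sketch (types and names are illustrative, not Ray's actual `WorkerPool`): looking up by connection means scanning every registered worker, while a hash map keyed by worker id gives constant-time average lookup.

```cpp
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

using WorkerID = std::string;  // stand-in for Ray's UniqueId-based WorkerID

struct Worker {
  WorkerID worker_id;
  int connection_fd;  // stand-in for the asio connection object
};

class WorkerPoolSketch {
 public:
  void Register(std::shared_ptr<Worker> w) {
    workers_.push_back(w);
    by_id_[w->worker_id] = w;
  }

  // O(n): must compare every registered worker's connection.
  std::shared_ptr<Worker> GetByConnection(int fd) const {
    for (const auto &w : workers_) {
      if (w->connection_fd == fd) return w;
    }
    return nullptr;
  }

  // O(1) average: direct hash lookup by worker id.
  std::shared_ptr<Worker> GetByWorkerId(const WorkerID &id) const {
    auto it = by_id_.find(id);
    return it == by_id_.end() ? nullptr : it->second;
  }

 private:
  std::vector<std::shared_ptr<Worker>> workers_;
  std::unordered_map<WorkerID, std::shared_ptr<Worker>> by_id_;
};
```

The difference rarely matters for a handful of workers, but the lookup sits on the task-assignment path, so the hash-map variant scales better with worker count.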
    auto finish_assign_task_callback = [this, worker, task_id](Status status) {
      if (worker->UsePush()) {
        // NOTE: we cannot directly call `FinishAssignTask` here because
        // it assumes the task is in SWAP queue, thus we need to delay invoking this
This `SWAP` queue was actually a hack for asio that had existed for a long time in the code base. #5120 fixed this hack. It's unfortunate that we're bringing it back.
It is not actually true that the `SWAP` queue is needed for asio. It was just one solution for when communication with the worker is asynchronous, which is true in both asio and grpc.

I do agree that the pattern in #5120 was an improvement over this. We could try to keep that here, by just moving tasks directly to the `RUNNING` queue in `DispatchTasks` and calling the code in `FinishAssignTask` directly here (vs calling it in a callback).
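A minimal sketch of that suggestion (queue names mirror the discussion; the types and method bodies are illustrative, not Ray's actual scheduler): `DispatchTasks` moves each ready task straight to the `RUNNING` queue and runs the finish-assignment logic inline, so no intermediate `SWAP` queue or deferred callback is needed.

```cpp
#include <deque>
#include <string>

// Simplified task queues; real raylet queues hold Task objects, not ids.
struct TaskQueues {
  std::deque<std::string> ready;
  std::deque<std::string> running;  // tasks go here directly; no SWAP queue
};

class SchedulerSketch {
 public:
  explicit SchedulerSketch(TaskQueues &queues) : queues_(queues) {}

  void DispatchTasks() {
    while (!queues_.ready.empty()) {
      std::string task_id = queues_.ready.front();
      queues_.ready.pop_front();
      // Move the task directly to RUNNING...
      queues_.running.push_back(task_id);
      // ...and run the finish-assignment logic inline, instead of deferring
      // it to a callback that assumes the task is parked in a SWAP queue.
      FinishAssignTask(task_id);
    }
  }

 private:
  void FinishAssignTask(const std::string & /*task_id*/) {
    // real code would mark the worker as busy and update task state here
  }

  TaskQueues &queues_;
};
```

The key property is that a task is never in a limbo state between queues while an asynchronous write is in flight, which is exactly what the `SWAP` queue was papering over.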
@@ -0,0 +1,392 @@
+#include "raylet_client.h"
It'd be better to keep `RayletClient` in the `rpc` directory, because:
- In concept, `RayletClient` is used for `rpc`.
- `raylet_client` can be an independent lib, so the worker can depend on it alone, instead of the whole `raylet_lib`.
  /// The `ClientCallManager` object that is shared by `WorkerTaskClient` from all
  /// workers.
  rpc::ClientCallManager &client_call_manager_;
  /// Indicates whether this is a worker or a driver.
  bool is_worker_;
This is still useful
 }

-std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(const WorkerID &worker_id) const {
+std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(
Better to use `worker_id` as the key for looking up workers.
We concluded we should do a full revert to make a timely release possible and will bring back the changes incrementally. We will fix gRPC and do more testing to make sure it respects all the invariants of worker<->raylet communication (and relax these as we simplify the raylet).
Test FAILed.

@pcmoritz Please explain what the issue with the Java test is and what the workaround is. It's not at all clear from https://github.com/ray-project/ray/pull/5450/files#diff-4eba54416056dabdaeb6d540848b3c62R31.
According to Hao: I'm getting the following error on my machine with your PR. It seems that it's because sometimes the raylet won't gracefully exit and thus won't clean up the socket file (Java uses a fixed socket file for all tests: /tmp/ray/sockets/raylet).

Unfortunately, I can't reproduce it. The workaround is there to make sure the Java tests are not broken. It should be properly fixed after the release.
     fbb.Finish(wait_reply);

     auto status =
         client->WriteMessage(static_cast<int64_t>(protocol::MessageType::WaitReply),
I guess it wasn't like this before, but while we're at it, do you want to make this a `WriteMessageAsync`?
I agree these are good changes. I think we can bring them back together with the structure of the gRPC handlers (which will also require converting the serialization back to protobuf). It's a bit more work; let's do it in a follow-up PR.

I just merged #5370. There are some small conflicts that should be easy to fix. That PR added return status for
Test FAILed.
Test PASSed.
What do these changes do?
This reverts #5120 for the release, which caused problems with message reordering (#5411), raylet-worker heartbeats (#5343 and #5120), and other problems we tried to address in #5341, #5296, and #5313.
Ideally, we would keep protobuf for communicating between workers and raylets, but it is better to reset to a known working state and bring the changes back incrementally.
It deploys a workaround for the Java tests (https://github.com/ray-project/ray/pull/5450/files#diff-4eba54416056dabdaeb6d540848b3c62R31), which we can hopefully remove later. The reason was described by @raulchen:
Related issue number
Linter
I've run `scripts/format.sh` to lint the changes in this PR.