Revert raylet to worker GRPC communication back to asio #5450

Merged: 20 commits into ray-project:master on Aug 18, 2019

Conversation

pcmoritz
Contributor

@pcmoritz pcmoritz commented Aug 13, 2019

What do these changes do?

This reverts #5120 for the release, which caused problems with message reordering (#5411), raylet-worker heartbeats (#5343 and #5120), and other problems we tried to address in #5341, #5296, and #5313.

Ideally, we would keep protobuf for communicating between workers and raylets, but it is better to reset to a known working state and bring the changes back incrementally.

It deploys a workaround for the Java tests (https://github.com/ray-project/ray/pull/5450/files#diff-4eba54416056dabdaeb6d540848b3c62R31), which we can hopefully remove later. The reason was described by @raulchen:

I'm getting the following error on my machine with your PR. It seems that it's because sometimes the raylet won't gracefully exit and thus won't clean up the socket file (the Java tests use a fixed socket file, /tmp/ray/sockets/raylet).

libc++abi.dylib: terminating with uncaught exception of type boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >: bind: Address already in use
*** Aborted at 1565873121 (unix time) try "date -d @1565873121" if you are using GNU date ***
PC: @                0x0 (unknown)
*** SIGABRT (@0x7fff5e05f2c6) received by PID 63811 (TID 0x111f255c0) stack trace: ***
    @     0x7fff5e10fb5d _sigtramp
    @     0x7fff5b1b2998 GCC_except_table51
    @     0x7fff5dfc96a6 abort
    @     0x7fff5b1a5641 abort_message
    @     0x7fff5b1a57c7 default_terminate_handler()
    @     0x7fff5c758eeb _objc_terminate()
    @     0x7fff5b1b119e std::__terminate()
    @     0x7fff5b1b0f86 __cxxabiv1::failed_throw()
    @     0x7fff5b1a3f99 __cxa_throw
    @        0x108704e7b boost::throw_exception<>()
    @        0x108704d94 boost::asio::detail::do_throw_error()
    @        0x108704d23 boost::asio::detail::throw_error()
    @        0x108823f57 boost::asio::basic_socket_acceptor<>::basic_socket_acceptor()
    @        0x108800234 boost::asio::basic_socket_acceptor<>::basic_socket_acceptor()
    @        0x1087ffac9 ray::raylet::Raylet::Raylet()
    @        0x108800e0f ray::raylet::Raylet::Raylet()
    @        0x1086f651e main
    @     0x7fff5df243d5 start

And the reason the raylet won't exit gracefully is:

F0815 20:54:13.702641 275129792 node_manager.cc:418]  Check failed: client_id != gcs_client_->client_table().GetLocalClientId() Exiting because this node manager has mistakenly been marked dead by the monitor.

However, this error occurs randomly and is not specific to any particular test case. It seems that the raylet would sometimes get blocked.
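
For illustration only, a minimal C++/boost::asio sketch (this is not the workaround used in this PR, and the names are hypothetical) of how removing a stale socket file before binding avoids the "bind: Address already in use" abort when a previous raylet did not exit cleanly:

#include <boost/asio.hpp>
#include <cstdio>
#include <string>

int main() {
  boost::asio::io_service io_service;
  // The fixed socket path that the Java tests reuse across runs.
  const std::string socket_path = "/tmp/ray/sockets/raylet";
  // Remove any stale socket file left behind by a raylet that did not exit
  // cleanly, so the acceptor can bind again instead of aborting with
  // "bind: Address already in use".
  std::remove(socket_path.c_str());
  boost::asio::local::stream_protocol::endpoint endpoint(socket_path);
  boost::asio::local::stream_protocol::acceptor acceptor(io_service, endpoint);
  return 0;
}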

Related issue number

Linter

  • I've run scripts/format.sh to lint the changes in this PR.

Collaborator

@edoakes edoakes left a comment

LGTM once all tests pass

@@ -39,9 +39,9 @@
private long client = 0;

// TODO(qwang): JobId parameter can be removed once we embed jobId in driverId.
public RayletClientImpl(String schedulerSockName, UniqueId workerId,
public RayletClientImpl(String schedulerSockName, UniqueId clientId,
Collaborator

I don't think we should revert the clientId->workerId change unless this causes problems

# initialization of grpc if we import pyarrow at first.
# NOTE(JoeyJiang): See https://github.com/ray-project/ray/issues/5219 for more
# details.
import ray._raylet
Collaborator

We should leave this in

Contributor Author

Hmmm, I prefer not to. It should be fixed properly once gRPC is brought back into the client. This kind of stuff is just a ticking time bomb.

@@ -171,7 +171,6 @@ int main(int argc, char *argv[]) {
server.reset();
gcs_client->Disconnect();
main_service.stop();
RAY_LOG(INFO) << "Raylet server received SIGTERM message, shutting down...";
Collaborator

shouldn't revert

BUILD.bazel Outdated
@@ -102,7 +102,7 @@ cc_proto_library(

# === Begin of rpc definitions ===

# gRPC common lib.
# GRPC common lib.
Collaborator

don't revert

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16269/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16272/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16277/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16283/
Test FAILed.

@zhijunfu zhijunfu requested review from zhijunfu and raulchen August 14, 2019 12:09
@zhijunfu
Contributor

Thanks for doing this.

It's probably very difficult to preserve all the gRPC changes behind a feature flag, but it sounds like it's simple to preserve the worker-to-raylet proto and rpc client/server files, so that it would be easier to integrate gRPC back for worker-to-raylet communication once the ordering issue is resolved.

@edoakes
Collaborator

edoakes commented Aug 14, 2019

I don't think we should keep those proto definitions/implementations in master if they aren't being used. It will be just as easy to add them back later from the old commits, since they're standalone files, and we should generally avoid dead code as much as possible.

Contributor

@raulchen raulchen left a comment

I'm afraid it's not a good idea to simply revert the whole PR, because:

  1. It'd be very hard to test and verify if gRPC can work in the future. We are likely to have to keep both asio and gRPC in the code base for the long term.
  2. Besides changing the communication lib, [gRPC] Migrate raylet client implementation to grpc #5120 also includes some nice optimizations, e.g., consolidating duplicated message definitions, changing worker lookup to use worker_id, and fixing the SWAP queue hack (see my in-line comments).

I'm still concerned that doing the revert could waste a lot of time. Fixing the issue should be much easier. For example,

  1. Make FetchOrReconstruct sync (FetchOrReconstruct is only called when the worker is blocked waiting on objects, so making it sync won't actually hurt perf).
  2. Make the requests idempotent as @ericl suggested (but I don't fully understand this solution yet. @ericl could you please elaborate? thanks)
  3. Use [WIP] Handle request order for gRPC #5453.

Any of these 3 fixes looks easier than reverting.

However, if you insist on reverting, could you please only change the underlying communication lib, and keep other things (e.g., message definitions in protobuf)? Thanks in advance.

@@ -65,20 +65,6 @@ cc_proto_library(
deps = [":object_manager_proto"],
)

proto_library(
name = "raylet_proto",
Contributor

Can we still define the messages in protobuf (but use asio for communication)? Some message types are shared by other modules as well, and this would also let us get rid of the flatbuffers dependency.
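
As a rough sketch of this suggestion (the message type and connection API below are stand-ins, not the exact Ray interfaces): the protobuf definitions stay, the message is serialized to bytes, and the existing asio type-plus-payload framing carries it.

#include <cstdint>
#include <iostream>
#include <string>

// Stand-in for a protobuf-generated message; real code would use the
// generated class and its SerializeToString()/ParseFromString() methods.
struct RegisterClientRequest {
  std::string worker_id;
  std::string SerializeToString() const { return worker_id; }
};

// Stand-in for the asio connection's framed write: a message type tag plus an
// opaque payload, as the existing WriteMessage path does.
void WriteMessage(int64_t message_type, const std::string &payload) {
  std::cout << "type=" << message_type << " payload bytes=" << payload.size() << std::endl;
}

int main() {
  RegisterClientRequest request{"worker-1"};
  // Keep the protobuf message definition; only the transport stays asio.
  WriteMessage(/*MessageType::RegisterClientRequest=*/1, request.SerializeToString());
  return 0;
}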

repeated ResourceIdSetInfo resource_ids = 2;
// TODO(zhijunfu): `resource_ids` is represented as
// flatbutters-serialized bytes, will be moved to protobuf later.
bytes resource_ids = 2;
Contributor

we should keep ResourceIdSetInfo definition in protobuf


if (is_worker) {
void NodeManager::ProcessRegisterClientRequestMessage(
const std::shared_ptr<LocalClientConnection> &client, const uint8_t *message_data) {
Contributor

Could you please keep the code structure, e.g. HandleFooRequest? It would make it easier to test and to bring back gRPC. You can just parse the requests from asio and dispatch them to these functions.
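
A minimal, self-contained sketch of that structure (all names here are hypothetical, not the actual NodeManager API): one dispatch routine parses the framed asio message and forwards it to a gRPC-style HandleFooRequest function, so the handler could later back a gRPC service unchanged.

#include <cstdint>
#include <iostream>
#include <string>

enum class MessageType : int64_t { RegisterClientRequest = 1, WaitRequest = 2 };

// Stand-in for the parsed protobuf request.
struct RegisterClientRequest {
  std::string worker_id;
};

// gRPC-style handler kept as its own function so it can be reused when the
// transport switches back to gRPC.
void HandleRegisterClientRequest(const RegisterClientRequest &request) {
  std::cout << "Registering worker " << request.worker_id << std::endl;
}

// Called from the asio read loop with the framed message type and payload.
void ProcessClientMessage(MessageType type, const std::string &payload) {
  switch (type) {
    case MessageType::RegisterClientRequest: {
      // Real code would ParseFromArray/ParseFromString the protobuf payload.
      RegisterClientRequest request{payload};
      HandleRegisterClientRequest(request);
      break;
    }
    default:
      std::cerr << "Unknown message type" << std::endl;
      break;
  }
}

int main() {
  ProcessClientMessage(MessageType::RegisterClientRequest, "worker-1");
  return 0;
}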

if (use_push_task) {
// only call `HandleWorkerAvailable` when push mode is used.
HandleWorkerAvailable(worker_id);
HandleWorkerAvailable(connection);
Contributor

I think this optimization is still useful. Using the connection as the key, looking up a worker takes O(n); using worker_id, it's O(1).
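
A small illustration of the complexity difference (the types are simplified stand-ins, not the actual WorkerPool): keying registered workers by worker_id gives an O(1) hash lookup, while finding a worker by its connection needs an O(n) scan.

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

struct Worker {
  std::string worker_id;
  int connection_fd;  // stand-in for the asio connection
};

class WorkerPool {
 public:
  void Register(std::shared_ptr<Worker> worker) {
    workers_by_id_[worker->worker_id] = worker;
    workers_.push_back(worker);
  }

  // O(1): hash lookup keyed by worker_id.
  std::shared_ptr<Worker> GetRegisteredWorker(const std::string &worker_id) const {
    auto it = workers_by_id_.find(worker_id);
    return it == workers_by_id_.end() ? nullptr : it->second;
  }

  // O(n): linear scan to find the worker that owns a given connection.
  std::shared_ptr<Worker> GetRegisteredWorker(int connection_fd) const {
    for (const auto &worker : workers_) {
      if (worker->connection_fd == connection_fd) return worker;
    }
    return nullptr;
  }

 private:
  std::unordered_map<std::string, std::shared_ptr<Worker>> workers_by_id_;
  std::vector<std::shared_ptr<Worker>> workers_;
};

int main() {
  WorkerPool pool;
  pool.Register(std::make_shared<Worker>(Worker{"worker-1", 3}));
  return pool.GetRegisteredWorker("worker-1") ? 0 : 1;
}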

auto finish_assign_task_callback = [this, worker, task_id](Status status) {
if (worker->UsePush()) {
// NOTE: we cannot directly call `FinishAssignTask` here because
// it assumes the task is in SWAP queue, thus we need to delay invoking this
Contributor

This SWAP queue was actually a hack for asio that had existed in the code base for a long time. #5120 fixed this hack. It's unfortunate that we're bringing it back.

Contributor

It is not actually true that the SWAP queue is needed for asio. It was just one solution for when communication with the worker is asynchronous, which is true in both asio and grpc.

I do agree that the pattern in #5120 was an improvement over this. We could try and keep that here, by just moving tasks directly to the RUNNING queue in DispatchTasks and calling the code in FinishAssignTask directly here (vs calling it in a callback).

@@ -0,0 +1,392 @@
#include "raylet_client.h"
Contributor

It'd be better to keep RayletClient in the rpc directory, because

  1. Conceptually, RayletClient is used for rpc.
  2. raylet_client can be an independent lib, so the worker can depend only on it instead of on the whole raylet_lib.

/// The `ClientCallManager` object that is shared by `WorkerTaskClient` from all
/// workers.
rpc::ClientCallManager &client_call_manager_;
/// Indicates whether this is a worker or a driver.
bool is_worker_;
Contributor

This is still useful

}

std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(const WorkerID &worker_id) const {
std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(
Contributor

better to use worker_id as the key for looking up workers

@pcmoritz pcmoritz dismissed raulchen’s stale review August 16, 2019 04:00

We concluded we should do a full revert to make a timely release possible and will bring back the changes incrementally. We will fix gRPC and do more testing to make sure it respects all the invariants of worker<->raylet communication (and relax these as we simplify the raylet).

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16315/
Test FAILed.

@robertnishihara
Collaborator

@pcmoritz Please explain what the issue with the Java test is and what the workaround is. It's not at all clear from https://github.com/ray-project/ray/pull/5450/files#diff-4eba54416056dabdaeb6d540848b3c62R31.

@pcmoritz
Contributor Author

pcmoritz commented Aug 16, 2019

According to Hao:

I'm getting the following error on my machine with your PR. It seems that it's because sometimes the raylet won't gracefully exit and thus won't clean up the socket file (the Java tests use a fixed socket file, /tmp/ray/sockets/raylet).

libc++abi.dylib: terminating with uncaught exception of type boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::system::system_error> >: bind: Address already in use
*** Aborted at 1565873121 (unix time) try "date -d @1565873121" if you are using GNU date ***
PC: @                0x0 (unknown)
*** SIGABRT (@0x7fff5e05f2c6) received by PID 63811 (TID 0x111f255c0) stack trace: ***
    @     0x7fff5e10fb5d _sigtramp
    @     0x7fff5b1b2998 GCC_except_table51
    @     0x7fff5dfc96a6 abort
    @     0x7fff5b1a5641 abort_message
    @     0x7fff5b1a57c7 default_terminate_handler()
    @     0x7fff5c758eeb _objc_terminate()
    @     0x7fff5b1b119e std::__terminate()
    @     0x7fff5b1b0f86 __cxxabiv1::failed_throw()
    @     0x7fff5b1a3f99 __cxa_throw
    @        0x108704e7b boost::throw_exception<>()
    @        0x108704d94 boost::asio::detail::do_throw_error()
    @        0x108704d23 boost::asio::detail::throw_error()
    @        0x108823f57 boost::asio::basic_socket_acceptor<>::basic_socket_acceptor()
    @        0x108800234 boost::asio::basic_socket_acceptor<>::basic_socket_acceptor()
    @        0x1087ffac9 ray::raylet::Raylet::Raylet()
    @        0x108800e0f ray::raylet::Raylet::Raylet()
    @        0x1086f651e main
    @     0x7fff5df243d5 start

@pcmoritz
Contributor Author

Unfortunately, I can't reproduce it. The workaround is there to make sure the Java tests are not broken. It should be fixed properly after the release.

fbb.Finish(wait_reply);

auto status =
client->WriteMessage(static_cast<int64_t>(protocol::MessageType::WaitReply),
Contributor

I guess it wasn't like this before, but while we're at it, do you want to make this a WriteMessageAsync?

auto finish_assign_task_callback = [this, worker, task_id](Status status) {
if (worker->UsePush()) {
// NOTE: we cannot directly call `FinishAssignTask` here because
// it assumes the task is in SWAP queue, thus we need to delay invoking this
Contributor

It is not actually true that the SWAP queue is needed for asio. It was just one solution for when communication with the worker is asynchronous, which is true in both asio and grpc.

I do agree that the pattern in #5120 was an improvement over this. We could try and keep that here, by just moving tasks directly to the RUNNING queue in DispatchTasks and calling the code in FinishAssignTask directly here (vs calling it in a callback).

@pcmoritz
Contributor Author

I agree these are good changes; I think we can bring them back together with the structure of the gRPC handlers (which will also require converting the serialization back to protobuf). It's a bit more work, so let's do it in a follow-up PR.

@raulchen
Contributor

I just merged #5370. There are some small conflicts that should be easy to fix. That PR added a return status for RegisterClient to indicate whether worker registration was successful. As asio doesn't have a status, you can add a bool successful flag in RegisterClientReply.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16321/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16322/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16329/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16325/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16338/
Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16339/
Test FAILed.

@pcmoritz pcmoritz added the release-blocker P0 Issue that blocks the release label Aug 17, 2019
@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16356/
Test FAILed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16357/
Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16368/
Test PASSed.

@pcmoritz pcmoritz merged commit 599cc2b into ray-project:master Aug 18, 2019
@pcmoritz pcmoritz deleted the revert-raylet-worker-grpc branch August 18, 2019 02:11