
[Core] Batch PinObjectIDs requests from Raylet client #24322

Merged
merged 11 commits into from
May 3, 2022

Conversation

@mwtian (Member) commented Apr 29, 2022

Why are these changes needed?

During investigations for #24176, we found that the majority of the memory used by the raylet and core workers was due to gRPC client (core worker) and server (raylet) data structures for inflight PinObjectIDs RPCs. Instead of buffering the requests in gRPC, this PR buffers the ObjectIDs that need to be pinned inside RayletClient. This shows a significant reduction in the raylet's memory usage outside of the object store.
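
To illustrate the buffering scheme, here is a minimal, self-contained sketch (the names PinBatcherSketch, Destination, and the send_rpc hook are assumptions for illustration, not the exact PR implementation): ObjectIDs are buffered per destination raylet, at most one PinObjectIDs RPC is inflight per raylet, and the next batch is sent when the reply for the previous request arrives.

#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using ObjectID = std::string;  // Stand-in for ray::ObjectID in this sketch.

class PinBatcherSketch {
 public:
  // `send_rpc` is a hypothetical hook that issues one PinObjectIDs RPC for a batch
  // and invokes `on_reply` when the reply arrives.
  using SendRpc = std::function<void(const std::string &raylet_id,
                                     std::vector<ObjectID> batch,
                                     std::function<void()> on_reply)>;

  explicit PinBatcherSketch(SendRpc send_rpc) : send_rpc_(std::move(send_rpc)) {}

  // Buffer one ObjectID for `raylet_id` and try to send.
  void Add(const std::string &raylet_id, const ObjectID &object_id) {
    auto &dest = destinations_[raylet_id];
    dest.buffered.push_back(object_id);
    Flush(raylet_id);
  }

 private:
  struct Destination {
    std::vector<ObjectID> buffered;  // ObjectIDs waiting to be pinned.
    bool inflight = false;           // True while a PinObjectIDs RPC is pending.
  };

  // Sends one batched request if there are buffered IDs and no request is inflight.
  void Flush(const std::string &raylet_id) {
    auto &dest = destinations_[raylet_id];
    if (dest.inflight || dest.buffered.empty()) {
      return;
    }
    dest.inflight = true;
    std::vector<ObjectID> batch = std::move(dest.buffered);
    dest.buffered.clear();
    send_rpc_(raylet_id, std::move(batch), [this, raylet_id]() {
      destinations_[raylet_id].inflight = false;
      Flush(raylet_id);  // Send the next accumulated batch, if any.
    });
  }

  SendRpc send_rpc_;
  std::unordered_map<std::string, Destination> destinations_;
};

The PR's actual PinBatcher in the Raylet client roughly follows this shape, with proper locking and Ray's real ObjectID and RPC types.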

Also made minor cleanups in the Raylet client:

  • Move the aborting-object-creation error message from ObjectBufferPool::AbortCreate() to its call sites, with hopefully more accurate reasons.
  • C++ style cleanups.

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@mwtian mwtian force-pushed the raylet-cleanup branch 2 times, most recently from 896b2f4 to 869d0c1 on April 29, 2022 at 17:44
@mwtian mwtian changed the title [WIP] Batch PinObjectID requests from Raylet client [Core] Batch PinObjectID requests from Raylet client Apr 29, 2022
@mwtian mwtian changed the title [Core] Batch PinObjectID requests from Raylet client [Core] Batch PinObjectIDs requests from Raylet client Apr 29, 2022
@mwtian mwtian marked this pull request as ready for review April 29, 2022 18:12
@stephanie-wang (Contributor) left a comment

Nice find!

We also need to consider cases now where one of the objects in the request batch fails (because it was evicted) but the others succeed. Ideally, we should not fail all of the requests when this happens, but I think the current code will do that, since previously we only ever had one ObjectID per request. Can you restructure the server-side handler and reply protobuf to return error codes per ObjectID?

Also, were you able to confirm that this fixes the previous OOM issue? Or if not, at least that the raylet heap memory usage is now stable? Are there any tests we can add other than the large-scale shuffle, maybe a single-node stress test with lots of tiny objects?

// request is inflight.
bool Flush() ABSL_EXCLUSIVE_LOCKS_REQUIRED(pin_batcher_->mu_);

PinBatcher *const pin_batcher_;
Contributor

Let's try to avoid using raw pointers. I think we can potentially make this code structure nicer if we avoid giving the internal RayletDestination a pointer to pin_batcher_. Instead of having the PinBatcher::RayletDestination::Flush() method, we can do PinBatcher::Flush(std::string raylet_id).
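
A rough sketch of the suggested shape (names and members are assumptions for illustration, not the PR's final code): the per-raylet destination becomes plain data, and PinBatcher, which owns the mutex, drives the flush.

class PinBatcher {
 public:
  void Add(const std::string &raylet_id, const ObjectID &object_id);

 private:
  struct RayletDestination {
    std::vector<ObjectID> buffered;  // Plain data; no back-pointer to PinBatcher.
    bool request_inflight = false;
  };

  // Tries to send a request for `raylet_id`; returns true if one was sent.
  bool Flush(const std::string &raylet_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  absl::Mutex mu_;
  absl::flat_hash_map<std::string, RayletDestination> raylets_ ABSL_GUARDED_BY(mu_);
};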

Member Author

Good suggestion!

Contributor

Hmm is this part of the code updated?

Member Author

Just pushed.

};
grpc_client_->PinObjectIDs(request, rpc_callback);
void RayletClient::PinObjectIDs(const rpc::Address &caller_address,
std::vector<ObjectID> object_ids,
Contributor

Won't this copy the object_ids? I think the previous version was fine, unless you saw a noticeable memory difference from it. Alternatively, you can add the object_ids directly to a buffered protobuf.

Member Author

The call site was creating temporary vectors of ObjectIDs, so the new signature would move them into the internal buffer instead of copying them. However, since we only call this with a single ObjectID now, I changed the signature to const ObjectID &.
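
For context, the two options being discussed, in rough form (the callback type is copied from the surrounding snippets; treat these as illustrative rather than the final PR signatures):

// Taking the vector by value lets a caller std::move() a temporary in without a copy:
void PinObjectIDs(const rpc::Address &caller_address,
                  std::vector<ObjectID> object_ids,
                  const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback);

// Since callers now pin one object per call, a const reference sidesteps the question:
void PinObjectIDs(const rpc::Address &caller_address,
                  const ObjectID &object_id,
                  const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback);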

std::vector<Request> inflight;
{
absl::MutexLock lock(&pin_batcher_->mu_);
inflight = std::move(inflight_);
Contributor

Can you also do inflight_.clear() for safety?
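
For illustration, the same fragment with the suggested clear() added (mirroring the snippet above, not the full method):

std::vector<Request> inflight;
{
  absl::MutexLock lock(&pin_batcher_->mu_);
  inflight = std::move(inflight_);
  // A moved-from vector is only guaranteed to be in a valid but unspecified state,
  // so clear it explicitly to ensure the buffer is definitely empty before reuse.
  inflight_.clear();
}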


protected:
RayletClient() {}
std::unique_ptr<PinBatcher> pin_batcher_;
Contributor

Doc?

: pin_batcher_(batcher), raylet_address_(address) {}

// Tries sending out a request, if there are buffered messages but no
// request is inflight.
Contributor

Document the return value.

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 29, 2022
@mwtian (Member Author) commented Apr 29, 2022

Looking into updating and testing the server side logic. For the command below,

RAY_max_direct_call_object_size=128 TEST_OUTPUT_JSON=/tmp/release_test_out.json python ../release/nightly_tests/dataset/sort.py --num-partitions=2700 --partition-size=1e7 --shuffle

I was able to increase --num-partitions from OOMing at 1600 to passing at 2700 on an m5.8xlarge (128GB memory). I'm also planning to add a single-node stress test! I'm thinking about adding the test after this PR to prevent regressions.

@stephanie-wang (Contributor)

I was able to increase --num-partitions from OOMing at 1600 to passing at 2700 on an m5.8xlarge (128GB memory). I'm also planning to add a single-node stress test! I'm thinking about adding the test after this PR to prevent regressions.

Ah great! To clarify, does that mean it still fails due to OOM past 2700?

Hmm, yeah, if we can't come up with a test that can run easily in CI then we can do it afterwards. Maybe we should add it to the microbenchmark release test suite.

@stephanie-wang (Contributor)

Another thing you can check is if this allows the new version of the datasets shuffle test to pass. Looks like it's failing from OOM here.

@mwtian (Member Author) commented Apr 29, 2022

Yes, with this PR a single node with 128GB memory still OOMs at 2800 partitions or above. I suspect optimizations in the Python worker could raise the OOM threshold further or avoid OOMs.

For dataset_shuffle_random_shuffle_1tb_small_instances, it still flakes with this change (1 pass, 1 failure), same as right now. However, there were no longer head node raylet OOMs, only worker node raylet OOMs. This could be addressed with Python worker optimizations as well.

@stephanie-wang (Contributor)

Got it, thanks! To make sure I understand, we still get raylet OOMs now, but you think that's mainly because Python workers are using too much heap memory? If that's the case, I think we can also manage for now by using smaller partitions.

@mwtian (Member Author) commented Apr 30, 2022

Got it, thanks! To make sure I understand, we still get raylet OOMs now, but you think that's mainly because Python workers are using too much heap memory? If that's the case, I think we can also manage for now by using smaller partitions.

Right, from the nightly test logs, Python workers use 4GB~6GB of memory irrespective of their work, e.g. even for idle workers. Here are logs for OOM kills on an m5.2xlarge worker node during dataset_shuffle_random_shuffle_1tb_small_instances, where only Python workers are killed. In this case the raylet is only using a few hundred MB of memory, but each Python worker is using much more. I believe we also observed unusually high Python worker memory usage in #24176 (comment).

worker_oom.txt

When the raylet is killed, we unfortunately don't have logs for it. We will have to revisit after fixing the Python worker memory usage.

@mwtian (Member Author) commented Apr 30, 2022

Here are the OOM kill logs for the worker node where the raylet is killed during the reduce stage: raylet_reduce_oom.txt. They look similar to the logs from https://gist.github.com/stephanie-wang/a3cefd0c11327760e097618386cdcd54.

It seems the non-shared memory usage of the raylet jumped in the reduce stage (total raylet RSS jumped from ~8GB to ~16GB, while shared memory on the node stayed around 8~9GB). Trying to profile the issue.

@mwtian (Member Author) left a comment

I read through the code paths, in particular NodeManager::GetObjectsFromPlasma() and PlasmaClient::Impl::GetBuffers(). It seems pinning objects that are not in plasma is already not an error:

if (plasma_result.data == nullptr) {
results->push_back(nullptr);

For the other cases where these functions do not return an OK status, it seems the errors are either connection issues or indicate failures in the plasma store / raylet, and are not associated with particular ObjectIDs. We probably don't need to change the other error handling logic. Thoughts?

@mwtian mwtian added tests-ok The tagger certifies test failures are unrelated and assumes personal liability. and removed @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. labels Apr 30, 2022
@mwtian mwtian requested a review from stephanie-wang April 30, 2022 23:42
@stephanie-wang (Contributor)

I read through the code paths, in particular NodeManager::GetObjectsFromPlasma() and PlasmaClient::Impl::GetBuffers(). It seems pinning objects that are not in plasma is already not an error:

if (plasma_result.data == nullptr) {
results->push_back(nullptr);

For the other cases where these functions do not return an OK status, it seems the errors are either connection issues or indicate failures in the plasma store / raylet, and are not associated with particular ObjectIDs. We probably don't need to change the other error handling logic. Thoughts?

Ah got it, that sounds right. Thanks for checking this!

@stephanie-wang (Contributor) left a comment

I think there are also some extra changes in gcs_client and object_manager that probably weren't meant to be included?

@@ -19,8 +19,8 @@ class MockPinObjectsInterface : public PinObjectsInterface {
 MOCK_METHOD(void,
             PinObjectIDs,
             (const rpc::Address &caller_address,
-             const std::vector<ObjectID> &object_ids,
+             const ObjectID &object_ids,
              const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback),
Contributor

Suggested change:
- const ObjectID &object_ids,
+ const ObjectID &object_id,

@@ -19,8 +19,8 @@ class MockPinObjectsInterface : public PinObjectsInterface {
MOCK_METHOD(void,
PinObjectIDs,
Contributor

Should we rename the interface to PinObjectID?

@mwtian (Member Author) commented May 2, 2022

Good suggestion, but I feel it would be a bit inconsistent with the PinObjectIDs{Request|Reply} message names. Maybe we can leave them as they are for now, or should we rename the RPC interface as well? We don't necessarily need to use plural form in the RPC name.

Contributor

Yeah, let's just leave them as they are for now.

Collaborator

I feel we probably don't have to make their names exactly the same, since they are different: one pins a single object and the RPC one pins multiple objects, so PinObjectID and PinObjectIDs{Request|Reply} reflect that.

@@ -459,7 +459,7 @@ class NodeResourceInfoAccessor {
/// server.
virtual void AsyncResubscribe();

/// Report resource usage of a node to GCS asynchronously.
/// Report resource usage of a node to GCS asynchronously. Only used in tests.
Contributor

Remove?

Member Author

This is a cleanup that I think is convenient to include here. Added to the PR description.

@@ -609,6 +609,8 @@ bool ObjectManager::ReceiveObjectChunk(const NodeID &node_id,
// have to check again here because the pull manager runs in a different
// thread and the object may have been deactivated right before creating
// the chunk.
RAY_LOG(INFO) << "Aborting object creation because it is no longer actively pulled: "
Contributor

Remove?

Member Author

I'm trying to clarify the aborting-object-creation error message by moving it from ObjectBufferPool::AbortCreate() to the respective call sites, with a hopefully more accurate description of the reason for aborting. LMK if this can be worded differently!

Contributor

Ah got it. I think it's okay for now, but in the future we should actually be pretty careful about moving INFO messages around since they can potentially suppress or trigger spurious messages.

Member Author

Sounds good, I reverted the info log changes for now.

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 2, 2022
@mwtian mwtian removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 2, 2022
@mwtian mwtian requested a review from stephanie-wang May 2, 2022 17:38
@stephanie-wang (Contributor) left a comment

Nice find and fix! Please just remove the new INFO message, and then we can merge.

@@ -343,6 +343,8 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(int64_t num_bytes_available)

// Call the cancellation callbacks outside of the lock.
for (const auto &obj_id : object_ids_to_cancel) {
RAY_LOG(INFO) << "Not enough memory to create requested object " << obj_id
Contributor

Can you remove this INFO message? I think this message will be spammy.

Member Author

This was needed because the info log in AbortCreate() was removed. I reverted this change and may send out the info log change separately.

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 2, 2022
@mwtian mwtian removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label May 2, 2022
@mwtian (Member Author) commented May 3, 2022

I feel we probably don't have to make their names exactly the same, since they are different: one pins a single object and the RPC one pins multiple objects, so PinObjectID and PinObjectIDs{Request|Reply} reflect that.

Yeah, but the function and the RPC are very closely related right now. void PinObjectID(const rpc::Address &caller_address, const ObjectID &object_id, rpc::ClientCallback<rpc::PinObjectIDsReply> callback) is a bit weird.

@stephanie-wang (Contributor)

I'll merge this for now, but we can always do a naming change later.

@stephanie-wang stephanie-wang merged commit 5a82640 into ray-project:master May 3, 2022
stephanie-wang pushed a commit that referenced this pull request May 4, 2022
As discussed in #24322, rename the function so its name matches its signature (PinObjectID()). Also rename the RPC request/reply/method names to keep them consistent.
scv119 added a commit to scv119/ray that referenced this pull request May 12, 2022
scv119 added a commit that referenced this pull request May 12, 2022
Revert "[Core] Batch PinObjectIDs requests from Raylet client (#24322)" and "[Core] rename `PinObjectIDs` to `PinObjectID` (#24451)" (#24741)

We noticed a performance regression for the nightly test shuffle_1tb_5000_partitions. Concretely, the test previously took 1h10m to finish but now takes more than 2h30m.

After investigation, we believe 5a82640 most likely caused the regression.

here is the run before 5a82640: https://console.anyscale.com/o/anyscale-internal/projects/prj_SVFGM5yBqK6DHCfLtRMryXHM/clusters/ses_1ejykCYq9BnkC5v8ZJjrqc2b?command-history-section=command_history
here is the run after 5a82640:
https://console.anyscale.com/o/anyscale-internal/projects/prj_SVFGM5yBqK6DHCfLtRMryXHM/clusters/ses_Lr5N8jVRdHCWJWYA2SRaUkzZ?command-history-section=command_history
mwtian pushed a commit to mwtian/ray that referenced this pull request May 16, 2022
…et client (ray-project#24322)" and "[Core] rename `PinObjectIDs` to `PinObjectID` (ray-project#24451)" (ray-project#24741)"

This reverts commit 02042e1.
Labels
tests-ok The tagger certifies test failures are unrelated and assumes personal liability.