[Core] Batch PinObjectIDs requests from Raylet client #24322
Conversation
Force-pushed from 896b2f4 to 869d0c1.
Changed the title from "PinObjectID requests from Raylet client" to "PinObjectIDs requests from Raylet client".
Nice find!
We also need to consider cases now where one of the objects in the request batch fails (because it was evicted) but the others succeed. Ideally, we should not fail all of the requests if this happens, but I think the current code will do that since we only ever had one ObjectID per request. Can you restructure the server-side handler and reply protobuf to return error codes per ObjectID?
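For illustration, a minimal self-contained sketch of what per-object statuses could look like (all names here, such as `PinObjectIDsReplySketch` and `HandlePinObjectIDsSketch`, are hypothetical stand-ins, not Ray's actual protobuf or handler):

```cpp
#include <string>
#include <unordered_set>
#include <vector>

// Self-contained sketch: the reply carries one success flag per ObjectID
// (here plain strings), so a single evicted object fails only its own slot.
struct PinObjectIDsReplySketch {
  std::vector<bool> successes;  // parallel to the request's object_ids
};

PinObjectIDsReplySketch HandlePinObjectIDsSketch(
    const std::vector<std::string> &object_ids,
    const std::unordered_set<std::string> &objects_in_plasma) {
  PinObjectIDsReplySketch reply;
  reply.successes.reserve(object_ids.size());
  for (const auto &id : object_ids) {
    // An object missing from plasma (e.g. evicted) marks only its own entry
    // as failed; the rest of the batch still succeeds.
    reply.successes.push_back(objects_in_plasma.count(id) > 0);
  }
  return reply;
}
```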
Also, were you able to confirm that this fixes the previous OOM issue? Or if not, at least that the raylet heap memory usage is now stable? Are there any tests we can add other than the large-scale shuffle, maybe a single-node stress test with lots of tiny objects?
```cpp
// request is inflight.
bool Flush() ABSL_EXCLUSIVE_LOCKS_REQUIRED(pin_batcher_->mu_);

PinBatcher *const pin_batcher_;
```
Let's try to avoid using raw pointers. I think we can potentially make this code structure nicer if we avoid giving the internal RayletDestination a pointer to pin_batcher_. Instead of having the `PinBatcher::RayletDestination::Flush()` method, we can do `PinBatcher::Flush(std::string raylet_id)`.
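A rough sketch of that shape (member names and layout are my assumptions from this thread, not the final code):

```cpp
#include <string>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"

// Sketch: flushing is driven from PinBatcher itself, keyed by raylet id,
// so RayletDestination no longer needs a back pointer to its owner.
class PinBatcher {
 public:
  // Tries to send out a batched request to one raylet, if it has buffered
  // ObjectIDs and no request to it is currently inflight.
  // \return true if a new request was sent out.
  bool Flush(const std::string &raylet_id) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    RayletDestination &dest = raylets_[raylet_id];
    if (dest.inflight || dest.buffered.empty()) {
      return false;
    }
    dest.inflight = true;
    // ... build and send the batched PinObjectIDs request here; on reply,
    // reset dest.inflight and call Flush(raylet_id) again ...
    return true;
  }

 private:
  struct RayletDestination {
    bool inflight = false;
    std::vector<std::string> buffered;  // ObjectIDs, as strings in this sketch
  };

  absl::Mutex mu_;
  absl::flat_hash_map<std::string, RayletDestination> raylets_ ABSL_GUARDED_BY(mu_);
};
```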
Good suggestion!
Hmm is this part of the code updated?
Just pushed.
```cpp
};
grpc_client_->PinObjectIDs(request, rpc_callback);

void RayletClient::PinObjectIDs(const rpc::Address &caller_address,
                                std::vector<ObjectID> object_ids,
```
Won't this copy the `object_ids`? I think the previous version was fine, unless you saw a noticeable memory difference from it. Alternatively, you can add the object_ids directly to a buffered protobuf.
The callsite was creating temporary vectors for ObjectIDs, so it would just move them into the internal buffer instead of copying them. However, since we only ever call this with a single ObjectID, I changed the signature to `const ObjectID &`.
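For illustration, a standalone example of the copy/move trade-off being discussed (plain `int`s instead of `ObjectID`s to keep it self-contained):

```cpp
#include <utility>
#include <vector>

// Pass-by-value lets the callee take ownership with a move when the caller
// passes an rvalue; pass-by-const-ref avoids even the move, but only works
// if the callee does not need to store the data.
void BufferByValue(std::vector<int> ids) {
  static std::vector<int> buffer;
  // One move into the internal buffer; no element-wise copy for rvalues.
  buffer = std::move(ids);
}

int main() {
  std::vector<int> ids = {1, 2, 3};
  BufferByValue(std::move(ids));  // caller's vector is moved, not copied
  BufferByValue({4, 5, 6});       // temporary is moved as well
  return 0;
}
```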
```cpp
std::vector<Request> inflight;
{
  absl::MutexLock lock(&pin_batcher_->mu_);
  inflight = std::move(inflight_);
```
Can you also do `inflight_.clear()` for safety?
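For context, a tiny standalone example of why the extra `clear()` is worthwhile after a move:

```cpp
#include <cassert>
#include <utility>
#include <vector>

int main() {
  std::vector<int> inflight_ = {1, 2, 3};
  // A moved-from std::vector is valid but in an unspecified state...
  std::vector<int> inflight = std::move(inflight_);
  // ...so clear() makes the "now empty" invariant explicit instead of
  // relying on implementation behavior.
  inflight_.clear();
  assert(inflight_.empty());
  assert(inflight.size() == 3);
  return 0;
}
```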
```cpp
 protected:
  RayletClient() {}

  std::unique_ptr<PinBatcher> pin_batcher_;
```
Doc?
```cpp
    : pin_batcher_(batcher), raylet_address_(address) {}

// Tries sending out a request, if there are buffered messages but no
// request is inflight.
```
Document the return value.
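For example, something along these lines (wording is only a suggestion):

```cpp
// Tries sending out a request, if there are buffered messages but no
// request is inflight.
// \return true if a new PinObjectIDs request was sent out, false otherwise.
bool Flush() ABSL_EXCLUSIVE_LOCKS_REQUIRED(pin_batcher_->mu_);
```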
Looking into updating and testing the server-side logic. For the command below, I was able to increase the number of partitions to 2700.
Ah great! To clarify, does that mean it still fails due to OOM past 2700? Hmm yeah, if we can't come up with a test that can run easily in CI then we can do it afterwards. Maybe we should add it to the microbenchmark release test suite.
Another thing you can check is if this allows the new version of the datasets shuffle test to pass. Looks like it's failing from OOM here.
Yes, with this PR a single node with 128GB memory still OOMs at 2800 partitions or above. I suspect optimizations in the Python worker are possible to increase the OOM threshold further or avoid OOMs.
Got it, thanks! To make sure I understand, we still get raylet OOMs now, but you think that's mainly because Python workers are using too much heap memory? If that's the case, I think we can also manage for now by using smaller partitions.
Right, from nightly test logs, Python workers use 4GB~6GB of memory irrespective of their work, e.g. even for idle workers. Here are logs for OOM kills on a worker node. When the raylet is killed, we don't have logs for it unfortunately. We'll have to check after fixing the Python worker memory usage.
Here are the OOM kill logs for the worker node where the raylet is killed during the reduce stage: raylet_reduce_oom.txt. They look similar to the logs from https://gist.github.com/stephanie-wang/a3cefd0c11327760e097618386cdcd54. It seems the non-shared memory usage of the raylet jumped in the reduce stage.
I read through the code paths, in particular `NodeManager::GetObjectsFromPlasma()` and `PlasmaClient::Impl::GetBuffers()`. It seems pinning objects that are not in plasma is already not an error:
ray/src/ray/raylet/node_manager.cc
Lines 2310 to 2311 in 27917f5
```cpp
if (plasma_result.data == nullptr) {
  results->push_back(nullptr);
```
For the other cases of not returning ok status from these functions, it seems the errors are either connection issues or indicate failures in the plasma store / raylet, and are not associated with particular ObjectIDs. We probably don't need to change the other error handling logic. Thoughts?
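A minimal standalone illustration of that split (the `BufferSketch` type and the function are stand-ins, not Ray's API): per-object misses surface as null entries, while store-level failures would surface as a non-OK status for the whole call.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

struct BufferSketch {};  // stand-in for a plasma buffer

// Objects missing from plasma appear as null results and are skipped;
// they do not fail the rest of the batch.
void PinAvailable(const std::vector<std::shared_ptr<BufferSketch>> &results) {
  for (std::size_t i = 0; i < results.size(); i++) {
    if (results[i] == nullptr) {
      continue;  // evicted or never local: skip this one only
    }
    // ... pin results[i] ...
  }
}
```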
Ah got it, that sounds right. Thanks for checking this!
I think there are also some extra changes in gcs_client and object_manager that probably weren't meant to be included?
```diff
@@ -19,8 +19,8 @@ class MockPinObjectsInterface : public PinObjectsInterface {
   MOCK_METHOD(void,
               PinObjectIDs,
               (const rpc::Address &caller_address,
-               const std::vector<ObjectID> &object_ids,
+               const ObjectID &object_ids,
                const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback),
```
Suggested change:

```diff
- const ObjectID &object_ids,
+ const ObjectID &object_id,
```
```diff
@@ -19,8 +19,8 @@ class MockPinObjectsInterface : public PinObjectsInterface {
   MOCK_METHOD(void,
               PinObjectIDs,
```
Should we rename the interface to PinObjectID?
Good suggestion, but I feel it would be a bit inconsistent with the `PinObjectIDs{Request|Reply}` message names. Maybe we can leave them as they are for now, or should we rename the RPC interface as well? We don't necessarily need to use the plural form in the RPC name.
Yeah, let's just leave them as they are for now.
I feel we probably don't have to make their names exactly the same, since they are different: one pins a single object while the RPC pins multiple objects, so PinObjectID and PinObjectIDs{Request|Reply} reflect that.
```diff
@@ -459,7 +459,7 @@ class NodeResourceInfoAccessor {
   /// server.
   virtual void AsyncResubscribe();

-  /// Report resource usage of a node to GCS asynchronously.
+  /// Report resource usage of a node to GCS asynchronously. Only used in tests.
```
Remove?
This is a cleanup that I think is convenient to include here. Added to the PR description.
```diff
@@ -609,6 +609,8 @@ bool ObjectManager::ReceiveObjectChunk(const NodeID &node_id,
   // have to check again here because the pull manager runs in a different
   // thread and the object may have been deactivated right before creating
   // the chunk.
+  RAY_LOG(INFO) << "Aborting object creation because it is no longer actively pulled: "
```
Remove?
I'm trying to clarify the aborting-object-creation error message by moving it from `ObjectBufferPool::AbortCreate()` to the respective callsites, with a hopefully more accurate description of the reason for aborting. LMK if this can be worded differently!
Ah got it. I think it's okay for now, but in the future we should actually be pretty careful about moving INFO messages around since they can potentially suppress or trigger spurious messages.
Sounds good, I reverted the info log changes for now.
Nice find and fix! Please just remove the new INFO message, and then we can merge.
```diff
@@ -343,6 +343,8 @@ void PullManager::UpdatePullsBasedOnAvailableMemory(int64_t num_bytes_available)

   // Call the cancellation callbacks outside of the lock.
   for (const auto &obj_id : object_ids_to_cancel) {
+    RAY_LOG(INFO) << "Not enough memory to create requested object " << obj_id
```
Can you remove this INFO message? I think this message will be spammy.
This was needed because the info log in `AbortCreate()` was removed. I reverted this change and may send out the info log change separately.
Yeah, but the function and the RPC are very closely related right now.
I'll merge this for now, but we can always do a naming change later.
As discussed in #24322, rename so the function name matches its signature for PinObjectID(). Also rename the RPC request/reply/method names, to keep them consistent.
…project#24322)". This reverts commit 5a82640.
#24322)" and "[Core] rename `PinObjectIDs` to `PinObjectID` (#24451)" (#24741) we noticed performance regression for nightly test shuffle_1tb_5000_partitions. concretely the test previously takes 1h10m to finish but now it takes more than 2h30minutes. after investigation we believe mostly likely 5a82640 caused the regression. here is the run before 5a82640: https://console.anyscale.com/o/anyscale-internal/projects/prj_SVFGM5yBqK6DHCfLtRMryXHM/clusters/ses_1ejykCYq9BnkC5v8ZJjrqc2b?command-history-section=command_history here is the run after 5a82640: https://console.anyscale.com/o/anyscale-internal/projects/prj_SVFGM5yBqK6DHCfLtRMryXHM/clusters/ses_Lr5N8jVRdHCWJWYA2SRaUkzZ?command-history-section=command_history
…et client (ray-project#24322)" and "[Core] rename `PinObjectIDs` to `PinObjectID` (ray-project#24451)" (ray-project#24741)" This reverts commit 02042e1.
Why are these changes needed?
During investigations for #24176, it was found that the majority of the memory used by the raylet and core workers is due to gRPC client (core worker) and server (raylet) data structures for inflight `PinObjectIDs` RPCs. Instead of buffering the requests in gRPC, this PR buffers the ObjectIDs that need to be pinned inside `RayletClient` instead. This shows a significant reduction in the raylet's memory usage outside of the object store. A simplified sketch of the buffering approach appears below.

Also made minor cleanups in the Raylet client:
- Moved the aborting-object-creation message from `ObjectBufferPool::AbortCreate()` to callsites, with hopefully more accurate reasons.
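Self-contained sketch of the buffering idea (illustrative names, simplified to a single raylet and no locking; the real implementation is mutex-guarded and keyed per raylet): at most one batched request is inflight at a time, and IDs arriving in the meantime are flushed when the reply comes back.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

class PinBatcherSketch {
 public:
  using SendBatchFn = std::function<void(std::vector<std::string> batch,
                                         std::function<void()> on_reply)>;

  explicit PinBatcherSketch(SendBatchFn send_batch)
      : send_batch_(std::move(send_batch)) {}

  void Add(std::string object_id) {
    buffered_.push_back(std::move(object_id));
    Flush();
  }

 private:
  void Flush() {
    if (inflight_ || buffered_.empty()) {
      return;  // a request is already on the wire, or nothing to send
    }
    inflight_ = true;
    std::vector<std::string> batch = std::move(buffered_);
    buffered_.clear();  // moved-from state is unspecified; clear for safety
    send_batch_(std::move(batch), [this]() {
      inflight_ = false;
      Flush();  // send whatever accumulated while the reply was pending
    });
  }

  SendBatchFn send_batch_;
  bool inflight_ = false;
  std::vector<std::string> buffered_;
};
```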
Related issue number
Checks
I've run scripts/format.sh to lint the changes in this PR.