Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[Core] Revert "[Core] Batch PinObjectIDs requests from Rayl… #24865

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/source/ray-contribute/debugging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ of what the event stats look like:
CoreWorkerService.grpc_client.GetObjectLocationsOwner - 51333 total (0 active), CPU time: mean = 25.166 us, total = 1.292 s
ObjectManager.ObjectDeleted - 43188 total (0 active), CPU time: mean = 26.017 us, total = 1.124 s
CoreWorkerService.grpc_client.RemoveObjectLocationOwner - 43177 total (0 active), CPU time: mean = 2.368 us, total = 102.252 ms
NodeManagerService.grpc_server.PinObjectIDs - 40000 total (0 active), CPU time: mean = 194.860 us, total = 7.794 s
NodeManagerService.grpc_server.PinObjectID - 40000 total (0 active), CPU time: mean = 194.860 us, total = 7.794 s

Callback latency injection
--------------------------
Expand Down
54 changes: 54 additions & 0 deletions release/nightly_tests/wait_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import argparse
import time

import ray

ray.init(address="auto")

parser = argparse.ArgumentParser()
parser.add_argument(
"num_nodes", type=int, help="Wait for this number of nodes (includes head)"
)

parser.add_argument("max_time_s", type=int, help="Wait for this number of seconds")

parser.add_argument(
"--feedback_interval_s",
type=int,
default=10,
help="Wait for this number of seconds",
)

args = parser.parse_args()

curr_nodes = 0
start = time.time()
next_feedback = start
max_time = start + args.max_time_s

while not curr_nodes >= args.num_nodes:
now = time.time()

if now >= max_time:
raise RuntimeError(
f"Maximum wait time reached, but only "
f"{curr_nodes}/{args.num_nodes} nodes came up. Aborting."
)

if now >= next_feedback:
passed = now - start
print(
f"Waiting for more nodes to come up: "
f"{curr_nodes}/{args.num_nodes} "
f"({passed:.0f} seconds passed)"
)
next_feedback = now + args.feedback_interval_s

time.sleep(5)
curr_nodes = len(ray.nodes())

passed = time.time() - start
print(
f"Cluster is up: {curr_nodes}/{args.num_nodes} nodes online after "
f"{passed:.0f} seconds"
)
6 changes: 3 additions & 3 deletions src/mock/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ class MockNodeManager : public NodeManager {
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandlePinObjectIDs,
(const rpc::PinObjectIDsRequest &request,
rpc::PinObjectIDsReply *reply,
HandlePinObjectID,
(const rpc::PinObjectIDRequest &request,
rpc::PinObjectIDReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
Expand Down
12 changes: 6 additions & 6 deletions src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ namespace ray {
class MockPinObjectsInterface : public PinObjectsInterface {
public:
MOCK_METHOD(void,
PinObjectIDs,
PinObjectID,
(const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback),
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback),
(override));
};

Expand Down Expand Up @@ -189,10 +189,10 @@ class MockRayletClientInterface : public RayletClientInterface {
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback),
(override));
MOCK_METHOD(void,
PinObjectIDs,
PinObjectID,
(const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback),
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback),
(override));
MOCK_METHOD(void,
GetSystemConfig,
Expand Down
43 changes: 29 additions & 14 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
[this](const RayObject &object, const ObjectID &object_id) {
RAY_CHECK_OK(PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
},
/*flush_pin_requests_callback=*/
[this]() { local_raylet_client_->FlushPinObjectIDRequests(rpc_address_); },
/* retry_task_callback= */
[this](TaskSpecification &spec, bool delay) {
spec.GetMutableMessage().set_attempt_number(spec.AttemptNumber() + 1);
Expand Down Expand Up @@ -896,15 +898,21 @@ Status CoreWorker::PutInLocalPlasmaStore(const RayObject &object,
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning put object " << object_id;
local_raylet_client_->PinObjectIDs(
local_raylet_client_->PinObjectID(
rpc_address_,
{object_id},
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
object_id,
[this, object_id](const Status &status, const rpc::PinObjectIDReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the object " << object_id
<< ". This object may get evicted while there are still "
"references to it: "
<< status;
}
// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
if (!plasma_store_provider_->Release(object_id).ok()) {
if (auto s = plasma_store_provider_->Release(object_id); !s.ok()) {
RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id
<< "), might cause a leak in plasma.";
<< "), might cause a leak in plasma: " << s;
}
});
} else {
Expand Down Expand Up @@ -1050,15 +1058,21 @@ Status CoreWorker::SealExisting(const ObjectID &object_id,
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning sealed object " << object_id;
local_raylet_client_->PinObjectIDs(
local_raylet_client_->PinObjectID(
owner_address != nullptr ? *owner_address : rpc_address_,
{object_id},
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
object_id,
[this, object_id](const Status &status, const rpc::PinObjectIDReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the object " << object_id
<< ". This object may get evicted while there are still "
"references to it: "
<< status;
}
// Only release the object once the raylet has responded to avoid the race
// condition that the object could be evicted before the raylet pins it.
if (!plasma_store_provider_->Release(object_id).ok()) {
if (auto s = plasma_store_provider_->Release(object_id); !s.ok()) {
RAY_LOG(ERROR) << "Failed to release ObjectID (" << object_id
<< "), might cause a leak in plasma.";
<< "), might cause a leak in plasma: " << s;
}
});
} else {
Expand Down Expand Up @@ -2440,16 +2454,17 @@ bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id,
// Asynchronously ask the raylet to pin the object. Note that this can fail
// if the raylet fails. We expect the owner of the object to handle that
// case (e.g., by detecting the raylet failure and storing an error).
local_raylet_client_->PinObjectIDs(
local_raylet_client_->PinObjectID(
owner_address,
{return_id},
return_id,
[return_id, pinned_return_object](const Status &status,
const rpc::PinObjectIDsReply &reply) {
const rpc::PinObjectIDReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the task return object "
<< return_id
<< ". This object may get evicted while there are still "
"references to it.";
"references to it: "
<< status;
}
});
return true;
Expand Down
36 changes: 18 additions & 18 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
const rpc::Address &raylet_address,
const std::vector<rpc::Address> &other_locations) {
// If a copy still exists, pin the object by sending a
// PinObjectIDs RPC.
// PinObjectID RPC.
const auto node_id = NodeID::FromBinary(raylet_address.raylet_id());
RAY_LOG(DEBUG) << "Trying to pin copy of lost object " << object_id << " at node "
<< node_id;
Expand All @@ -118,23 +118,23 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
client = client_it->second;
}

client->PinObjectIDs(rpc_address_,
{object_id},
[this, object_id, other_locations, node_id](
const Status &status, const rpc::PinObjectIDsReply &reply) {
if (status.ok()) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_->Put(
RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
reference_counter_->UpdateObjectPinnedAtRaylet(object_id,
node_id);
} else {
RAY_LOG(INFO) << "Error pinning new copy of lost object "
<< object_id << ", trying again";
PinOrReconstructObject(object_id, other_locations);
}
});
client->PinObjectID(rpc_address_,
object_id,
[this, object_id, other_locations, node_id](
const Status &status, const rpc::PinObjectIDReply &reply) {
if (status.ok()) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_->Put(
RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
reference_counter_->UpdateObjectPinnedAtRaylet(object_id,
node_id);
} else {
RAY_LOG(INFO) << "Error pinning new copy of lost object "
<< object_id << ", trying again";
PinOrReconstructObject(object_id, other_locations);
}
});
}

void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
}
}

flush_pin_requests_callback_();

TaskSpecification spec;
bool release_lineage = true;
int64_t min_lineage_bytes_to_evict = 0;
Expand Down
18 changes: 13 additions & 5 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,16 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
TaskManager(std::shared_ptr<CoreWorkerMemoryStore> in_memory_store,
std::shared_ptr<ReferenceCounter> reference_counter,
PutInLocalPlasmaCallback put_in_local_plasma_callback,
std::function<void()> flush_pin_requests_callback,
RetryTaskCallback retry_task_callback,
PushErrorCallback push_error_callback,
int64_t max_lineage_bytes)
: in_memory_store_(in_memory_store),
reference_counter_(reference_counter),
put_in_local_plasma_callback_(put_in_local_plasma_callback),
retry_task_callback_(retry_task_callback),
push_error_callback_(push_error_callback),
: in_memory_store_(std::move(in_memory_store)),
reference_counter_(std::move(reference_counter)),
put_in_local_plasma_callback_(std::move(put_in_local_plasma_callback)),
flush_pin_requests_callback_(std::move(flush_pin_requests_callback)),
retry_task_callback_(std::move(retry_task_callback)),
push_error_callback_(std::move(push_error_callback)),
max_lineage_bytes_(max_lineage_bytes) {
reference_counter_->SetReleaseLineageCallback(
[this](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
Expand Down Expand Up @@ -365,6 +367,12 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// retrieve them.
const PutInLocalPlasmaCallback put_in_local_plasma_callback_;

/// Callback to flush buffered PinObjectID requests. Flushing is needed because
/// when Raylet is busy, single flight batching of PinObjectID requests can increase
/// end-to-end latency drastically. Flushing ensures the same latency with batching
/// enabled.
const std::function<void()> flush_pin_requests_callback_;

/// Called when a task should be retried.
const RetryTaskCallback retry_task_callback_;

Expand Down
15 changes: 7 additions & 8 deletions src/ray/core_worker/test/object_recovery_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,23 @@ class MockTaskResubmitter : public TaskResubmissionInterface {

class MockRayletClient : public PinObjectsInterface {
public:
void PinObjectIDs(
const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback) override {
RAY_LOG(INFO) << "PinObjectIDs " << object_ids.size();
callbacks.push_back(callback);
void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) override {
RAY_LOG(INFO) << "PinObjectID " << object_id.Hex();
callbacks.push_back(std::move(callback));
}

size_t Flush() {
size_t flushed = callbacks.size();
for (const auto &callback : callbacks) {
callback(Status::OK(), rpc::PinObjectIDsReply());
callback(Status::OK(), rpc::PinObjectIDReply());
}
callbacks.clear();
return flushed;
}

std::list<rpc::ClientCallback<rpc::PinObjectIDsReply>> callbacks = {};
std::list<rpc::ClientCallback<rpc::PinObjectIDReply>> callbacks = {};
};

class MockObjectDirectory {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ class NodeResourceInfoAccessor {
/// server.
virtual void AsyncResubscribe();

/// Report resource usage of a node to GCS asynchronously.
/// Report resource usage of a node to GCS asynchronously. Only used in tests.
///
/// \param data_ptr The data that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.
Expand Down
7 changes: 3 additions & 4 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,9 @@ struct GcsServerMocker {
}

/// PinObjectsInterface
void PinObjectIDs(
const rpc::Address &caller_address,
const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) override {}
void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) override {}

/// DependencyWaiterInterface
ray::Status WaitForDirectActorCallArgs(
Expand Down
12 changes: 8 additions & 4 deletions src/ray/object_manager/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ Status PlasmaClient::Impl::GetBuffers(

// If we get here, then the objects aren't all currently in use by this
// client, so we need to send a request to the plasma store.
RAY_RETURN_NOT_OK(SendGetRequest(
store_conn_, &object_ids[0], num_objects, timeout_ms, is_from_worker));
RAY_RETURN_NOT_OK(
SendGetRequest(store_conn_, object_ids, num_objects, timeout_ms, is_from_worker));
std::vector<uint8_t> buffer;
RAY_RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaGetReply, &buffer));
std::vector<ObjectID> received_object_ids(num_objects);
Expand Down Expand Up @@ -560,8 +560,12 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID> &object_ids,
};
const size_t num_objects = object_ids.size();
*out = std::vector<ObjectBuffer>(num_objects);
return GetBuffers(
&object_ids[0], num_objects, timeout_ms, wrap_buffer, &(*out)[0], is_from_worker);
return GetBuffers(object_ids.data(),
num_objects,
timeout_ms,
wrap_buffer,
out->data(),
is_from_worker);
}

Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID &object_id) {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/pull_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ class PullManager {
int64_t num_active_bundles_ = 0;

/// Callback to pin plasma objects.
std::function<std::unique_ptr<RayObject>(const ObjectID &object_ids)> pin_object_;
std::function<std::unique_ptr<RayObject>(const ObjectID &object_id)> pin_object_;

/// The last time OOM was reported. Track this so we don't spam warnings when
/// the object store is full.
Expand Down
6 changes: 3 additions & 3 deletions src/ray/protobuf/node_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ message CancelWorkerLeaseReply {
bool success = 1;
}

message PinObjectIDsRequest {
message PinObjectIDRequest {
// Address of the owner to ask when to unpin the objects.
Address owner_address = 1;
// ObjectIDs to pin.
repeated bytes object_ids = 2;
}

message PinObjectIDsReply {}
message PinObjectIDReply {}

message GetNodeStatsRequest {
// Whether to include memory stats. This could be large since it includes
Expand Down Expand Up @@ -363,7 +363,7 @@ service NodeManagerService {
// lease request was not yet granted.
rpc CancelWorkerLease(CancelWorkerLeaseRequest) returns (CancelWorkerLeaseReply);
// Pin the provided object IDs.
rpc PinObjectIDs(PinObjectIDsRequest) returns (PinObjectIDsReply);
rpc PinObjectID(PinObjectIDRequest) returns (PinObjectIDReply);
// Get the current node stats.
rpc GetNodeStats(GetNodeStatsRequest) returns (GetNodeStatsReply);
// Trigger garbage collection in all workers across the cluster.
Expand Down
Loading