Skip to content

Commit

Permalink
[Core] rename PinObjectIDs to PinObjectID (ray-project#24451)
Browse files Browse the repository at this point in the history
As discussed in ray-project#24322, rename so the function name matches its signature for PinObjectID(). Also rename the RPC request/reply/method names, to keep them consistent.
  • Loading branch information
mwtian authored May 4, 2022
1 parent b79b834 commit e00c611
Show file tree
Hide file tree
Showing 14 changed files with 75 additions and 76 deletions.
2 changes: 1 addition & 1 deletion doc/source/ray-contribute/debugging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ of what the event stats look like:
CoreWorkerService.grpc_client.GetObjectLocationsOwner - 51333 total (0 active), CPU time: mean = 25.166 us, total = 1.292 s
ObjectManager.ObjectDeleted - 43188 total (0 active), CPU time: mean = 26.017 us, total = 1.124 s
CoreWorkerService.grpc_client.RemoveObjectLocationOwner - 43177 total (0 active), CPU time: mean = 2.368 us, total = 102.252 ms
NodeManagerService.grpc_server.PinObjectIDs - 40000 total (0 active), CPU time: mean = 194.860 us, total = 7.794 s
NodeManagerService.grpc_server.PinObjectID - 40000 total (0 active), CPU time: mean = 194.860 us, total = 7.794 s
Callback latency injection
--------------------------
Expand Down
6 changes: 3 additions & 3 deletions src/mock/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ class MockNodeManager : public NodeManager {
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandlePinObjectIDs,
(const rpc::PinObjectIDsRequest &request,
rpc::PinObjectIDsReply *reply,
HandlePinObjectID,
(const rpc::PinObjectIDRequest &request,
rpc::PinObjectIDReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
Expand Down
8 changes: 4 additions & 4 deletions src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ namespace ray {
class MockPinObjectsInterface : public PinObjectsInterface {
public:
MOCK_METHOD(void,
PinObjectIDs,
PinObjectID,
(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback),
rpc::ClientCallback<rpc::PinObjectIDReply> callback),
(override));
};

Expand Down Expand Up @@ -188,10 +188,10 @@ class MockRayletClientInterface : public RayletClientInterface {
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback),
(override));
MOCK_METHOD(void,
PinObjectIDs,
PinObjectID,
(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback),
rpc::ClientCallback<rpc::PinObjectIDReply> callback),
(override));
MOCK_METHOD(void,
GetSystemConfig,
Expand Down
12 changes: 6 additions & 6 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -905,10 +905,10 @@ Status CoreWorker::PutInLocalPlasmaStore(const RayObject &object,
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning put object " << object_id;
local_raylet_client_->PinObjectIDs(
local_raylet_client_->PinObjectID(
rpc_address_,
object_id,
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
[this, object_id](const Status &status, const rpc::PinObjectIDReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the object " << object_id
<< ". This object may get evicted while there are still "
Expand Down Expand Up @@ -1065,10 +1065,10 @@ Status CoreWorker::SealExisting(const ObjectID &object_id,
if (pin_object) {
// Tell the raylet to pin the object **after** it is created.
RAY_LOG(DEBUG) << "Pinning sealed object " << object_id;
local_raylet_client_->PinObjectIDs(
local_raylet_client_->PinObjectID(
owner_address != nullptr ? *owner_address : rpc_address_,
object_id,
[this, object_id](const Status &status, const rpc::PinObjectIDsReply &reply) {
[this, object_id](const Status &status, const rpc::PinObjectIDReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the object " << object_id
<< ". This object may get evicted while there are still "
Expand Down Expand Up @@ -2465,11 +2465,11 @@ bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id,
// Asynchronously ask the raylet to pin the object. Note that this can fail
// if the raylet fails. We expect the owner of the object to handle that
// case (e.g., by detecting the raylet failure and storing an error).
local_raylet_client_->PinObjectIDs(
local_raylet_client_->PinObjectID(
owner_address,
return_id,
[return_id, pinned_return_object](const Status &status,
const rpc::PinObjectIDsReply &reply) {
const rpc::PinObjectIDReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the task return object "
<< return_id
Expand Down
36 changes: 18 additions & 18 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
const rpc::Address &raylet_address,
const std::vector<rpc::Address> &other_locations) {
// If a copy still exists, pin the object by sending a
// PinObjectIDs RPC.
// PinObjectID RPC.
const auto node_id = NodeID::FromBinary(raylet_address.raylet_id());
RAY_LOG(DEBUG) << "Trying to pin copy of lost object " << object_id << " at node "
<< node_id;
Expand All @@ -118,23 +118,23 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
client = client_it->second;
}

client->PinObjectIDs(rpc_address_,
object_id,
[this, object_id, other_locations, node_id](
const Status &status, const rpc::PinObjectIDsReply &reply) {
if (status.ok()) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_->Put(
RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
reference_counter_->UpdateObjectPinnedAtRaylet(object_id,
node_id);
} else {
RAY_LOG(INFO) << "Error pinning new copy of lost object "
<< object_id << ", trying again";
PinOrReconstructObject(object_id, other_locations);
}
});
client->PinObjectID(rpc_address_,
object_id,
[this, object_id, other_locations, node_id](
const Status &status, const rpc::PinObjectIDReply &reply) {
if (status.ok()) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_->Put(
RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
reference_counter_->UpdateObjectPinnedAtRaylet(object_id,
node_id);
} else {
RAY_LOG(INFO) << "Error pinning new copy of lost object "
<< object_id << ", trying again";
PinOrReconstructObject(object_id, other_locations);
}
});
}

void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
Expand Down
12 changes: 6 additions & 6 deletions src/ray/core_worker/test/object_recovery_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,23 @@ class MockTaskResubmitter : public TaskResubmissionInterface {

class MockRayletClient : public PinObjectsInterface {
public:
void PinObjectIDs(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback) override {
RAY_LOG(INFO) << "PinObjectIDs " << object_id.Hex();
void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) override {
RAY_LOG(INFO) << "PinObjectID " << object_id.Hex();
callbacks.push_back(std::move(callback));
}

size_t Flush() {
size_t flushed = callbacks.size();
for (const auto &callback : callbacks) {
callback(Status::OK(), rpc::PinObjectIDsReply());
callback(Status::OK(), rpc::PinObjectIDReply());
}
callbacks.clear();
return flushed;
}

std::list<rpc::ClientCallback<rpc::PinObjectIDsReply>> callbacks = {};
std::list<rpc::ClientCallback<rpc::PinObjectIDReply>> callbacks = {};
};

class MockObjectDirectory {
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ struct GcsServerMocker {
}

/// PinObjectsInterface
void PinObjectIDs(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback) override {}
void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) override {}

/// DependencyWaiterInterface
ray::Status WaitForDirectActorCallArgs(
Expand Down
7 changes: 3 additions & 4 deletions src/ray/protobuf/node_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,14 @@ message CancelWorkerLeaseReply {
bool success = 1;
}

message PinObjectIDsRequest {
message PinObjectIDRequest {
// Address of the owner to ask when to unpin the objects.
Address owner_address = 1;
// ObjectIDs to pin.
repeated bytes object_ids = 2;
}

message PinObjectIDsReply {
}
message PinObjectIDReply {}

message GetNodeStatsRequest {
// Whether to include memory stats. This could be large since it includes
Expand Down Expand Up @@ -354,7 +353,7 @@ service NodeManagerService {
// lease request was not yet granted.
rpc CancelWorkerLease(CancelWorkerLeaseRequest) returns (CancelWorkerLeaseReply);
// Pin the provided object IDs.
rpc PinObjectIDs(PinObjectIDsRequest) returns (PinObjectIDsReply);
rpc PinObjectID(PinObjectIDRequest) returns (PinObjectIDReply);
// Get the current node stats.
rpc GetNodeStats(GetNodeStatsRequest) returns (GetNodeStatsReply);
// Trigger garbage collection in all workers across the cluster.
Expand Down
8 changes: 4 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2294,7 +2294,7 @@ Status NodeManager::GetObjectsFromPlasma(
// Pin the objects in plasma by getting them and holding a reference to
// the returned buffer.
// NOTE: the caller must ensure that the objects already exist in plasma before
// sending a PinObjectIDs request.
// sending a PinObjectID request.
std::vector<plasma::ObjectBuffer> plasma_results;
// TODO(swang): This `Get` has a timeout of 0, so the plasma store will not
// block when serving the request. However, if the plasma store is under
Expand All @@ -2315,9 +2315,9 @@ Status NodeManager::GetObjectsFromPlasma(
return Status::OK();
}

void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
rpc::PinObjectIDsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
void NodeManager::HandlePinObjectID(const rpc::PinObjectIDRequest &request,
rpc::PinObjectIDReply *reply,
rpc::SendReplyCallback send_reply_callback) {
std::vector<ObjectID> object_ids;
object_ids.reserve(request.object_ids_size());
const auto &owner_address = request.owner_address();
Expand Down
8 changes: 4 additions & 4 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `PinObjectIDs` request.
void HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
rpc::PinObjectIDsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `PinObjectID` request.
void HandlePinObjectID(const rpc::PinObjectIDRequest &request,
rpc::PinObjectIDReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle a `NodeStats` request.
void HandleGetNodeStats(const rpc::GetNodeStatsRequest &request,
Expand Down
14 changes: 7 additions & 7 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,9 +460,9 @@ void RayletClient::ReleaseUnusedBundles(
});
}

void RayletClient::PinObjectIDs(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback) {
void RayletClient::PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) {
pin_batcher_->Add(caller_address, object_id, std::move(callback));
}

Expand Down Expand Up @@ -530,7 +530,7 @@ PinBatcher::PinBatcher(std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_c

void PinBatcher::Add(const rpc::Address &address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback) {
rpc::ClientCallback<rpc::PinObjectIDReply> callback) {
absl::MutexLock lock(&mu_);
total_inflight_pins_++;
RayletDestination &raylet =
Expand All @@ -552,13 +552,13 @@ bool PinBatcher::Flush(const std::string &raylet_id) {
raylet.inflight_ = std::move(raylet.buffered_);
raylet.buffered_.clear();

rpc::PinObjectIDsRequest request;
rpc::PinObjectIDRequest request;
request.mutable_owner_address()->CopyFrom(raylet.raylet_address_);
for (const auto &req : raylet.inflight_) {
request.add_object_ids(req.object_id.Binary());
}
auto rpc_callback = [this, raylet_id](Status status,
const rpc::PinObjectIDsReply &reply) {
const rpc::PinObjectIDReply &reply) {
std::vector<Request> inflight;
{
absl::MutexLock lock(&mu_);
Expand All @@ -575,7 +575,7 @@ bool PinBatcher::Flush(const std::string &raylet_id) {
req.callback(status, reply);
}
};
grpc_client_->PinObjectIDs(request, std::move(rpc_callback));
grpc_client_->PinObjectID(request, std::move(rpc_callback));

return true;
}
Expand Down
22 changes: 11 additions & 11 deletions src/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ namespace ray {
class PinObjectsInterface {
public:
/// Request to a raylet to pin a plasma object. The callback will be sent via gRPC.
virtual void PinObjectIDs(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback) = 0;
virtual void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) = 0;

virtual ~PinObjectsInterface(){};
};
Expand Down Expand Up @@ -230,7 +230,7 @@ class RayletConnection {
std::mutex write_mutex_;
};

/// Batches PinObjectIDsRequest so there would be only one outstanding
/// Batches PinObjectIDRequest so there would be only one outstanding
/// request per Raylet. This reduces the memory and CPU overhead when a
/// large number of objects need to be pinned.
class PinBatcher {
Expand All @@ -240,19 +240,19 @@ class PinBatcher {
/// Adds objects to be pinned at the address.
void Add(const rpc::Address &address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback);
rpc::ClientCallback<rpc::PinObjectIDReply> callback);

/// Total number of objects waiting to be pinned.
int64_t TotalPending() const;

private:
// Request from a single Add() call.
struct Request {
Request(ObjectID oid, rpc::ClientCallback<rpc::PinObjectIDsReply> cb)
Request(ObjectID oid, rpc::ClientCallback<rpc::PinObjectIDReply> cb)
: object_id(oid), callback(std::move(cb)) {}

ObjectID object_id;
rpc::ClientCallback<rpc::PinObjectIDsReply> callback;
rpc::ClientCallback<rpc::PinObjectIDReply> callback;
};

// Collects buffered pin object requests intended for a raylet.
Expand Down Expand Up @@ -484,9 +484,9 @@ class RayletClient : public RayletClientInterface {
const std::vector<rpc::Bundle> &bundles_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) override;

void PinObjectIDs(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDsReply> callback) override;
void PinObjectID(const rpc::Address &caller_address,
const ObjectID &object_id,
rpc::ClientCallback<rpc::PinObjectIDReply> callback) override;

void ShutdownRaylet(
const NodeID &node_id,
Expand Down Expand Up @@ -535,7 +535,7 @@ class RayletClient : public RayletClientInterface {
ResourceMappingType resource_ids_;
/// The connection to the raylet server.
std::unique_ptr<RayletConnection> conn_;
/// Batches pin object ID requests to the same raylet. All PinObjectIDs requests
/// Batches pin object ID requests to the same raylet. All PinObjectID requests
/// should go through this.
std::unique_ptr<PinBatcher> pin_batcher_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/ray/rpc/node_manager/node_manager_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class NodeManagerWorkerClient

/// Notify the raylet to pin the provided object IDs.
VOID_RPC_CLIENT_METHOD(NodeManagerService,
PinObjectIDs,
PinObjectID,
grpc_client_,
/*method_timeout_ms*/ -1, )

Expand Down
8 changes: 4 additions & 4 deletions src/ray/rpc/node_manager/node_manager_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace rpc {
RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedWorkers, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, CancelWorkerLease, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectID, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, FormatGlobalMemoryInfo, -1) \
Expand Down Expand Up @@ -114,9 +114,9 @@ class NodeManagerServiceHandler {
rpc::CancelResourceReserveReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0;

virtual void HandlePinObjectIDs(const PinObjectIDsRequest &request,
PinObjectIDsReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandlePinObjectID(const PinObjectIDRequest &request,
PinObjectIDReply *reply,
SendReplyCallback send_reply_callback) = 0;

virtual void HandleGetNodeStats(const GetNodeStatsRequest &request,
GetNodeStatsReply *reply,
Expand Down

0 comments on commit e00c611

Please sign in to comment.