Skip to content

Commit

Permalink
Revert "Inline small objects in GetObjectStatus response. (ray-projec…
Browse files Browse the repository at this point in the history
…t#13309)" (ray-project#13615)

This reverts commit a82fa80.
  • Loading branch information
amogkam authored Jan 22, 2021
1 parent 87ca102 commit 20acc3b
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 140 deletions.
7 changes: 3 additions & 4 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -898,17 +898,16 @@ cdef class CoreWorker:

return RayObjectsToDataMetadataPairs(results)

def object_exists(self, ObjectRef object_ref, memory_store_only=False):
def object_exists(self, ObjectRef object_ref):
cdef:
c_bool has_object
c_bool is_in_plasma
CObjectID c_object_id = object_ref.native()

with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Contains(
c_object_id, &has_object, &is_in_plasma))
c_object_id, &has_object))

return has_object and (not memory_store_only or not is_in_plasma)
return has_object

cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectRef object_ref,
Expand Down
3 changes: 1 addition & 2 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
c_vector[shared_ptr[CRayObject]] *results,
c_bool plasma_objects_only)
CRayStatus Contains(const CObjectID &object_id, c_bool *has_object,
c_bool *is_in_plasma)
CRayStatus Contains(const CObjectID &object_id, c_bool *has_object)
CRayStatus Wait(const c_vector[CObjectID] &object_ids, int num_objects,
int64_t timeout_ms, c_vector[c_bool] *results,
c_bool fetch_local)
Expand Down
37 changes: 0 additions & 37 deletions python/ray/tests/test_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,43 +521,6 @@ def method(self):
assert ray.worker.global_worker.core_worker.object_exists(x_id)


@pytest.mark.skipif(client_test_enabled(), reason="internal api")
def test_future_resolution_skip_plasma(ray_start_cluster):
cluster = ray_start_cluster
# Disable worker caching so worker leases are not reused; set object
# inlining size threshold and enable storing of small objects in in-memory
# object store so the borrowed ref is inlined.
cluster.add_node(
num_cpus=1,
resources={"pin_head": 1},
_system_config={
"worker_lease_timeout_milliseconds": 0,
"max_direct_call_object_size": 100 * 1024,
"put_small_object_in_memory_store": True,
},
)
cluster.add_node(num_cpus=1, resources={"pin_worker": 1})
ray.init(address=cluster.address)

@ray.remote(resources={"pin_head": 1})
def f(x):
return x + 1

@ray.remote(resources={"pin_worker": 1})
def g(x):
borrowed_ref = x[0]
f_ref = f.remote(borrowed_ref)
# borrowed_ref should be inlined on future resolution and shouldn't be
# in Plasma.
assert ray.worker.global_worker.core_worker.object_exists(
borrowed_ref, memory_store_only=True)
return ray.get(f_ref) * 2

one = ray.put(1)
g_ref = g.remote([one])
assert ray.get(g_ref) == 4


if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
48 changes: 13 additions & 35 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1058,18 +1058,14 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
return Status::OK();
}

Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object,
bool *is_in_plasma) {
Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) {
bool found = false;
bool in_plasma = false;
found = memory_store_->Contains(object_id, &in_plasma);
if (in_plasma) {
RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found));
}
*has_object = found;
if (is_in_plasma != nullptr) {
*is_in_plasma = found && in_plasma;
}
return Status::OK();
}

Expand Down Expand Up @@ -2095,43 +2091,25 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques
send_reply_callback(Status::OK(), nullptr, nullptr);
} else {
RAY_CHECK(owner_address.worker_id() == request.owner_worker_id());
bool is_freed = reference_counter_->IsPlasmaObjectFreed(object_id);

if (reference_counter_->IsPlasmaObjectFreed(object_id)) {
reply->set_status(rpc::GetObjectStatusReply::FREED);
} else {
reply->set_status(rpc::GetObjectStatusReply::CREATED);
}
// Send the reply once the value has become available. The value is
// guaranteed to become available eventually because we own the object and
// its ref count is > 0.
memory_store_->GetAsync(object_id, [reply, send_reply_callback,
is_freed](std::shared_ptr<RayObject> obj) {
if (is_freed) {
reply->set_status(rpc::GetObjectStatusReply::FREED);
} else {
// If obj is the concrete object value, it is small, so we
// send the object back to the caller in the GetObjectStatus
// reply, bypassing a Plasma put and object transfer. If obj
// is an indicator that the object is in Plasma, we set an
// in_plasma indicator on the message, and the caller will
// have to facilitate a Plasma object transfer to get the
// object value.
auto *object = reply->mutable_object();
if (obj->HasData()) {
const auto &data = obj->GetData();
object->set_data(data->Data(), data->Size());
}
if (obj->HasMetadata()) {
const auto &metadata = obj->GetMetadata();
object->set_metadata(metadata->Data(), metadata->Size());
}
for (const auto &nested_id : obj->GetNestedIds()) {
object->add_nested_inlined_ids(nested_id.Binary());
}
reply->set_status(rpc::GetObjectStatusReply::CREATED);
}
send_reply_callback(Status::OK(), nullptr, nullptr);
});
// TODO(swang): We could probably just send the object value if it is small
// enough and we have it local.
memory_store_->GetAsync(object_id,
[send_reply_callback](std::shared_ptr<RayObject> obj) {
send_reply_callback(Status::OK(), nullptr, nullptr);
});
}

RemoveLocalReference(object_id);
} // namespace ray
}

void CoreWorker::HandleWaitForActorOutOfScope(
const rpc::WaitForActorOutOfScopeRequest &request,
Expand Down
4 changes: 1 addition & 3 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -559,10 +559,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
///
/// \param[in] object_id ID of the objects to check for.
/// \param[out] has_object Whether or not the object is present.
/// \param[out] is_in_plasma Whether or not the object is in Plasma.
/// \return Status.
Status Contains(const ObjectID &object_id, bool *has_object,
bool *is_in_plasma = nullptr);
Status Contains(const ObjectID &object_id, bool *has_object);

/// Wait for a list of objects to appear in the object store.
/// Duplicate object ids are supported, and `num_objects` includes duplicate ids in this
Expand Down
69 changes: 23 additions & 46 deletions src/ray/core_worker/future_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,53 +28,30 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id,
rpc::GetObjectStatusRequest request;
request.set_object_id(object_id.Binary());
request.set_owner_worker_id(owner_address.worker_id());
conn->GetObjectStatus(request, [this, object_id](
const Status &status,
const rpc::GetObjectStatusReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Error retrieving the value of object ID " << object_id
<< " that was deserialized: " << status.ToString();
}
conn->GetObjectStatus(
request,
[this, object_id](const Status &status, const rpc::GetObjectStatusReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Error retrieving the value of object ID " << object_id
<< " that was deserialized: " << status.ToString();
}

if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) {
// The owner is gone or the owner replied that the object has gone
// out of scope (this is an edge case in the distributed ref counting
// protocol where a borrower dies before it can notify the owner of
// another borrower). Store an error so that an exception will be
// thrown immediately when the worker tries to get the value.
RAY_UNUSED(in_memory_store_->Put(
RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), object_id));
} else if (reply.status() == rpc::GetObjectStatusReply::CREATED) {
// The object is either an indicator that the object is in Plasma, or
// the object has been returned directly in the reply. In either
// case, we put the corresponding RayObject into the in-memory store.
// If the owner later fails or the object is released, the raylet
// will eventually store an error in Plasma on our behalf.
const auto &data = reply.object().data();
std::shared_ptr<LocalMemoryBuffer> data_buffer;
if (data.size() > 0) {
RAY_LOG(DEBUG) << "Object returned directly in GetObjectStatus reply, putting "
<< object_id << " in memory store";
data_buffer = std::make_shared<LocalMemoryBuffer>(
const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(data.data())),
data.size());
} else {
RAY_LOG(DEBUG) << "Object not returned directly in GetObjectStatus reply, "
<< object_id << " will have to be fetched from Plasma";
}
const auto &metadata = reply.object().metadata();
std::shared_ptr<LocalMemoryBuffer> metadata_buffer;
if (metadata.size() > 0) {
metadata_buffer = std::make_shared<LocalMemoryBuffer>(
const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(metadata.data())),
metadata.size());
}
auto inlined_ids =
IdVectorFromProtobuf<ObjectID>(reply.object().nested_inlined_ids());
RAY_UNUSED(in_memory_store_->Put(
RayObject(data_buffer, metadata_buffer, inlined_ids), object_id));
}
});
if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) {
// The owner is gone or the owner replied that the object has gone
// out of scope (this is an edge case in the distributed ref counting
// protocol where a borrower dies before it can notify the owner of
// another borrower). Store an error so that an exception will be
// thrown immediately when the worker tries to get the value.
RAY_UNUSED(in_memory_store_->Put(
RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), object_id));
} else {
// We can now try to fetch the object via plasma. If the owner later
// fails or the object is released, the raylet will eventually store
// an error in plasma on our behalf.
RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id));
}
});
}

} // namespace ray
1 change: 0 additions & 1 deletion src/ray/core_worker/future_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <memory>

#include "ray/common/grpc_util.h"
#include "ray/common/id.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/rpc/worker/core_worker_client.h"
Expand Down
12 changes: 0 additions & 12 deletions src/ray/protobuf/core_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -132,25 +132,13 @@ message GetObjectStatusRequest {
bytes object_id = 2;
}

message RayObject {
// Data of the object.
bytes data = 1;
// Metadata of the object.
bytes metadata = 2;
// ObjectIDs that were nested in data. This is only set for inlined objects.
repeated bytes nested_inlined_ids = 3;
}

message GetObjectStatusReply {
enum ObjectStatus {
CREATED = 0;
OUT_OF_SCOPE = 1;
FREED = 2;
}
ObjectStatus status = 1;
// The Ray object: either a concrete value, an in-Plasma indicator, or an
// exception.
RayObject object = 2;
}

message WaitForActorOutOfScopeRequest {
Expand Down

0 comments on commit 20acc3b

Please sign in to comment.