Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] When reconstruction is enabled, pin objects created by ray.put() #8021

Merged
Prev Previous commit
Next Next commit
Mark ray.put objects as pinned
  • Loading branch information
stephanie-wang committed Apr 15, 2020
commit f51d919028f26c2fc29bd6104f68c5aeb72b8b87
11 changes: 6 additions & 5 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,8 @@ Status CoreWorker::Put(const RayObject &object,
static_cast<uint8_t>(TaskTransportType::DIRECT));
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(),
rpc_address_, CurrentCallSite(), object.GetSize(),
/*is_reconstructable=*/false);
/*is_reconstructable=*/false,
ClientID::FromBinary(rpc_address_.raylet_id()));
return Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
}

Expand Down Expand Up @@ -784,10 +785,10 @@ Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t

// Only add the object to the reference counter if it didn't already exist.
if (data) {
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(),
rpc_address_, CurrentCallSite(),
data_size + metadata->Size(),
/*is_reconstructable=*/false);
reference_counter_->AddOwnedObject(
*object_id, contained_object_ids, GetCallerId(), rpc_address_, CurrentCallSite(),
data_size + metadata->Size(),
/*is_reconstructable=*/false, ClientID::FromBinary(rpc_address_.raylet_id()));
}
return Status::OK();
}
Expand Down
8 changes: 5 additions & 3 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,18 @@ void ReferenceCounter::AddObjectRefStats(
void ReferenceCounter::AddOwnedObject(
const ObjectID &object_id, const std::vector<ObjectID> &inner_ids,
const TaskID &owner_id, const rpc::Address &owner_address,
const std::string &call_site, const int64_t object_size, bool is_reconstructable) {
const std::string &call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id) {
RAY_LOG(DEBUG) << "Adding owned object " << object_id;
absl::MutexLock lock(&mutex_);
RAY_CHECK(object_id_refs_.count(object_id) == 0)
<< "Tried to create an owned object that already exists: " << object_id;
// If the entry doesn't exist, we initialize the direct reference count to zero
// because this corresponds to a submitted task whose return ObjectID will be created
// in the frontend language, incrementing the reference count.
object_id_refs_.emplace(object_id, Reference(owner_id, owner_address, call_site,
object_size, is_reconstructable));
object_id_refs_.emplace(object_id,
Reference(owner_id, owner_address, call_site, object_size,
is_reconstructable, pinned_at_raylet_id));
if (!inner_ids.empty()) {
// Mark that this object ID contains other inner IDs. Then, we will not GC
// the inner objects until the outer object ID goes out of scope.
Expand Down
15 changes: 9 additions & 6 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ class ReferenceCounter {
/// \param[in] object_size Object size if known, otherwise -1;
/// \param[in] is_reconstructable Whether the object can be reconstructed
/// through lineage re-execution.
void AddOwnedObject(const ObjectID &object_id,
const std::vector<ObjectID> &contained_ids, const TaskID &owner_id,
const rpc::Address &owner_address, const std::string &call_site,
const int64_t object_size, bool is_reconstructable)
void AddOwnedObject(
const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
const TaskID &owner_id, const rpc::Address &owner_address,
const std::string &call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id = absl::optional<ClientID>())
LOCKS_EXCLUDED(mutex_);

/// Update the size of the object.
Expand Down Expand Up @@ -330,12 +331,14 @@ class ReferenceCounter {
: call_site(call_site), object_size(object_size) {}
/// Constructor for a reference that we created.
Reference(const TaskID &owner_id, const rpc::Address &owner_address,
std::string call_site, const int64_t object_size, bool is_reconstructable)
std::string call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id)
: call_site(call_site),
object_size(object_size),
owned_by_us(true),
owner({owner_id, owner_address}),
is_reconstructable(is_reconstructable) {}
is_reconstructable(is_reconstructable),
pinned_at_raylet_id(pinned_at_raylet_id) {}

/// Constructor from a protobuf. This is assumed to be a message from
/// another process, so the object defaults to not being owned by us.
Expand Down