Skip to content

[core] When reconstruction is enabled, pin objects created by ray.put() #8021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions python/ray/tests/test_reconstruction.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,61 @@ def dependent_task(x):
raise e.as_instanceof_cause()


@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled):
    """Check recovery of a task whose argument was created by ray.put().

    The test stores an object with ray.put(), runs a task that takes it as an
    argument on the node holding the "node1" resource, then kills that node
    ungracefully and adds a replacement node with the same custom resource.
    With lineage pinning enabled, the task result must still be retrievable;
    with it disabled, ray.get() is expected to raise UnreconstructableError.
    """
    config = json.dumps({
        "num_heartbeats_timeout": 10,
        "raylet_heartbeat_timeout_milliseconds": 100,
        "lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
        "free_objects_period_milliseconds": -1,
    })
    # NOTE(review): the ray_start_cluster fixture is requested but a separate
    # Cluster() is built here -- confirm whether the fixture's cluster should
    # be used instead so that its teardown covers these nodes.
    cluster = Cluster()
    # Head node with no resources.
    cluster.add_node(num_cpus=0, _internal_config=config)
    # Node to place the initial object.
    node_to_kill = cluster.add_node(
        num_cpus=1,
        resources={"node1": 1},
        object_store_memory=10**8,
        _internal_config=config)
    cluster.add_node(
        num_cpus=1,
        resources={"node2": 1},
        object_store_memory=10**8,
        _internal_config=config)
    cluster.wait_for_nodes()
    ray.init(address=cluster.address, _internal_config=config)

    @ray.remote
    def dependent_task(x):
        return x

    # The task's only dependency is a ray.put() object rather than another
    # task's return value.
    obj = ray.put(np.zeros(10**7, dtype=np.uint8))
    result = dependent_task.options(resources={"node1": 1}).remote(obj)
    ray.get(result)
    # Drop the driver's own reference to the put object before the failure.
    del obj

    # Kill the node that ran the task and bring up a replacement that
    # advertises the same custom resource.
    cluster.remove_node(node_to_kill, allow_graceful=False)
    cluster.add_node(
        num_cpus=1,
        resources={"node1": 1},
        object_store_memory=10**8,
        _internal_config=config)

    # Put many more large objects; presumably this is meant to create memory
    # pressure so the original value would be evicted unless it is pinned --
    # TODO confirm.
    for _ in range(20):
        ray.put(np.zeros(10**7, dtype=np.uint8))

    if reconstruction_enabled:
        ray.get(result)
    else:
        with pytest.raises(ray.exceptions.UnreconstructableError):
            ray.get(result)


@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
Expand Down
14 changes: 9 additions & 5 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,9 @@ Status CoreWorker::Put(const RayObject &object,
worker_context_.GetNextPutIndex(),
static_cast<uint8_t>(TaskTransportType::DIRECT));
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(),
rpc_address_, CurrentCallSite(), object.GetSize());
rpc_address_, CurrentCallSite(), object.GetSize(),
/*is_reconstructable=*/false,
ClientID::FromBinary(rpc_address_.raylet_id()));
return Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
}

Expand Down Expand Up @@ -783,9 +785,10 @@ Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t

// Only add the object to the reference counter if it didn't already exist.
if (data) {
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(),
rpc_address_, CurrentCallSite(),
data_size + metadata->Size());
reference_counter_->AddOwnedObject(
*object_id, contained_object_ids, GetCallerId(), rpc_address_, CurrentCallSite(),
data_size + metadata->Size(),
/*is_reconstructable=*/false, ClientID::FromBinary(rpc_address_.raylet_id()));
}
return Status::OK();
}
Expand Down Expand Up @@ -1512,7 +1515,8 @@ Status CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
reference_counter_->AddOwnedObject(task_spec.ReturnId(i, TaskTransportType::DIRECT),
/*inner_ids=*/{}, GetCallerId(), rpc_address_,
CurrentCallSite(), -1);
CurrentCallSite(), -1,
/*is_reconstructable=*/false);
}
auto old_id = GetActorId();
SetActorId(actor_id);
Expand Down
29 changes: 15 additions & 14 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ ReferenceCounter::ReferenceTable ReferenceCounter::ReferenceTableFromProto(
const ReferenceTableProto &proto) {
ReferenceTable refs;
for (const auto &ref : proto) {
refs[ray::ObjectID::FromBinary(ref.reference().object_id())] =
Reference::FromProto(ref);
refs.emplace(ray::ObjectID::FromBinary(ref.reference().object_id()),
Reference::FromProto(ref));
}
return refs;
}
Expand Down Expand Up @@ -145,12 +145,11 @@ void ReferenceCounter::AddObjectRefStats(
}
}

void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
const std::vector<ObjectID> &inner_ids,
const TaskID &owner_id,
const rpc::Address &owner_address,
const std::string &call_site,
const int64_t object_size) {
void ReferenceCounter::AddOwnedObject(
const ObjectID &object_id, const std::vector<ObjectID> &inner_ids,
const TaskID &owner_id, const rpc::Address &owner_address,
const std::string &call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id) {
RAY_LOG(DEBUG) << "Adding owned object " << object_id;
absl::MutexLock lock(&mutex_);
RAY_CHECK(object_id_refs_.count(object_id) == 0)
Expand All @@ -159,7 +158,8 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
// because this corresponds to a submitted task whose return ObjectID will be created
// in the frontend language, incrementing the reference count.
object_id_refs_.emplace(object_id,
Reference(owner_id, owner_address, call_site, object_size));
Reference(owner_id, owner_address, call_site, object_size,
is_reconstructable, pinned_at_raylet_id));
if (!inner_ids.empty()) {
// Mark that this object ID contains other inner IDs. Then, we will not GC
// the inner objects until the outer object ID goes out of scope.
Expand Down Expand Up @@ -354,7 +354,8 @@ void ReferenceCounter::DeleteReferences(const std::vector<ObjectID> &object_ids)
}
it->second.local_ref_count = 0;
it->second.submitted_task_ref_count = 0;
if (distributed_ref_counting_enabled_ && !it->second.OutOfScope()) {
if (distributed_ref_counting_enabled_ &&
!it->second.OutOfScope(lineage_pinning_enabled_)) {
RAY_LOG(ERROR)
<< "ray.internal.free does not currently work for objects that are still in "
"scope when distributed reference "
Expand Down Expand Up @@ -387,7 +388,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
should_delete_value = true;
}

if (it->second.OutOfScope()) {
if (it->second.OutOfScope(lineage_pinning_enabled_)) {
// If distributed ref counting is enabled, then delete the object once its
// ref count across all processes is 0.
should_delete_value = true;
Expand Down Expand Up @@ -445,7 +446,7 @@ bool ReferenceCounter::SetDeleteCallback(
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
return false;
} else if (it->second.OutOfScope() &&
} else if (it->second.OutOfScope(lineage_pinning_enabled_) &&
!it->second.ShouldDelete(lineage_pinning_enabled_)) {
// The object has already gone out of scope but cannot be deleted yet. Do
// not set the deletion callback because it may never get called.
Expand Down Expand Up @@ -486,7 +487,7 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id,
RAY_CHECK(!it->second.pinned_at_raylet_id.has_value());
// Only the owner tracks the location.
RAY_CHECK(it->second.owned_by_us);
if (!it->second.OutOfScope()) {
if (!it->second.OutOfScope(lineage_pinning_enabled_)) {
it->second.pinned_at_raylet_id = raylet_id;
}
}
Expand Down Expand Up @@ -789,7 +790,7 @@ void ReferenceCounter::HandleRefRemoved(const ObjectID &object_id,
// the object was zero. Also, we should have stripped all distributed ref
// count information and returned it to the owner. Therefore, it should be
// okay to delete the object, if it wasn't already deleted.
RAY_CHECK(it->second.OutOfScope());
RAY_CHECK(it->second.OutOfScope(lineage_pinning_enabled_));
}
// Send the owner information about any new borrowers.
ReferenceTableToProto(borrowed_refs, reply->mutable_borrowed_refs());
Expand Down
42 changes: 30 additions & 12 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,20 @@ class ReferenceCounter {
/// possible to have leftover references after a task has finished.
///
/// \param[in] object_id The ID of the object that we own.
/// \param[in] inner_ids ObjectIDs that are contained in the object's value.
/// \param[in] contained_ids ObjectIDs that are contained in the object's value.
/// As long as the object_id is in scope, the inner objects should not be GC'ed.
/// \param[in] owner_id The ID of the object's owner.
/// \param[in] owner_address The address of the object's owner.
/// \param[in] dependencies The objects that the object depends on.
void AddOwnedObject(const ObjectID &object_id,
const std::vector<ObjectID> &contained_ids, const TaskID &owner_id,
const rpc::Address &owner_address, const std::string &call_site,
const int64_t object_size) LOCKS_EXCLUDED(mutex_);
/// \param[in] call_site Description of the call site where the reference was created.
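/// \param[in] object_size Object size if known, otherwise -1.
/// \param[in] is_reconstructable Whether the object can be reconstructed
/// through lineage re-execution.
/// \param[in] pinned_at_raylet_id The raylet that is pinning the object's
/// value, if any. Defaults to an empty optional.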
void AddOwnedObject(
const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
const TaskID &owner_id, const rpc::Address &owner_address,
const std::string &call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id = absl::optional<ClientID>())
LOCKS_EXCLUDED(mutex_);

/// Update the size of the object.
///
Expand Down Expand Up @@ -326,11 +331,14 @@ class ReferenceCounter {
: call_site(call_site), object_size(object_size) {}
/// Constructor for a reference that we created.
Reference(const TaskID &owner_id, const rpc::Address &owner_address,
std::string call_site, const int64_t object_size)
std::string call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id)
: call_site(call_site),
object_size(object_size),
owned_by_us(true),
owner({owner_id, owner_address}) {}
owner({owner_id, owner_address}),
is_reconstructable(is_reconstructable),
pinned_at_raylet_id(pinned_at_raylet_id) {}

/// Constructor from a protobuf. This is assumed to be a message from
/// another process, so the object defaults to not being owned by us.
Expand All @@ -353,13 +361,19 @@ class ReferenceCounter {
/// - The reference was contained in another ID that we were borrowing, and
/// we haven't told the process that gave us that ID yet.
/// - We gave the reference to at least one other process.
bool OutOfScope() const {
bool OutOfScope(bool lineage_pinning_enabled) const {
bool in_scope = RefCount() > 0;
bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value();
bool has_borrowers = borrowers.size() > 0;
bool was_stored_in_objects = stored_in_objects.size() > 0;

bool has_lineage_references = false;
if (lineage_pinning_enabled && owned_by_us && !is_reconstructable) {
has_lineage_references = lineage_ref_count > 0;
}

return !(in_scope || was_contained_in_borrowed_id || has_borrowers ||
was_stored_in_objects);
was_stored_in_objects || has_lineage_references);
}

/// Whether the Reference can be deleted. A Reference can only be deleted
Expand All @@ -369,9 +383,9 @@ class ReferenceCounter {
/// the object that may be retried in the future.
bool ShouldDelete(bool lineage_pinning_enabled) const {
if (lineage_pinning_enabled) {
return OutOfScope() && (lineage_ref_count == 0);
return OutOfScope(lineage_pinning_enabled) && (lineage_ref_count == 0);
} else {
return OutOfScope();
return OutOfScope(lineage_pinning_enabled);
}
}

Expand All @@ -392,6 +406,10 @@ class ReferenceCounter {
// counting is enabled, then some raylet must be pinning the object value.
// This is the address of that raylet.
absl::optional<ClientID> pinned_at_raylet_id;
// Whether this object can be reconstructed via lineage. If false, then the
// object's value will be pinned as long as it is referenced by any other
// object's lineage.
const bool is_reconstructable = false;

/// The local ref count for the ObjectID in the language frontend.
size_t local_ref_count = 0;
Expand Down
Loading