Skip to content

Commit

Permalink
[core] When reconstruction is enabled, pin objects created by ray.put() (#8021)
Browse files Browse the repository at this point in the history

* Unit test and pin ray.put objects until they have no more lineage references

* c++ tests

* lint

* Mark ray.put objects as pinned
  • Loading branch information
stephanie-wang authored Apr 20, 2020
1 parent 17e3c54 commit 1323e17
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 48 deletions.
55 changes: 55 additions & 0 deletions python/ray/tests/test_reconstruction.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,61 @@ def dependent_task(x):
raise e.as_instanceof_cause()


@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled):
    """Re-fetch a task result whose argument was created by ``ray.put``.

    A task on "node1" consumes an object created with ``ray.put``. The
    driver then drops its reference to that object, the node is killed and
    replaced, and the driver asks for the task's result again.

    With ``reconstruction_enabled`` (lineage pinning on) the second
    ``ray.get`` is expected to succeed; otherwise it is expected to raise
    ``ray.exceptions.UnreconstructableError``.
    """
    config = json.dumps({
        "num_heartbeats_timeout": 10,
        "raylet_heartbeat_timeout_milliseconds": 100,
        "lineage_pinning_enabled": 1 if reconstruction_enabled else 0,
        "free_objects_period_milliseconds": -1,
    })
    cluster = Cluster()
    # Head node with no resources.
    cluster.add_node(num_cpus=0, _internal_config=config)
    # Node to place the initial object.
    node_to_kill = cluster.add_node(
        num_cpus=1,
        resources={"node1": 1},
        object_store_memory=10**8,
        _internal_config=config)
    cluster.add_node(
        num_cpus=1,
        resources={"node2": 1},
        object_store_memory=10**8,
        _internal_config=config)
    cluster.wait_for_nodes()
    ray.init(address=cluster.address, _internal_config=config)

    @ray.remote
    def dependent_task(x):
        return x

    obj = ray.put(np.zeros(10**7, dtype=np.uint8))
    result = dependent_task.options(resources={"node1": 1}).remote(obj)
    ray.get(result)
    # Drop the driver's only direct reference to the put object; from here on
    # it is referenced only as an argument of the task that produced `result`.
    del obj

    # Kill the node that ran the task, then bring up a replacement that
    # offers the same custom resource.
    cluster.remove_node(node_to_kill, allow_graceful=False)
    cluster.add_node(
        num_cpus=1,
        resources={"node1": 1},
        object_store_memory=10**8,
        _internal_config=config)

    # Put many more large objects. NOTE(review): presumably this is meant to
    # push any unpinned copy of the original put object out of the object
    # store -- confirm against the object store's eviction behavior.
    for _ in range(20):
        ray.put(np.zeros(10**7, dtype=np.uint8))

    if reconstruction_enabled:
        ray.get(result)
    else:
        with pytest.raises(ray.exceptions.UnreconstructableError):
            ray.get(result)


@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled):
config = json.dumps({
Expand Down
14 changes: 9 additions & 5 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,9 @@ Status CoreWorker::Put(const RayObject &object,
worker_context_.GetNextPutIndex(),
static_cast<uint8_t>(TaskTransportType::DIRECT));
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(),
rpc_address_, CurrentCallSite(), object.GetSize());
rpc_address_, CurrentCallSite(), object.GetSize(),
/*is_reconstructable=*/false,
ClientID::FromBinary(rpc_address_.raylet_id()));
return Put(object, contained_object_ids, *object_id, /*pin_object=*/true);
}

Expand Down Expand Up @@ -783,9 +785,10 @@ Status CoreWorker::Create(const std::shared_ptr<Buffer> &metadata, const size_t

// Only add the object to the reference counter if it didn't already exist.
if (data) {
reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(),
rpc_address_, CurrentCallSite(),
data_size + metadata->Size());
reference_counter_->AddOwnedObject(
*object_id, contained_object_ids, GetCallerId(), rpc_address_, CurrentCallSite(),
data_size + metadata->Size(),
/*is_reconstructable=*/false, ClientID::FromBinary(rpc_address_.raylet_id()));
}
return Status::OK();
}
Expand Down Expand Up @@ -1512,7 +1515,8 @@ Status CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
for (size_t i = 0; i < task_spec.NumReturns(); i++) {
reference_counter_->AddOwnedObject(task_spec.ReturnId(i, TaskTransportType::DIRECT),
/*inner_ids=*/{}, GetCallerId(), rpc_address_,
CurrentCallSite(), -1);
CurrentCallSite(), -1,
/*is_reconstructable=*/false);
}
auto old_id = GetActorId();
SetActorId(actor_id);
Expand Down
29 changes: 15 additions & 14 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ ReferenceCounter::ReferenceTable ReferenceCounter::ReferenceTableFromProto(
const ReferenceTableProto &proto) {
ReferenceTable refs;
for (const auto &ref : proto) {
refs[ray::ObjectID::FromBinary(ref.reference().object_id())] =
Reference::FromProto(ref);
refs.emplace(ray::ObjectID::FromBinary(ref.reference().object_id()),
Reference::FromProto(ref));
}
return refs;
}
Expand Down Expand Up @@ -145,12 +145,11 @@ void ReferenceCounter::AddObjectRefStats(
}
}

void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
const std::vector<ObjectID> &inner_ids,
const TaskID &owner_id,
const rpc::Address &owner_address,
const std::string &call_site,
const int64_t object_size) {
void ReferenceCounter::AddOwnedObject(
const ObjectID &object_id, const std::vector<ObjectID> &inner_ids,
const TaskID &owner_id, const rpc::Address &owner_address,
const std::string &call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id) {
RAY_LOG(DEBUG) << "Adding owned object " << object_id;
absl::MutexLock lock(&mutex_);
RAY_CHECK(object_id_refs_.count(object_id) == 0)
Expand All @@ -159,7 +158,8 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
// because this corresponds to a submitted task whose return ObjectID will be created
// in the frontend language, incrementing the reference count.
object_id_refs_.emplace(object_id,
Reference(owner_id, owner_address, call_site, object_size));
Reference(owner_id, owner_address, call_site, object_size,
is_reconstructable, pinned_at_raylet_id));
if (!inner_ids.empty()) {
// Mark that this object ID contains other inner IDs. Then, we will not GC
// the inner objects until the outer object ID goes out of scope.
Expand Down Expand Up @@ -354,7 +354,8 @@ void ReferenceCounter::DeleteReferences(const std::vector<ObjectID> &object_ids)
}
it->second.local_ref_count = 0;
it->second.submitted_task_ref_count = 0;
if (distributed_ref_counting_enabled_ && !it->second.OutOfScope()) {
if (distributed_ref_counting_enabled_ &&
!it->second.OutOfScope(lineage_pinning_enabled_)) {
RAY_LOG(ERROR)
<< "ray.internal.free does not currently work for objects that are still in "
"scope when distributed reference "
Expand Down Expand Up @@ -387,7 +388,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
should_delete_value = true;
}

if (it->second.OutOfScope()) {
if (it->second.OutOfScope(lineage_pinning_enabled_)) {
// If distributed ref counting is enabled, then delete the object once its
// ref count across all processes is 0.
should_delete_value = true;
Expand Down Expand Up @@ -445,7 +446,7 @@ bool ReferenceCounter::SetDeleteCallback(
auto it = object_id_refs_.find(object_id);
if (it == object_id_refs_.end()) {
return false;
} else if (it->second.OutOfScope() &&
} else if (it->second.OutOfScope(lineage_pinning_enabled_) &&
!it->second.ShouldDelete(lineage_pinning_enabled_)) {
// The object has already gone out of scope but cannot be deleted yet. Do
// not set the deletion callback because it may never get called.
Expand Down Expand Up @@ -486,7 +487,7 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id,
RAY_CHECK(!it->second.pinned_at_raylet_id.has_value());
// Only the owner tracks the location.
RAY_CHECK(it->second.owned_by_us);
if (!it->second.OutOfScope()) {
if (!it->second.OutOfScope(lineage_pinning_enabled_)) {
it->second.pinned_at_raylet_id = raylet_id;
}
}
Expand Down Expand Up @@ -789,7 +790,7 @@ void ReferenceCounter::HandleRefRemoved(const ObjectID &object_id,
// the object was zero. Also, we should have stripped all distributed ref
// count information and returned it to the owner. Therefore, it should be
// okay to delete the object, if it wasn't already deleted.
RAY_CHECK(it->second.OutOfScope());
RAY_CHECK(it->second.OutOfScope(lineage_pinning_enabled_));
}
// Send the owner information about any new borrowers.
ReferenceTableToProto(borrowed_refs, reply->mutable_borrowed_refs());
Expand Down
42 changes: 30 additions & 12 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,20 @@ class ReferenceCounter {
/// possible to have leftover references after a task has finished.
///
/// \param[in] object_id The ID of the object that we own.
/// \param[in] inner_ids ObjectIDs that are contained in the object's value.
/// \param[in] contained_ids ObjectIDs that are contained in the object's value.
/// As long as the object_id is in scope, the inner objects should not be GC'ed.
/// \param[in] owner_id The ID of the object's owner.
/// \param[in] owner_address The address of the object's owner.
/// \param[in] dependencies The objects that the object depends on.
void AddOwnedObject(const ObjectID &object_id,
const std::vector<ObjectID> &contained_ids, const TaskID &owner_id,
const rpc::Address &owner_address, const std::string &call_site,
const int64_t object_size) LOCKS_EXCLUDED(mutex_);
/// \param[in] call_site Description of the call site where the reference was created.
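/// \param[in] object_size Object size if known, otherwise -1.
/// \param[in] is_reconstructable Whether the object can be reconstructed
/// through lineage re-execution.
/// \param[in] pinned_at_raylet_id The ID of the raylet that is pinning the
/// object's value, if one is already known. Defaults to no value.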
void AddOwnedObject(
const ObjectID &object_id, const std::vector<ObjectID> &contained_ids,
const TaskID &owner_id, const rpc::Address &owner_address,
const std::string &call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id = absl::optional<ClientID>())
LOCKS_EXCLUDED(mutex_);

/// Update the size of the object.
///
Expand Down Expand Up @@ -326,11 +331,14 @@ class ReferenceCounter {
: call_site(call_site), object_size(object_size) {}
/// Constructor for a reference that we created.
Reference(const TaskID &owner_id, const rpc::Address &owner_address,
std::string call_site, const int64_t object_size)
std::string call_site, const int64_t object_size, bool is_reconstructable,
const absl::optional<ClientID> &pinned_at_raylet_id)
: call_site(call_site),
object_size(object_size),
owned_by_us(true),
owner({owner_id, owner_address}) {}
owner({owner_id, owner_address}),
is_reconstructable(is_reconstructable),
pinned_at_raylet_id(pinned_at_raylet_id) {}

/// Constructor from a protobuf. This is assumed to be a message from
/// another process, so the object defaults to not being owned by us.
Expand All @@ -353,13 +361,19 @@ class ReferenceCounter {
/// - The reference was contained in another ID that we were borrowing, and
/// we haven't told the process that gave us that ID yet.
/// - We gave the reference to at least one other process.
bool OutOfScope() const {
bool OutOfScope(bool lineage_pinning_enabled) const {
bool in_scope = RefCount() > 0;
bool was_contained_in_borrowed_id = contained_in_borrowed_id.has_value();
bool has_borrowers = borrowers.size() > 0;
bool was_stored_in_objects = stored_in_objects.size() > 0;

bool has_lineage_references = false;
if (lineage_pinning_enabled && owned_by_us && !is_reconstructable) {
has_lineage_references = lineage_ref_count > 0;
}

return !(in_scope || was_contained_in_borrowed_id || has_borrowers ||
was_stored_in_objects);
was_stored_in_objects || has_lineage_references);
}

/// Whether the Reference can be deleted. A Reference can only be deleted
Expand All @@ -369,9 +383,9 @@ class ReferenceCounter {
/// the object that may be retried in the future.
bool ShouldDelete(bool lineage_pinning_enabled) const {
if (lineage_pinning_enabled) {
return OutOfScope() && (lineage_ref_count == 0);
return OutOfScope(lineage_pinning_enabled) && (lineage_ref_count == 0);
} else {
return OutOfScope();
return OutOfScope(lineage_pinning_enabled);
}
}

Expand All @@ -392,6 +406,10 @@ class ReferenceCounter {
// counting is enabled, then some raylet must be pinning the object value.
// This is the address of that raylet.
absl::optional<ClientID> pinned_at_raylet_id;
// Whether this object can be reconstructed via lineage. If false, then the
// object's value will be pinned as long as it is referenced by any other
// object's lineage.
const bool is_reconstructable = false;

/// The local ref count for the ObjectID in the language frontend.
size_t local_ref_count = 0;
Expand Down
Loading

0 comments on commit 1323e17

Please sign in to comment.