Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao committed Jul 11, 2024
1 parent 6b1f697 commit 9c7003f
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 79 deletions.
45 changes: 45 additions & 0 deletions python/ray/tests/test_reconstruction_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import ray._private.ray_constants as ray_constants
from ray._private.internal_api import memory_summary
from ray._private.test_utils import Semaphore, SignalActor, wait_for_condition
import ray.exceptions

# Task status.
WAITING_FOR_DEPENDENCIES = "PENDING_ARGS_AVAIL"
Expand Down Expand Up @@ -498,6 +499,50 @@ def dependent_task(x):
ray.get(obj)


def test_pending_creation(config, ray_start_cluster):
config["fetch_fail_timeout_milliseconds"] = 5000
cluster = ray_start_cluster
cluster.add_node(num_cpus=0, resources={"head": 1}, _system_config=config)
ray.init(address=cluster.address)

@ray.remote(num_cpus=0, resources={"head": 0.1})
class Counter:
def __init__(self):
self.count = 0

def inc(self):
self.count = self.count + 1
return self.count

counter = Counter.remote()

@ray.remote(num_cpus=1, max_retries=-1)
def generator(counter):
if ray.get(counter.inc.remote()) == 1:
# first attempt
yield np.zeros(10**6, dtype=np.uint8)
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)
else:
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)

worker = cluster.add_node(num_cpus=8)
gen = generator.remote(counter)
obj = next(gen)

cluster.remove_node(worker, allow_graceful=False)
# After removing the node, the generator task will be retried
# and the obj will be reconstructured.
cluster.add_node(num_cpus=8)

# This should raise GetTimeoutError instead of ObjectFetchTimedOutError
with pytest.raises(ray.exceptions.GetTimeoutError):
ray.get(obj, timeout=10)


if __name__ == "__main__":
import pytest

Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
auto resubmitted = task_resubmitter_->ResubmitTask(task_id, &task_deps);

if (resubmitted) {
reference_counter_->UpdateObjectsPendingCreation({object_id}, true);
// Try to recover the task's dependencies.
for (const auto &dep : task_deps) {
auto recovered = RecoverObject(dep);
Expand Down
20 changes: 6 additions & 14 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -452,14 +452,10 @@ void ReferenceCounter::RemoveLocalReferenceInternal(const ObjectID &object_id,
}

void ReferenceCounter::UpdateSubmittedTaskReferences(
const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids_to_add,
const std::vector<ObjectID> &argument_ids_to_remove,
std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreationInternal(return_id, true);
}
for (const ObjectID &argument_id : argument_ids_to_add) {
RAY_LOG(DEBUG) << "Increment ref count for submitted task argument " << argument_id;
auto it = object_id_refs_.find(argument_id);
Expand All @@ -484,11 +480,8 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
}

void ReferenceCounter::UpdateResubmittedTaskReferences(
const std::vector<ObjectID> return_ids, const std::vector<ObjectID> &argument_ids) {
const std::vector<ObjectID> &argument_ids) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreationInternal(return_id, true);
}
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
RAY_CHECK(it != object_id_refs_.end());
Expand All @@ -501,16 +494,12 @@ void ReferenceCounter::UpdateResubmittedTaskReferences(
}

void ReferenceCounter::UpdateFinishedTaskReferences(
const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids,
bool release_lineage,
const rpc::Address &worker_addr,
const ReferenceTableProto &borrowed_refs,
std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreationInternal(return_id, false);
}
// Must merge the borrower refs before decrementing any ref counts. This is
// to make sure that for serialized IDs, we increment the borrower count for
// the inner ID before decrementing the submitted_task_ref_count for the
Expand Down Expand Up @@ -1522,9 +1511,12 @@ bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id,
return it->second.is_reconstructable;
}

void ReferenceCounter::UpdateObjectReady(const ObjectID &object_id) {
void ReferenceCounter::UpdateObjectsPendingCreation(
const std::vector<ObjectID> &object_ids, bool pending_creation) {
absl::MutexLock lock(&mutex_);
UpdateObjectPendingCreationInternal(object_id, /*pending_creation*/ false);
for (const auto &object_id : object_ids) {
UpdateObjectPendingCreationInternal(object_id, pending_creation);
}
}

bool ReferenceCounter::IsObjectPendingCreation(const ObjectID &object_id) const {
Expand Down
17 changes: 8 additions & 9 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// \param[out] deleted Any objects that are newly out of scope after this
/// function call.
void UpdateSubmittedTaskReferences(
const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids_to_add,
const std::vector<ObjectID> &argument_ids_to_remove = std::vector<ObjectID>(),
std::vector<ObjectID> *deleted = nullptr) ABSL_LOCKS_EXCLUDED(mutex_);
Expand All @@ -131,15 +130,14 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// have already incremented them when the task was first submitted.
///
/// \param[in] argument_ids The arguments of the task to add references for.
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids)
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> &argument_ids)
ABSL_LOCKS_EXCLUDED(mutex_);

/// Update object references that were given to a submitted task. The task
/// may still be borrowing any object IDs that were contained in its
/// arguments. This should be called when the task finishes.
///
/// \param[in] object_ids The object IDs to remove references for.
/// \param[in] argument_ids The object IDs to remove references for.
/// \param[in] release_lineage Whether to decrement the arguments' lineage
/// ref count.
/// \param[in] worker_addr The address of the worker that executed the task.
Expand All @@ -149,8 +147,7 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// arguments. Some references in this table may still be borrowed by the
/// worker and/or a task that the worker submitted.
/// \param[out] deleted The object IDs whos reference counts reached zero.
void UpdateFinishedTaskReferences(const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids,
void UpdateFinishedTaskReferences(const std::vector<ObjectID> &argument_ids,
bool release_lineage,
const rpc::Address &worker_addr,
const ReferenceTableProto &borrowed_refs,
Expand Down Expand Up @@ -559,8 +556,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// \param[in] min_bytes_to_evict The minimum number of bytes to evict.
int64_t EvictLineage(int64_t min_bytes_to_evict);

/// Update that the object is ready to be fetched.
void UpdateObjectReady(const ObjectID &object_id);
/// Update whether the objects are pending creation.
void UpdateObjectsPendingCreation(const std::vector<ObjectID> &object_ids,
bool pending_creation);

/// Whether the object is pending creation (the task that creates it is
/// scheduled/executing).
Expand Down Expand Up @@ -802,7 +800,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// any child object is in scope.
bool has_nested_refs_to_report = false;

/// Whether the task that creates this object is scheduled/executing.
/// Whether the object has no locations in the cluster and
/// the task that creates this object is scheduled/executing.
bool pending_creation = false;

/// Whether or not this object was spilled.
Expand Down
25 changes: 8 additions & 17 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
returned_refs.push_back(std::move(ref));
}

reference_counter_->UpdateSubmittedTaskReferences(return_ids, task_deps);
reference_counter_->UpdateObjectsPendingCreation(return_ids, true);
reference_counter_->UpdateSubmittedTaskReferences(task_deps);

// If it is a generator task, create an object ref stream.
// The language frontend is responsible for calling DeleteObjectRefStream.
Expand Down Expand Up @@ -303,7 +304,6 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) {
TaskSpecification spec;
bool resubmit = false;
std::vector<ObjectID> return_ids;
{
absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id);
Expand All @@ -330,10 +330,6 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
RAY_CHECK(it->second.num_retries_left == -1);
}
spec = it->second.spec;

for (const auto &return_id : it->second.reconstructable_return_ids) {
return_ids.push_back(return_id);
}
}
}

Expand All @@ -349,7 +345,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
}
}

reference_counter_->UpdateResubmittedTaskReferences(return_ids, *task_deps);
reference_counter_->UpdateResubmittedTaskReferences(*task_deps);

for (const auto &task_dep : *task_deps) {
bool was_freed = reference_counter_->TryMarkFreedObjectInUseAgain(task_dep);
Expand All @@ -366,8 +362,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
}
if (spec.IsActorTask()) {
const auto actor_creation_return_id = spec.ActorCreationDummyObjectId();
reference_counter_->UpdateResubmittedTaskReferences(return_ids,
{actor_creation_return_id});
reference_counter_->UpdateResubmittedTaskReferences({actor_creation_return_id});
}

RAY_LOG(INFO) << "Resubmitting task that produced lost plasma object, attempt #"
Expand Down Expand Up @@ -706,7 +701,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
num_objects_written += 1;
}
// When an object is reported, the object is ready to be fetched.
reference_counter_->UpdateObjectReady(object_id);
reference_counter_->UpdateObjectsPendingCreation({object_id}, false);
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(request.worker_addr().raylet_id()),
Expand Down Expand Up @@ -1150,7 +1145,6 @@ void TaskManager::OnTaskDependenciesInlined(
const std::vector<ObjectID> &contained_ids) {
std::vector<ObjectID> deleted;
reference_counter_->UpdateSubmittedTaskReferences(
/*return_ids=*/{},
/*argument_ids_to_add=*/contained_ids,
/*argument_ids_to_remove=*/inlined_dependency_ids,
&deleted);
Expand Down Expand Up @@ -1196,13 +1190,10 @@ void TaskManager::RemoveFinishedTaskReferences(
}
}

reference_counter_->UpdateObjectsPendingCreation(return_ids, false);
std::vector<ObjectID> deleted;
reference_counter_->UpdateFinishedTaskReferences(return_ids,
plasma_dependencies,
release_lineage,
borrower_addr,
borrowed_refs,
&deleted);
reference_counter_->UpdateFinishedTaskReferences(
plasma_dependencies, release_lineage, borrower_addr, borrowed_refs, &deleted);
in_memory_store_->Delete(deleted);
}

Expand Down
Loading

0 comments on commit 9c7003f

Please sign in to comment.