Skip to content

Commit

Permalink
[Core] Fix ObjectFetchTimedOutError (ray-project#46562)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Jul 13, 2024
1 parent 84c00f1 commit 7369543
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 25 deletions.
47 changes: 47 additions & 0 deletions python/ray/tests/test_reconstruction_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import ray._private.ray_constants as ray_constants
from ray._private.internal_api import memory_summary
from ray._private.test_utils import Semaphore, SignalActor, wait_for_condition
import ray.exceptions

# Task status.
WAITING_FOR_DEPENDENCIES = "PENDING_ARGS_AVAIL"
Expand Down Expand Up @@ -498,6 +499,52 @@ def dependent_task(x):
ray.get(obj)


def test_object_reconstruction_pending_creation(config, ray_start_cluster):
# Test to make sure that an object being reconstructured
# has pending_creation set to true.
config["fetch_fail_timeout_milliseconds"] = 5000
cluster = ray_start_cluster
cluster.add_node(num_cpus=0, resources={"head": 1}, _system_config=config)
ray.init(address=cluster.address)

@ray.remote(num_cpus=0, resources={"head": 0.1})
class Counter:
def __init__(self):
self.count = 0

def inc(self):
self.count = self.count + 1
return self.count

counter = Counter.remote()

@ray.remote(num_cpus=1, max_retries=-1)
def generator(counter):
if ray.get(counter.inc.remote()) == 1:
# first attempt
yield np.zeros(10**6, dtype=np.uint8)
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)
else:
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)

worker = cluster.add_node(num_cpus=8)
gen = generator.remote(counter)
obj = next(gen)

cluster.remove_node(worker, allow_graceful=False)
# After removing the node, the generator task will be retried
# and the obj will be reconstructured and has pending_creation set to true.
cluster.add_node(num_cpus=8)

# This should raise GetTimeoutError instead of ObjectFetchTimedOutError
with pytest.raises(ray.exceptions.GetTimeoutError):
ray.get(obj, timeout=10)


if __name__ == "__main__":
import pytest

Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
auto resubmitted = task_resubmitter_->ResubmitTask(task_id, &task_deps);

if (resubmitted) {
reference_counter_->UpdateObjectPendingCreation(object_id, true);
// Try to recover the task's dependencies.
for (const auto &dep : task_deps) {
auto recovered = RecoverObject(dep);
Expand Down
10 changes: 4 additions & 6 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,8 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
}

void ReferenceCounter::UpdateResubmittedTaskReferences(
const std::vector<ObjectID> return_ids, const std::vector<ObjectID> &argument_ids) {
const std::vector<ObjectID> &argument_ids) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreationInternal(return_id, true);
}
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
RAY_CHECK(it != object_id_refs_.end());
Expand Down Expand Up @@ -1522,9 +1519,10 @@ bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id,
return it->second.is_reconstructable;
}

void ReferenceCounter::UpdateObjectReady(const ObjectID &object_id) {
void ReferenceCounter::UpdateObjectPendingCreation(const ObjectID &object_id,
bool pending_creation) {
absl::MutexLock lock(&mutex_);
UpdateObjectPendingCreationInternal(object_id, /*pending_creation*/ false);
UpdateObjectPendingCreationInternal(object_id, pending_creation);
}

bool ReferenceCounter::IsObjectPendingCreation(const ObjectID &object_id) const {
Expand Down
7 changes: 3 additions & 4 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// have already incremented them when the task was first submitted.
///
/// \param[in] argument_ids The arguments of the task to add references for.
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids)
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> &argument_ids)
ABSL_LOCKS_EXCLUDED(mutex_);

/// Update object references that were given to a submitted task. The task
Expand Down Expand Up @@ -559,8 +558,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// \param[in] min_bytes_to_evict The minimum number of bytes to evict.
int64_t EvictLineage(int64_t min_bytes_to_evict);

/// Update that the object is ready to be fetched.
void UpdateObjectReady(const ObjectID &object_id);
/// Update whether the object is pending creation.
void UpdateObjectPendingCreation(const ObjectID &object_id, bool pending_creation);

/// Whether the object is pending creation (the task that creates it is
/// scheduled/executing).
Expand Down
12 changes: 3 additions & 9 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) {
TaskSpecification spec;
bool resubmit = false;
std::vector<ObjectID> return_ids;
{
absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id);
Expand All @@ -330,10 +329,6 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
RAY_CHECK(it->second.num_retries_left == -1);
}
spec = it->second.spec;

for (const auto &return_id : it->second.reconstructable_return_ids) {
return_ids.push_back(return_id);
}
}
}

Expand All @@ -349,7 +344,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
}
}

reference_counter_->UpdateResubmittedTaskReferences(return_ids, *task_deps);
reference_counter_->UpdateResubmittedTaskReferences(*task_deps);

for (const auto &task_dep : *task_deps) {
bool was_freed = reference_counter_->TryMarkFreedObjectInUseAgain(task_dep);
Expand All @@ -366,8 +361,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
}
if (spec.IsActorTask()) {
const auto actor_creation_return_id = spec.ActorCreationDummyObjectId();
reference_counter_->UpdateResubmittedTaskReferences(return_ids,
{actor_creation_return_id});
reference_counter_->UpdateResubmittedTaskReferences({actor_creation_return_id});
}

RAY_LOG(INFO) << "Resubmitting task that produced lost plasma object, attempt #"
Expand Down Expand Up @@ -706,7 +700,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
num_objects_written += 1;
}
// When an object is reported, the object is ready to be fetched.
reference_counter_->UpdateObjectReady(object_id);
reference_counter_->UpdateObjectPendingCreation(object_id, false);
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(request.worker_addr().raylet_id()),
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/test/object_recovery_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) {
task_resubmitter_->AddTask(object_id.TaskId(), {});

ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(ref_counter_->IsObjectPendingCreation(object_id));
ASSERT_TRUE(object_directory_->Flush() == 1);

ASSERT_TRUE(failed_reconstructions_.empty());
Expand Down
5 changes: 3 additions & 2 deletions src/ray/core_worker/test/reference_count_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2462,7 +2462,8 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope)

// Unreconstructable objects go out of scope once their lineage ref count
// reaches 0.
rc->UpdateResubmittedTaskReferences({return_id}, {id});
rc->UpdateResubmittedTaskReferences({id});
rc->UpdateObjectPendingCreation(return_id, true);
ASSERT_TRUE(rc->IsObjectPendingCreation(return_id));
rc->UpdateFinishedTaskReferences(
{return_id}, {id}, true, empty_borrower, empty_refs, &out);
Expand Down Expand Up @@ -2634,7 +2635,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
ASSERT_TRUE(rc->HasReference(id));

// Simulate retrying the task.
rc->UpdateResubmittedTaskReferences({}, {id});
rc->UpdateResubmittedTaskReferences({id});
rc->UpdateFinishedTaskReferences({}, {id}, true, empty_borrower, empty_refs, &out);
ASSERT_FALSE(rc->HasReference(id));
ASSERT_EQ(lineage_deleted.size(), 1);
Expand Down
1 change: 0 additions & 1 deletion src/ray/core_worker/test/task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,6 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
ASSERT_EQ(last_delay_ms_, 0);
ASSERT_EQ(last_object_recovery_, true);
resubmitted_task_deps.clear();
ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));

// The return ID goes out of scope.
reference_counter_->RemoveLocalReference(return_id, nullptr);
Expand Down
6 changes: 3 additions & 3 deletions src/ray/protobuf/pubsub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ message WorkerObjectLocationsPubMessage {
// counting protocol that causes the object to be released while there are
// still references.
bool ref_removed = 7;
// If this is set, the task that creates the object is pending execution. If
// there are no locations and this is set, the subscriber should wait for the
// new location to appear.
// If this is set, the object has no location in the cluster and
// the task that (re)creates the object is pending execution.
// The subscriber should wait for the new location to appear.
bool pending_creation = 8;
// Whether or not this object was spilled.
bool did_spill = 9;
Expand Down

0 comments on commit 7369543

Please sign in to comment.