Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Fix ObjectFetchTimedOutError #46562

Merged
merged 8 commits into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions python/ray/tests/test_reconstruction_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import ray._private.ray_constants as ray_constants
from ray._private.internal_api import memory_summary
from ray._private.test_utils import Semaphore, SignalActor, wait_for_condition
import ray.exceptions

# Task status.
WAITING_FOR_DEPENDENCIES = "PENDING_ARGS_AVAIL"
Expand Down Expand Up @@ -498,6 +499,52 @@ def dependent_task(x):
ray.get(obj)


def test_object_reconstruction_pending_creation(config, ray_start_cluster):
# Test to make sure that an object being reconstructed
# has pending_creation set to true.
#
# Scenario: a generator task produces an object on a worker node, the node is
# killed, and the retried task never re-yields the object within the test's
# time budget. A `ray.get` with a timeout on the lost object is then expected
# to fail with GetTimeoutError rather than ObjectFetchTimedOutError.

# Fetch-fail timeout (5 s) is shorter than the `ray.get` timeout used below
# (10 s). NOTE(review): presumably this ordering is what lets the test tell
# the two error types apart -- confirm against the fetch-timeout logic.
config["fetch_fail_timeout_milliseconds"] = 5000
cluster = ray_start_cluster
# Head node has no CPUs, so CPU-requiring tasks cannot be placed on it; it
# only exposes the custom "head" resource.
cluster.add_node(num_cpus=0, resources={"head": 1}, _system_config=config)
ray.init(address=cluster.address)

# The actor requests only the "head" resource, which in this test is declared
# solely on the head node.
@ray.remote(num_cpus=0, resources={"head": 0.1})
class Counter:
def __init__(self):
self.count = 0

def inc(self):
# Returns the number of times `inc` has been called so far.
self.count = self.count + 1
return self.count

counter = Counter.remote()

# max_retries=-1 is passed so the task can be retried after the node is lost.
@ray.remote(num_cpus=1, max_retries=-1)
def generator(counter):
if ray.get(counter.inc.remote()) == 1:
# First attempt: yield one object right away, then block so the task
# stays running.
yield np.zeros(10**6, dtype=np.uint8)
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)
else:
# Any later attempt: block before yielding anything, so the first
# object is not re-created while the test is running.
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)
time.sleep(10000000)
yield np.zeros(10**6, dtype=np.uint8)

worker = cluster.add_node(num_cpus=8)
gen = generator.remote(counter)
# Reference to the first object yielded by the first attempt.
obj = next(gen)

# Kill the worker node abruptly (no graceful shutdown).
cluster.remove_node(worker, allow_graceful=False)
# After removing the node, the generator task will be retried
# and the obj will be reconstructed with pending_creation set to true.
cluster.add_node(num_cpus=8)

# This should raise GetTimeoutError instead of ObjectFetchTimedOutError
with pytest.raises(ray.exceptions.GetTimeoutError):
ray.get(obj, timeout=10)


if __name__ == "__main__":
import pytest

Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
auto resubmitted = task_resubmitter_->ResubmitTask(task_id, &task_deps);

if (resubmitted) {
reference_counter_->UpdateObjectPendingCreation(object_id, true);
// Try to recover the task's dependencies.
for (const auto &dep : task_deps) {
auto recovered = RecoverObject(dep);
Expand Down
10 changes: 4 additions & 6 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,8 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
}

void ReferenceCounter::UpdateResubmittedTaskReferences(
const std::vector<ObjectID> return_ids, const std::vector<ObjectID> &argument_ids) {
const std::vector<ObjectID> &argument_ids) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreationInternal(return_id, true);
}
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
RAY_CHECK(it != object_id_refs_.end());
Expand Down Expand Up @@ -1522,9 +1519,10 @@ bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id,
return it->second.is_reconstructable;
}

void ReferenceCounter::UpdateObjectReady(const ObjectID &object_id) {
void ReferenceCounter::UpdateObjectPendingCreation(const ObjectID &object_id,
bool pending_creation) {
absl::MutexLock lock(&mutex_);
UpdateObjectPendingCreationInternal(object_id, /*pending_creation*/ false);
UpdateObjectPendingCreationInternal(object_id, pending_creation);
}

bool ReferenceCounter::IsObjectPendingCreation(const ObjectID &object_id) const {
Expand Down
7 changes: 3 additions & 4 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// have already incremented them when the task was first submitted.
///
/// \param[in] argument_ids The arguments of the task to add references for.
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> return_ids,
const std::vector<ObjectID> &argument_ids)
void UpdateResubmittedTaskReferences(const std::vector<ObjectID> &argument_ids)
ABSL_LOCKS_EXCLUDED(mutex_);

/// Update object references that were given to a submitted task. The task
Expand Down Expand Up @@ -559,8 +558,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// \param[in] min_bytes_to_evict The minimum number of bytes to evict.
int64_t EvictLineage(int64_t min_bytes_to_evict);

/// Update that the object is ready to be fetched.
void UpdateObjectReady(const ObjectID &object_id);
/// Update whether the object is pending creation.
void UpdateObjectPendingCreation(const ObjectID &object_id, bool pending_creation);

/// Whether the object is pending creation (the task that creates it is
/// scheduled/executing).
Expand Down
12 changes: 3 additions & 9 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) {
TaskSpecification spec;
bool resubmit = false;
std::vector<ObjectID> return_ids;
{
absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id);
Expand All @@ -330,10 +329,6 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
RAY_CHECK(it->second.num_retries_left == -1);
}
spec = it->second.spec;

for (const auto &return_id : it->second.reconstructable_return_ids) {
return_ids.push_back(return_id);
}
}
}

Expand All @@ -349,7 +344,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
}
}

reference_counter_->UpdateResubmittedTaskReferences(return_ids, *task_deps);
reference_counter_->UpdateResubmittedTaskReferences(*task_deps);

for (const auto &task_dep : *task_deps) {
bool was_freed = reference_counter_->TryMarkFreedObjectInUseAgain(task_dep);
Expand All @@ -366,8 +361,7 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
}
if (spec.IsActorTask()) {
const auto actor_creation_return_id = spec.ActorCreationDummyObjectId();
reference_counter_->UpdateResubmittedTaskReferences(return_ids,
{actor_creation_return_id});
reference_counter_->UpdateResubmittedTaskReferences({actor_creation_return_id});
}

RAY_LOG(INFO) << "Resubmitting task that produced lost plasma object, attempt #"
Expand Down Expand Up @@ -706,7 +700,7 @@ bool TaskManager::HandleReportGeneratorItemReturns(
num_objects_written += 1;
}
// When an object is reported, the object is ready to be fetched.
reference_counter_->UpdateObjectReady(object_id);
reference_counter_->UpdateObjectPendingCreation(object_id, false);
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(request.worker_addr().raylet_id()),
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/test/object_recovery_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) {
task_resubmitter_->AddTask(object_id.TaskId(), {});

ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(ref_counter_->IsObjectPendingCreation(object_id));
ASSERT_TRUE(object_directory_->Flush() == 1);

ASSERT_TRUE(failed_reconstructions_.empty());
Expand Down
5 changes: 3 additions & 2 deletions src/ray/core_worker/test/reference_count_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2462,7 +2462,8 @@ TEST_F(ReferenceCountLineageEnabledTest, TestUnreconstructableObjectOutOfScope)

// Unreconstructable objects go out of scope once their lineage ref count
// reaches 0.
rc->UpdateResubmittedTaskReferences({return_id}, {id});
rc->UpdateResubmittedTaskReferences({id});
rc->UpdateObjectPendingCreation(return_id, true);
ASSERT_TRUE(rc->IsObjectPendingCreation(return_id));
rc->UpdateFinishedTaskReferences(
{return_id}, {id}, true, empty_borrower, empty_refs, &out);
Expand Down Expand Up @@ -2634,7 +2635,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
ASSERT_TRUE(rc->HasReference(id));

// Simulate retrying the task.
rc->UpdateResubmittedTaskReferences({}, {id});
rc->UpdateResubmittedTaskReferences({id});
rc->UpdateFinishedTaskReferences({}, {id}, true, empty_borrower, empty_refs, &out);
ASSERT_FALSE(rc->HasReference(id));
ASSERT_EQ(lineage_deleted.size(), 1);
Expand Down
1 change: 0 additions & 1 deletion src/ray/core_worker/test/task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,6 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
ASSERT_EQ(last_delay_ms_, 0);
ASSERT_EQ(last_object_recovery_, true);
resubmitted_task_deps.clear();
ASSERT_TRUE(reference_counter_->IsObjectPendingCreation(return_id));

// The return ID goes out of scope.
reference_counter_->RemoveLocalReference(return_id, nullptr);
Expand Down
6 changes: 3 additions & 3 deletions src/ray/protobuf/pubsub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ message WorkerObjectLocationsPubMessage {
// counting protocol that causes the object to be released while there are
// still references.
bool ref_removed = 7;
// If this is set, the task that creates the object is pending execution. If
// there are no locations and this is set, the subscriber should wait for the
// new location to appear.
// If this is set, the object has no location in the cluster and
// the task that (re)creates the object is pending execution.
// The subscriber should wait for the new location to appear.
bool pending_creation = 8;
// Whether or not this object was spilled.
bool did_spill = 9;
Expand Down