Skip to content

Commit

Permalink
Fix ObjectFetchTimedOutError
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao committed Jul 11, 2024
1 parent 26b9464 commit 751f638
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,11 @@ void ReferenceCounter::UpdateObjectReady(const ObjectID &object_id) {
UpdateObjectPendingCreationInternal(object_id, /*pending_creation*/ false);
}

/// Mark `object_id` as pending creation.
///
/// Holds mutex_ for the duration of the call and forwards to
/// UpdateObjectPendingCreationInternal with pending_creation set to true.
///
/// \param[in] object_id The object to mark as pending creation.
void ReferenceCounter::UpdateObjectPendingCreation(const ObjectID &object_id) {
  constexpr bool kPendingCreation = true;
  absl::MutexLock guard(&mutex_);
  UpdateObjectPendingCreationInternal(object_id, kPendingCreation);
}

bool ReferenceCounter::IsObjectPendingCreation(const ObjectID &object_id) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
Expand Down
3 changes: 3 additions & 0 deletions src/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Update that the object is ready to be fetched.
void UpdateObjectReady(const ObjectID &object_id);

/// Update that the object's creator task is scheduled/executing, i.e. mark
/// the object as pending creation.
///
/// \param[in] object_id The object to mark as pending creation.
void UpdateObjectPendingCreation(const ObjectID &object_id);

/// Whether the object is pending creation (the task that creates it is
/// scheduled/executing).
bool IsObjectPendingCreation(const ObjectID &object_id) const;
Expand Down
19 changes: 19 additions & 0 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ const int64_t kTaskFailureThrottlingThreshold = 50;
// Throttle task failure logs to once this interval.
const int64_t kTaskFailureLoggingFrequencyMillis = 5000;

/// Collect the object IDs of the stream items at indices [0, next_index_).
///
/// Each ID is obtained via GetObjectRefAtIndex; indices at or above
/// next_index_ are not included.
///
/// \return The set of object IDs found at indices below next_index_.
absl::flat_hash_set<ObjectID> ObjectRefStream::GetItemsConsumed() const {
  absl::flat_hash_set<ObjectID> consumed;
  int64_t i = 0;
  while (i < next_index_) {
    consumed.insert(GetObjectRefAtIndex(i));
    ++i;
  }
  return consumed;
}

absl::flat_hash_set<ObjectID> ObjectRefStream::GetItemsUnconsumed() const {
absl::flat_hash_set<ObjectID> result;
for (int64_t index = 0; index <= max_index_seen_; index++) {
Expand Down Expand Up @@ -1016,6 +1025,16 @@ bool TaskManager::RetryTaskIfPossible(const TaskID &task_id,
spec.AttemptNumber(),
RayConfig::instance().task_oom_retry_delay_base_ms())
: RayConfig::instance().task_retry_delay_ms();
if (spec.IsStreamingGenerator()) {
absl::MutexLock lock(&object_ref_stream_ops_mu_);
const auto generator_id = spec.ReturnId(0);
auto stream_it = object_ref_streams_.find(generator_id);
if (stream_it != object_ref_streams_.end()) {
for (ObjectID obj : stream_it->second.GetItemsConsumed()) {
reference_counter_->UpdateObjectPendingCreation(obj);
}
}
}
retry_task_callback_(spec, /*object_recovery*/ false, update_seqno, delay_ms);
return true;
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class ObjectRefStream {
/// \return A list of object IDs that are not read yet.
absl::flat_hash_set<ObjectID> GetItemsUnconsumed() const;

/// Get the object IDs of the stream items at indices below the next index
/// to be read (the items counted as consumed).
///
/// \return A set of object IDs at indices [0, next_index_).
absl::flat_hash_set<ObjectID> GetItemsConsumed() const;

/// Pop all ObjectIDs that are not read yet via
/// TryReadNextItem.
///
Expand Down

0 comments on commit 751f638

Please sign in to comment.