diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index d0da2b187913..0d55b0033d91 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -187,6 +187,12 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l void LineageCache::AddReadyTask(const Task &task) { auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY); RAY_CHECK(lineage_.SetEntry(std::move(new_entry))); + // Add the task to the cache of tasks that may be flushed. + uncommitted_ready_tasks_.insert(task.GetTaskSpecification().TaskId()); + + // Try to flush the task to the GCS. + // TODO(swang): Allow a pluggable policy for when to flush. + RAY_CHECK_OK(Flush()); } void LineageCache::RemoveWaitingTask(const TaskID &task_id) { @@ -214,21 +220,17 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const { } Status LineageCache::Flush() { - // Find all tasks that are READY and whose arguments have been committed in the GCS. + // Iterate through all tasks that are READY. std::vector<TaskID> ready_task_ids; - for (const auto &pair : lineage_.GetEntries()) { - auto task_id = pair.first; - auto entry = pair.second; - // Skip task entries that are not ready to be written yet. These tasks - // either have not started execution yet, are being executed on a remote - // node, or have already been written to the GCS. - if (entry.GetStatus() != GcsStatus_UNCOMMITTED_READY) { - continue; - } + for (const auto &task_id : uncommitted_ready_tasks_) { + auto entry = lineage_.GetEntry(task_id); + RAY_CHECK(entry); + RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY); + // Check if all arguments have been committed to the GCS before writing // this task. 
bool all_arguments_committed = true; - for (const auto &parent_id : entry.GetParentTaskIds()) { + for (const auto &parent_id : entry->GetParentTaskIds()) { auto parent = lineage_.GetEntry(parent_id); // If a parent entry exists in the lineage cache but has not been // committed yet, then as far as we know, it's still in flight to the @@ -268,6 +270,8 @@ Status LineageCache::Flush() { auto entry = lineage_.PopEntry(ready_task_id); RAY_CHECK(entry->SetStatus(GcsStatus_COMMITTING)); RAY_CHECK(lineage_.SetEntry(std::move(*entry))); + // Erase the task from the cache of uncommitted ready tasks. + uncommitted_ready_tasks_.erase(ready_task_id); } return ray::Status::OK(); diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 6dac1a3d6040..e9904277f3b9 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -204,6 +204,13 @@ class LineageCache { /// The durable storage system for task information. gcs::TableInterface &task_storage_; + /// The set of tasks that are in UNCOMMITTED_READY state. This is a cache of + /// the tasks that may be flushable. + // TODO(swang): As an optimization, we may also want to further distinguish + // which tasks are flushable, to avoid iterating over tasks that are in + // UNCOMMITTED_READY, but that have dependencies that have not been committed + // yet. + std::unordered_set<TaskID> uncommitted_ready_tasks_; /// All tasks and objects that we are responsible for writing back to the /// GCS, and the tasks and objects in their lineage. Lineage lineage_;