Skip to content

Commit

Permalink
[xray] Turn on flushing to the GCS for the lineage cache (ray-project…
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanie-wang authored and robertnishihara committed Apr 15, 2018
1 parent fcd3044 commit 4b655b0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
26 changes: 15 additions & 11 deletions src/ray/raylet/lineage_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l
void LineageCache::AddReadyTask(const Task &task) {
auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY);
RAY_CHECK(lineage_.SetEntry(std::move(new_entry)));
// Add the task to the cache of tasks that may be flushed.
uncommitted_ready_tasks_.insert(task.GetTaskSpecification().TaskId());

// Try to flush the task to the GCS.
// TODO(swang): Allow a pluggable policy for when to flush.
RAY_CHECK_OK(Flush());
}

void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
Expand Down Expand Up @@ -214,21 +220,17 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const {
}

Status LineageCache::Flush() {
// Find all tasks that are READY and whose arguments have been committed in the GCS.
// Iterate through all tasks that are READY.
std::vector<TaskID> ready_task_ids;
for (const auto &pair : lineage_.GetEntries()) {
auto task_id = pair.first;
auto entry = pair.second;
// Skip task entries that are not ready to be written yet. These tasks
// either have not started execution yet, are being executed on a remote
// node, or have already been written to the GCS.
if (entry.GetStatus() != GcsStatus_UNCOMMITTED_READY) {
continue;
}
for (const auto &task_id : uncommitted_ready_tasks_) {
auto entry = lineage_.GetEntry(task_id);
RAY_CHECK(entry);
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY);

// Check if all arguments have been committed to the GCS before writing
// this task.
bool all_arguments_committed = true;
for (const auto &parent_id : entry.GetParentTaskIds()) {
for (const auto &parent_id : entry->GetParentTaskIds()) {
auto parent = lineage_.GetEntry(parent_id);
// If a parent entry exists in the lineage cache but has not been
// committed yet, then as far as we know, it's still in flight to the
Expand Down Expand Up @@ -268,6 +270,8 @@ Status LineageCache::Flush() {
auto entry = lineage_.PopEntry(ready_task_id);
RAY_CHECK(entry->SetStatus(GcsStatus_COMMITTING));
RAY_CHECK(lineage_.SetEntry(std::move(*entry)));
// Erase the task from the cache of uncommitted ready tasks.
uncommitted_ready_tasks_.erase(ready_task_id);
}

return ray::Status::OK();
Expand Down
7 changes: 7 additions & 0 deletions src/ray/raylet/lineage_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ class LineageCache {

/// The durable storage system for task information.
gcs::TableInterface<TaskID, protocol::Task> &task_storage_;
/// The set of tasks that are in UNCOMMITTED_READY state. This is a cache of
/// the tasks that may be flushable.
// TODO(swang): As an optimization, we may also want to further distinguish
// which tasks are flushable, to avoid iterating over tasks that are in
// UNCOMMITTED_READY, but that have dependencies that have not been committed
// yet.
std::unordered_set<TaskID, UniqueIDHasher> uncommitted_ready_tasks_;
/// All tasks and objects that we are responsible for writing back to the
/// GCS, and the tasks and objects in their lineage.
Lineage lineage_;
Expand Down

0 comments on commit 4b655b0

Please sign in to comment.