Skip to content

Commit 6ab01a2

Browse files
stephanie-wangrobertnishihara
authored andcommitted
[xray] Fix bug when counting a task's lineage size (#2600)
1 parent a0691ee commit 6ab01a2

File tree

3 files changed

+19
-11
lines changed

3 files changed

+19
-11
lines changed

src/ray/raylet/lineage_cache.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,12 @@ void LineageCache::AddReadyTask(const Task &task) {
219219
}
220220
}
221221

222-
uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) const {
222+
uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id,
223+
std::unordered_set<TaskID> &seen) const {
224+
if (seen.count(task_id) == 1) {
225+
return 0;
226+
}
227+
seen.insert(task_id);
223228
if (subscribed_tasks_.count(task_id) == 1) {
224229
return 0;
225230
}
@@ -229,7 +234,7 @@ uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id) const {
229234
}
230235
uint64_t cnt = 1;
231236
for (const auto &parent_id : entry->GetParentTaskIds()) {
232-
cnt += CountUnsubscribedLineage(parent_id);
237+
cnt += CountUnsubscribedLineage(parent_id, seen);
233238
}
234239
return cnt;
235240
}
@@ -257,7 +262,9 @@ void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
257262
// NOTE(swang): The number of entries in the uncommitted lineage also
258263
// includes local tasks that haven't been committed yet, not just remote
259264
// tasks, so this is an overestimate.
260-
if (CountUnsubscribedLineage(task_id) > max_lineage_size_) {
265+
std::unordered_set<TaskID> seen;
266+
auto count = CountUnsubscribedLineage(task_id, seen);
267+
if (count > max_lineage_size_) {
261268
// Since this task was in state WAITING, check that we were not
262269
// already subscribed to the task.
263270
RAY_CHECK(SubscribeTask(task_id));

src/ray/raylet/lineage_cache.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,15 @@ class LineageCache {
248248
/// Unsubscribe from notifications for a task. Returns whether the operation
249249
/// was successful (whether we were subscribed).
250250
bool UnsubscribeTask(const TaskID &task_id);
251-
/// Count the size of unsubscribed and uncommitted lineage
252-
uint64_t CountUnsubscribedLineage(const TaskID &task_id) const;
251+
/// Count the size of unsubscribed and uncommitted lineage of the given task
252+
/// excluding the values that have already been visited.
253+
///
254+
/// \param task_id The task whose lineage should be counted.
255+
/// \param seen This set contains the keys of lineage entries counted so far,
256+
/// so that we don't revisit those nodes.
257+
/// \void The number of tasks that were counted.
258+
uint64_t CountUnsubscribedLineage(const TaskID &task_id,
259+
std::unordered_set<TaskID> &seen) const;
253260

254261
/// The client ID, used to request notifications for specific tasks.
255262
/// TODO(swang): Move the ClientID into the generic Table implementation.

test/stress_tests.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ def f(x):
5555
assert ray.services.all_processes_alive()
5656

5757

58-
@pytest.mark.skipif(
59-
os.environ.get("RAY_USE_XRAY") == "1",
60-
reason="This test does not work with xray yet.")
6158
def test_dependencies(ray_start_combination):
6259
@ray.remote
6360
def f(x):
@@ -81,9 +78,6 @@ def g(*xs):
8178
assert ray.services.all_processes_alive()
8279

8380

84-
@pytest.mark.skipif(
85-
os.environ.get("RAY_USE_XRAY") == "1",
86-
reason="This test does not work with xray yet.")
8781
def test_submitting_many_tasks(ray_start_regular):
8882
@ray.remote
8983
def f(x):

0 commit comments

Comments
 (0)