Skip to content

Commit

Permalink
[core] Enable lineage reconstruction in CI (ray-project#21519)
Browse files Browse the repository at this point in the history
Enables lineage reconstruction in all CI and release tests.
  • Loading branch information
stephanie-wang authored Feb 18, 2022
1 parent 9482f03 commit 03a5589
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 169 deletions.
2 changes: 2 additions & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,5 @@ build:ubsan --linkopt -fno-sanitize-recover=all
# Import local specific llvm config options, which can be generated by
# ci/travis/install-llvm-dependencies.sh
try-import %workspace%/.llvm-local.bazelrc

test:ci --test_env=RAY_lineage_pinning_enabled=1
88 changes: 0 additions & 88 deletions python/ray/workflow/tests/test_basic_workflows_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import ray
import re
from filelock import FileLock
from pathlib import Path
from ray._private.test_utils import run_string_as_driver, SignalActor
from ray import workflow
from ray.tests.conftest import * # noqa
Expand Down Expand Up @@ -268,93 +267,6 @@ def f():
workflow.get_actor("wf")


def test_wf_run(workflow_start_regular, tmp_path):
counter = tmp_path / "counter"
counter.write_text("0")

@workflow.step
def f():
v = int(counter.read_text()) + 1
counter.write_text(str(v))

f.step().run("abc")
assert counter.read_text() == "1"
# This will not rerun the job from beginning
f.step().run("abc")
assert counter.read_text() == "1"


def test_wf_no_run():
@workflow.step
def f1():
pass

f1.step()

@workflow.step
def f2(*w):
pass

f = f2.step(*[f1.step() for _ in range(10)])

with pytest.raises(Exception):
f.run()


def test_dedupe_indirect(workflow_start_regular, tmp_path):
counter = Path(tmp_path) / "counter.txt"
lock = Path(tmp_path) / "lock.txt"
counter.write_text("0")

@workflow.step
def incr():
with FileLock(str(lock)):
c = int(counter.read_text())
c += 1
counter.write_text(f"{c}")

@workflow.step
def identity(a):
return a

@workflow.step
def join(*a):
return counter.read_text()

# Here a is passed to two steps and we need to ensure
# it's only executed once
a = incr.step()
i1 = identity.step(a)
i2 = identity.step(a)
assert "1" == join.step(i1, i2).run()
assert "2" == join.step(i1, i2).run()
# pass a multiple times
assert "3" == join.step(a, a, a, a).run()
assert "4" == join.step(a, a, a, a).run()


def test_run_off_main_thread(workflow_start_regular):
@workflow.step
def fake_data(num: int):
return list(range(num))

succ = False

# Start new thread here ⚠️
def run():
global succ
# Setup the workflow.
data = fake_data.step(10)
assert data.run(workflow_id="run") == list(range(10))

import threading

t = threading.Thread(target=run)
t.start()
t.join()
assert workflow.get_status("run") == workflow.SUCCESSFUL


if __name__ == "__main__":
import sys

Expand Down
98 changes: 98 additions & 0 deletions python/ray/workflow/tests/test_basic_workflows_3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import pytest
from filelock import FileLock
from pathlib import Path
from ray import workflow
from ray.tests.conftest import * # noqa


def test_wf_run(workflow_start_regular, tmp_path):
counter = tmp_path / "counter"
counter.write_text("0")

@workflow.step
def f():
v = int(counter.read_text()) + 1
counter.write_text(str(v))

f.step().run("abc")
assert counter.read_text() == "1"
# This will not rerun the job from beginning
f.step().run("abc")
assert counter.read_text() == "1"


def test_wf_no_run():
@workflow.step
def f1():
pass

f1.step()

@workflow.step
def f2(*w):
pass

f = f2.step(*[f1.step() for _ in range(10)])

with pytest.raises(Exception):
f.run()


def test_dedupe_indirect(workflow_start_regular, tmp_path):
counter = Path(tmp_path) / "counter.txt"
lock = Path(tmp_path) / "lock.txt"
counter.write_text("0")

@workflow.step
def incr():
with FileLock(str(lock)):
c = int(counter.read_text())
c += 1
counter.write_text(f"{c}")

@workflow.step
def identity(a):
return a

@workflow.step
def join(*a):
return counter.read_text()

# Here a is passed to two steps and we need to ensure
# it's only executed once
a = incr.step()
i1 = identity.step(a)
i2 = identity.step(a)
assert "1" == join.step(i1, i2).run()
assert "2" == join.step(i1, i2).run()
# pass a multiple times
assert "3" == join.step(a, a, a, a).run()
assert "4" == join.step(a, a, a, a).run()


def test_run_off_main_thread(workflow_start_regular):
@workflow.step
def fake_data(num: int):
return list(range(num))

succ = False

# Start new thread here ⚠️
def run():
global succ
# Setup the workflow.
data = fake_data.step(10)
assert data.run(workflow_id="run") == list(range(10))

import threading

t = threading.Thread(target=run)
t.start()
t.join()
assert workflow.get_status("run") == workflow.SUCCESSFUL


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))
80 changes: 0 additions & 80 deletions python/ray/workflow/tests/test_virtual_actor_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,86 +185,6 @@ def __setstate__(self, state):
assert ray.get(ret) == "UP"


@pytest.mark.parametrize(
"workflow_start_regular",
[
{
"num_cpus": 4
# We need more CPUs, otherwise 'create()' blocks 'get()'
}
],
indirect=True,
)
@pytest.mark.repeat(5)
def test_wf_in_actor_chain(workflow_start_regular, tmp_path):
file_lock = [str(tmp_path / str(i)) for i in range(5)]
fail_flag = tmp_path / "fail"

@workflow.virtual_actor
class Counter:
def __init__(self):
self._counter = 0

def incr(self, n):
with FileLock(file_lock[n]):
self._counter += 1
if fail_flag.exists():
raise Exception()

if n == 0:
return self._counter
else:
return self.incr.step(n - 1)

@workflow.virtual_actor.readonly
def val(self):
return self._counter

def __getstate__(self):
return self._counter

def __setstate__(self, v):
self._counter = v

locks = [FileLock(f) for f in file_lock]
for lock in locks:
lock.acquire()

c = Counter.get_or_create("counter")
ray.get(c.ready())
final_ret = c.incr.run_async(len(file_lock) - 1)
for i in range(0, len(file_lock) - 2):
locks[-i - 1].release()
val = c.val.run()
for _ in range(0, 60):
if val == i + 1:
break
val = c.val.run()
time.sleep(1)
assert val == i + 1

fail_flag.touch()
locks[1 - len(file_lock)].release()
# Fail the pipeline
with pytest.raises(Exception):
ray.get(final_ret)

fail_flag.unlink()
workflow.resume("counter")
# After resume, it'll start form the place where it failed
for i in range(len(file_lock) - 1, len(file_lock)):
locks[-i - 1].release()
val = c.val.run()
for _ in range(0, 60):
if val == i + 1:
break
val = c.val.run()
time.sleep(1)
assert val == i + 1

assert c.val.run() == 5


@pytest.mark.parametrize(
"workflow_start_regular",
[
Expand Down
85 changes: 84 additions & 1 deletion python/ray/workflow/tests/test_virtual_actor_3.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import pytest
from filelock import FileLock
import time
from typing import Optional, Dict, Tuple, List

import ray
from ray import workflow
from typing import Optional, Dict, Tuple, List
from ray.tests.conftest import * # noqa


@ray.workflow.virtual_actor
Expand Down Expand Up @@ -123,6 +126,86 @@ def test_writer_actor_pressure_test(workflow_start_regular):
assert user.goods_value.run() == (5, [])


@pytest.mark.parametrize(
"workflow_start_regular",
[
{
"num_cpus": 4
# We need more CPUs, otherwise 'create()' blocks 'get()'
}
],
indirect=True,
)
@pytest.mark.repeat(5)
def test_wf_in_actor_chain(workflow_start_regular, tmp_path):
file_lock = [str(tmp_path / str(i)) for i in range(5)]
fail_flag = tmp_path / "fail"

@workflow.virtual_actor
class Counter:
def __init__(self):
self._counter = 0

def incr(self, n):
with FileLock(file_lock[n]):
self._counter += 1
if fail_flag.exists():
raise Exception()

if n == 0:
return self._counter
else:
return self.incr.step(n - 1)

@workflow.virtual_actor.readonly
def val(self):
return self._counter

def __getstate__(self):
return self._counter

def __setstate__(self, v):
self._counter = v

locks = [FileLock(f) for f in file_lock]
for lock in locks:
lock.acquire()

c = Counter.get_or_create("counter")
ray.get(c.ready())
final_ret = c.incr.run_async(len(file_lock) - 1)
for i in range(0, len(file_lock) - 2):
locks[-i - 1].release()
val = c.val.run()
for _ in range(0, 60):
if val == i + 1:
break
val = c.val.run()
time.sleep(1)
assert val == i + 1

fail_flag.touch()
locks[1 - len(file_lock)].release()
# Fail the pipeline
with pytest.raises(Exception):
ray.get(final_ret)

fail_flag.unlink()
workflow.resume("counter")
# After resume, it'll start form the place where it failed
for i in range(len(file_lock) - 1, len(file_lock)):
locks[-i - 1].release()
val = c.val.run()
for _ in range(0, 60):
if val == i + 1:
break
val = c.val.run()
time.sleep(1)
assert val == i + 1

assert c.val.run() == 5


if __name__ == "__main__":
import sys

Expand Down
Loading

0 comments on commit 03a5589

Please sign in to comment.