Update cythonization in CI (dask#4764)
* Check when scheduler is cythonized in CI

* Update cythonize step

* Update test

* Run full test suite again

* Add global variable to Scheduler on compiled state (#2)

* Add `COMPILED` global variable to scheduler

This should make it easy to tell whether the scheduler was compiled with
Cython or not (a short usage sketch follows below).

* Simplify Cythonized Scheduler check

* lint

* Add skips to distributed/cli/tests/test_dask_spec.py

* xfail tests

* Move cythonized assert into test setup

* Remove unused CYTHONIZED env var

Co-authored-by: jakirkham <jakirkham@gmail.com>
jrbourbeau and jakirkham authored Apr 29, 2021
1 parent 6f7ca0a commit 56aed44
Showing 8 changed files with 40 additions and 7 deletions.
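A minimal sketch of how the new COMPILED flag can be checked at runtime, assuming the version of distributed from this commit is installed (the CI install step and the test markers below do essentially this):

# Minimal sketch: check whether the scheduler was built with Cython.
# Assumes the distributed version from this commit is installed.
from distributed.scheduler import COMPILED

if COMPILED:
    print("distributed.scheduler was compiled with Cython")
else:
    print("distributed.scheduler is running as pure Python")
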
15 changes: 9 additions & 6 deletions .github/workflows/tests.yaml
@@ -48,14 +48,17 @@ jobs:
         if: ${{ matrix.os == 'windows-latest' && matrix.python-version == '3.9' }}
         run: mamba uninstall ipython
 
-      - name: Cythonize
-        shell: bash -l {0}
-        if: ${{ matrix.python-version == '3.7' }}
-        run: python setup.py build_ext --with-cython
-
       - name: Install
         shell: bash -l {0}
-        run: python -m pip install --no-deps -e .
+        run: |
+          # Cythonize scheduler on Python 3.7 builds
+          if [[ "${{ matrix.python-version }}" = "3.7" ]]; then
+            python -m pip install -vv --no-deps --install-option="--with-cython" -e .
+            python -c "from distributed.scheduler import COMPILED; assert COMPILED"
+          else
+            python -m pip install --no-deps -e .
+            python -c "from distributed.scheduler import COMPILED; assert not COMPILED"
+          fi
       - name: mamba list
         shell: bash -l {0}
3 changes: 3 additions & 0 deletions distributed/cli/tests/test_dask_spec.py
@@ -4,10 +4,12 @@
import yaml

from distributed import Client
from distributed.scheduler import COMPILED
from distributed.utils_test import cleanup # noqa: F401
from distributed.utils_test import popen


@pytest.mark.skipif(COMPILED, reason="Fails with cythonized scheduler")
@pytest.mark.asyncio
async def test_text(cleanup):
with popen(
@@ -37,6 +39,7 @@ async def test_text(cleanup):
assert w["nthreads"] == 3


@pytest.mark.skipif(COMPILED, reason="Fails with cythonized scheduler")
@pytest.mark.asyncio
async def test_file(cleanup, tmp_path):
fn = str(tmp_path / "foo.yaml")
2 changes: 2 additions & 0 deletions distributed/deploy/tests/test_local.py
@@ -21,6 +21,7 @@
from distributed.deploy.local import LocalCluster
from distributed.deploy.utils_test import ClusterTest
from distributed.metrics import time
from distributed.scheduler import COMPILED
from distributed.system import MEMORY_LIMIT
from distributed.utils import TimeoutError, sync
from distributed.utils_test import ( # noqa: F401
@@ -793,6 +794,7 @@ def scale_down(self, *args, **kwargs):
await cluster.close()


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
def test_local_tls_restart(loop):
from distributed.utils_test import tls_only_security

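This commit uses two different pytest markers: skipif for the CLI tests above and xfail for most other tests. A short sketch of the difference, with placeholder test bodies rather than real distributed tests:

# Sketch contrasting the two markers used in this commit.
# The test bodies are placeholders, not tests from distributed.
import pytest

from distributed.scheduler import COMPILED


@pytest.mark.skipif(COMPILED, reason="Fails with cythonized scheduler")
def test_not_run_when_compiled():
    # skipif: the test is not executed at all on cythonized builds.
    assert True


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
def test_expected_failure_when_compiled():
    # xfail: the test still runs on cythonized builds, but a failure is
    # reported as expected instead of breaking the suite.
    assert True
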
2 changes: 2 additions & 0 deletions distributed/diagnostics/tests/test_progress.py
@@ -12,6 +12,7 @@
SchedulerPlugin,
)
from distributed.metrics import time
from distributed.scheduler import COMPILED
from distributed.utils_test import dec, div, gen_cluster, inc, nodebug


@@ -94,6 +95,7 @@ def check_bar_completed(capsys, width=40):
assert percent == "100% Completed"


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(client=True, Worker=Nanny, timeout=None)
async def test_AllProgress(c, s, a, b):
x, y, z = c.map(inc, [1, 2, 3])
2 changes: 2 additions & 0 deletions distributed/diagnostics/tests/test_progress_stream.py
@@ -6,6 +6,7 @@

from distributed.client import wait
from distributed.diagnostics.progress_stream import progress_quads, progress_stream
from distributed.scheduler import COMPILED
from distributed.utils_test import div, gen_cluster, inc


@@ -56,6 +57,7 @@ def test_progress_quads_too_many():
assert len(d["name"]) == 6 * 3


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(client=True)
async def test_progress_stream(c, s, a, b):
futures = c.map(div, [1] * 10, range(10))
2 changes: 2 additions & 0 deletions distributed/scheduler.py
@@ -185,6 +185,8 @@ def nogil(func):
set, {"released", "waiting", "no-worker", "processing", "erred", "memory"}
)
globals()["ALL_TASK_STATES"] = ALL_TASK_STATES
COMPILED = declare(bint, compiled)
globals()["COMPILED"] = COMPILED


@final
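The declare(bint, compiled) line above relies on Cython's pure-Python mode: cython.compiled is False when the module runs as ordinary Python and True once it has been built with Cython, and declare gives the flag a C bint type in the compiled case. A standalone sketch of the same pattern, written directly against the cython module rather than the scheduler's own helper definitions, and assuming Cython is installed:

# Sketch of the COMPILED pattern, using the cython module directly.
# Assumes Cython is installed; distributed.scheduler guards these names
# with helpers of its own.
import cython

# False when this file runs as ordinary Python, True when compiled.
COMPILED = cython.declare(cython.bint, cython.compiled)

# Writing through globals() keeps the name importable from the module
# namespace even after compilation.
globals()["COMPILED"] = COMPILED
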
9 changes: 8 additions & 1 deletion distributed/tests/test_client.py
@@ -58,7 +58,12 @@
 from distributed.compatibility import MACOS, WINDOWS
 from distributed.core import Status
 from distributed.metrics import time
-from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler
+from distributed.scheduler import (
+    COMPILED,
+    CollectTaskMetaDataPlugin,
+    KilledWorker,
+    Scheduler,
+)
 from distributed.sizeof import sizeof
 from distributed.utils import is_valid_xml, mp_context, sync, tmp_text, tmpfile
 from distributed.utils_test import (  # noqa: F401
@@ -5096,6 +5101,7 @@ def test_dynamic_workloads_sync_random(c):
_test_dynamic_workloads_sync(c, delay="random")


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(client=True)
async def test_bytes_keys(c, s, a, b):
key = b"inc-123"
@@ -5848,6 +5854,7 @@ async def test_get_mix_futures_and_SubgraphCallable_dask_dataframe(c, s, a, b):
assert result.equals(df.astype("f8"))


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
def test_direct_to_workers(s, loop):
with Client(s["address"], loop=loop, direct_to_workers=True) as client:
future = client.scatter(1)
12 changes: 12 additions & 0 deletions distributed/tests/test_failed_workers.py
@@ -13,6 +13,7 @@
from distributed.comm import CommClosedError
from distributed.compatibility import MACOS
from distributed.metrics import time
from distributed.scheduler import COMPILED
from distributed.utils import CancelledError, sync
from distributed.utils_test import loop # noqa: F401
from distributed.utils_test import (
@@ -99,6 +100,7 @@ async def test_gather_then_submit_after_failed_workers(c, s, w, x, y, z):
assert result == [sum(map(inc, range(20)))]


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(Worker=Nanny, timeout=60, client=True)
async def test_failed_worker_without_warning(c, s, a, b):
L = c.map(inc, range(10))
@@ -135,6 +137,7 @@ async def test_failed_worker_without_warning(c, s, a, b):
assert not (set(nthreads2) & set(s.nthreads)) # no overlap


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(Worker=Nanny, client=True, timeout=60)
async def test_restart(c, s, a, b):
assert s.nthreads == {a.worker_address: 1, b.worker_address: 2}
@@ -163,6 +166,7 @@ async def test_restart(c, s, a, b):
assert not any(cs.wants_what for cs in s.clients.values())


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(Worker=Nanny, client=True, timeout=60)
async def test_restart_cleared(c, s, a, b):
x = 2 * delayed(1) + 1
@@ -175,6 +179,7 @@ async def test_restart_cleared(c, s, a, b):
assert not coll


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
def test_restart_sync_no_center(loop):
with cluster(nanny=True) as (s, [a, b]):
with Client(s["address"], loop=loop) as c:
@@ -186,6 +191,7 @@ def test_restart_sync_no_center(loop):
assert len(c.nthreads()) == 2


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
def test_restart_sync(loop):
with cluster(nanny=True) as (s, [a, b]):
with Client(s["address"], loop=loop) as c:
@@ -205,6 +211,7 @@ def test_restart_sync(loop):
assert y.result() == 1 / 3


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(Worker=Nanny, client=True, timeout=60)
async def test_restart_fast(c, s, a, b):
L = c.map(sleep, range(10))
@@ -221,6 +228,7 @@ async def test_restart_fast(c, s, a, b):
assert result == 2


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
def test_worker_doesnt_await_task_completion(loop):
with cluster(nanny=True, nworkers=1) as (s, [w]):
with Client(s["address"], loop=loop) as c:
@@ -232,6 +240,7 @@ def test_worker_doesnt_await_task_completion(loop):
assert stop - start < 5


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
def test_restart_fast_sync(loop):
with cluster(nanny=True) as (s, [a, b]):
with Client(s["address"], loop=loop) as c:
@@ -248,6 +257,7 @@ def test_restart_fast_sync(loop):
assert x.result() == 2


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(Worker=Nanny, client=True, timeout=60)
async def test_fast_kill(c, s, a, b):
L = c.map(sleep, range(10))
@@ -263,6 +273,7 @@ async def test_fast_kill(c, s, a, b):
assert result == 2


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(Worker=Nanny, timeout=60)
async def test_multiple_clients_restart(s, a, b):
c1 = await Client(s.address, asynchronous=True)
@@ -356,6 +367,7 @@ async def test_broken_worker_during_computation(c, s, a, b):
await n.close()


@pytest.mark.xfail(COMPILED, reason="Fails with cythonized scheduler")
@gen_cluster(client=True, Worker=Nanny, timeout=60)
async def test_restart_during_computation(c, s, a, b):
xs = [delayed(slowinc)(i, delay=0.01) for i in range(50)]
