Skip to content

Commit

Permalink
fix regressions
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Sep 24, 2021
1 parent a48f68b commit fca1f7a
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 38 deletions.
19 changes: 9 additions & 10 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,22 @@ def test_scheduler_port_zero(loop):
with tmpfile() as fn:
with popen(
["dask-scheduler", "--no-dashboard", "--scheduler-file", fn, "--port", "0"]
) as sched:
):
with Client(scheduler_file=fn, loop=loop) as c:
assert c.scheduler.port
assert c.scheduler.port != 8786


def test_dashboard_port_zero(loop):
pytest.importorskip("bokeh")
with tmpfile() as fn:
with popen(["dask-scheduler", "--dashboard-address", ":0"]) as proc:
count = 0
while count < 1:
line = proc.stderr.readline()
if b"dashboard" in line.lower():
sleep(0.01)
count += 1
assert b":0" not in line
with popen(["dask-scheduler", "--dashboard-address", ":0"]) as proc:
count = 0
while count < 1:
line = proc.stderr.readline()
if b"dashboard" in line.lower():
sleep(0.01)
count += 1
assert b":0" not in line


PRELOAD_TEXT = """
Expand Down
2 changes: 2 additions & 0 deletions distributed/deploy/utils_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from typing import Any

import pytest
Expand Down
4 changes: 2 additions & 2 deletions distributed/preloading.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ def process_preloads(
if isinstance(preload, str):
preload = [preload]
if preload_argv and isinstance(preload_argv[0], str):
preload_argv = [cast(list[str], preload_argv)] * len(preload)
preload_argv = [cast("list[str]", preload_argv)] * len(preload)
elif not preload_argv:
preload_argv = [cast(list[str], [])] * len(preload)
preload_argv = [cast("list[str]", [])] * len(preload)
if len(preload) != len(preload_argv):
raise ValueError(
"preload and preload_argv have mismatched lenghts "
Expand Down
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5555,7 +5555,7 @@ async def register_scheduler_plugin(self, comm=None, plugin=None, name=None):
if hasattr(plugin, "start"):
result = plugin.start(self)
if inspect.isawaitable(result):
result = await result
await result

self.add_plugin(plugin=plugin, name=name)

Expand Down
60 changes: 35 additions & 25 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import os
import pkgutil
import re
import resource
import shutil
import socket
import sys
Expand All @@ -31,6 +30,12 @@

import click
import tblib.pickling_support

try:
import resource
except ImportError:
resource = None # type: ignore

import tlz as toolz
from tornado import gen
from tornado.ioloop import IOLoop
Expand Down Expand Up @@ -111,8 +116,15 @@ def has_arg(func, argname):


def get_fileno_limit():
"""Get the maximum number of open files per process"""
return resource.getrlimit(resource.RLIMIT_NOFILE)[0]
"""
Get the maximum number of open files per process.
"""
if resource is not None:
return resource.getrlimit(resource.RLIMIT_NOFILE)[0]
else:
# Default ceiling for Windows when using the CRT, though it
# is settable using _setmaxstdio().
return 512


@toolz.memoize
Expand Down Expand Up @@ -1037,28 +1049,6 @@ def reset_logger_locks():
traitlets.config.Application.instance(), NotebookApp
)

# TODO: Use tornado's AnyThreadEventLoopPolicy, instead of class below,
# once tornado > 6.0.3 is available.

if WINDOWS:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
# https://github.com/tornadoweb/tornado/issues/2608
BaseEventLoopPolicy = asyncio.WindowsSelectorEventLoopPolicy # type: ignore
else:
BaseEventLoopPolicy = asyncio.DefaultEventLoopPolicy


class AnyThreadEventLoopPolicy(BaseEventLoopPolicy): # type: ignore
def get_event_loop(self):
try:
return super().get_event_loop()
except (RuntimeError, AssertionError):
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop


if not is_server_extension:
is_kernel_and_no_running_loop = False

Expand All @@ -1069,6 +1059,26 @@ def get_event_loop(self):
is_kernel_and_no_running_loop = True

if not is_kernel_and_no_running_loop:

# TODO: Use tornado's AnyThreadEventLoopPolicy, instead of class below,
# once tornado > 6.0.3 is available.
if WINDOWS:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
# https://github.com/tornadoweb/tornado/issues/2608
BaseEventLoopPolicy = asyncio.WindowsSelectorEventLoopPolicy # type: ignore
else:
BaseEventLoopPolicy = asyncio.DefaultEventLoopPolicy

class AnyThreadEventLoopPolicy(BaseEventLoopPolicy): # type: ignore
def get_event_loop(self):
try:
return super().get_event_loop()
except (RuntimeError, AssertionError):
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop

asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())


Expand Down
1 change: 1 addition & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
#
# Dask.distributed documentation build configuration file, created by
# sphinx-quickstart on Tue Oct 6 14:42:44 2015.
Expand Down

0 comments on commit fca1f7a

Please sign in to comment.