Description

https://github.com/dask/distributed/runs/6606349069?check_suite_focus=true#step:11:1265

`test_restart` flaked on Windows CI: `Scheduler.restart` raised `CommClosedError` ("Exception while trying to call remote method 'restart' before comm was established.") while calling the nannies' `restart` RPC, and the `gen_cluster` temporary-directory cleanup afterwards hit `PermissionError` (WinError 32) on a leftover `worker-c_b9ylk8.dirlock`, ending in `NotADirectoryError`.
================================== FAILURES ===================================
________________________________ test_restart _________________________________
self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
deserializers = None
async def read(self, deserializers=None):
stream = self.stream
if stream is None:
raise CommClosedError()
fmt = "Q"
fmt_size = struct.calcsize(fmt)
try:
> frames_nbytes = await stream.read_bytes(fmt_size)
E tornado.iostream.StreamClosedError: Stream is closed
distributed\comm\tcp.py:226: StreamClosedError
The above exception was the direct cause of the following exception:
kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
async def send_recv_from_rpc(**kwargs):
if self.serializers is not None and kwargs.get("serializers") is None:
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
comm = None
try:
comm = await self.live_comm()
comm.name = "rpc." + key
> result = await send_recv(comm=comm, op=key, **kwargs)
distributed\core.py:894:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
reply = True, serializers = None, deserializers = None
kwargs = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
msg = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
please_close = True, force_close = True
async def send_recv(
comm: Comm,
*,
reply: bool = True,
serializers=None,
deserializers=None,
**kwargs,
):
"""Send and recv with a Comm.
Keyword arguments turn into the message
response = await send_recv(comm, op='ping', reply=True)
"""
msg = kwargs
msg["reply"] = reply
please_close = kwargs.get("close", False)
force_close = False
if deserializers is None:
deserializers = serializers
if deserializers is not None:
msg["serializers"] = deserializers
try:
await comm.write(msg, serializers=serializers, on_error="raise")
if reply:
> response = await comm.read(deserializers=deserializers)
distributed\core.py:739:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
deserializers = None
async def read(self, deserializers=None):
stream = self.stream
if stream is None:
raise CommClosedError()
fmt = "Q"
fmt_size = struct.calcsize(fmt)
try:
frames_nbytes = await stream.read_bytes(fmt_size)
(frames_nbytes,) = struct.unpack(fmt, frames_nbytes)
frames = host_array(frames_nbytes)
for i, j in sliding_window(
2,
range(0, frames_nbytes + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
):
chunk = frames[i:j]
chunk_nbytes = len(chunk)
n = await stream.read_into(chunk)
assert n == chunk_nbytes, (n, chunk_nbytes)
except StreamClosedError as e:
self.stream = None
self._closed = True
if not sys.is_finalizing():
> convert_stream_closed_error(self, e)
distributed\comm\tcp.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
obj = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
exc = StreamClosedError('Stream is closed')
def convert_stream_closed_error(obj, exc):
"""
Re-raise StreamClosedError as CommClosedError.
"""
if exc.real_error is not None:
# The stream was closed because of an underlying OS error
exc = exc.real_error
if ssl and isinstance(exc, ssl.SSLError):
if "UNKNOWN_CA" in exc.reason:
raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
else:
> raise CommClosedError(f"in {obj}: {exc}") from exc
E distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>: Stream is closed
distributed\comm\tcp.py:150: CommClosedError
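For context on the innermost failure above: `read()` in `distributed/comm/tcp.py` does length-prefixed framing, roughly as in the sketch below (plain `asyncio`, not distributed's actual implementation; `CHUNK` stands in for `OPENSSL_MAX_CHUNKSIZE`). A peer disconnect during either read surfaces as `StreamClosedError`, which `convert_stream_closed_error` re-raises as `CommClosedError`.

```python
import asyncio
import struct

CHUNK = 64 * 1024  # stand-in for OPENSSL_MAX_CHUNKSIZE

async def read_length_prefixed(reader: asyncio.StreamReader) -> bytearray:
    # 8-byte "Q" header carrying the total payload size...
    header = await reader.readexactly(struct.calcsize("Q"))
    (nbytes,) = struct.unpack("Q", header)
    # ...then the payload, read into one buffer in bounded chunks.
    buf = bytearray(nbytes)
    view = memoryview(buf)
    offset = 0
    while offset < nbytes:
        n = min(CHUNK, nbytes - offset)
        view[offset:offset + n] = await reader.readexactly(n)
        offset += n
    return buf
```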
The above exception was the direct cause of the following exception:
async def async_fn():
result = None
with tempfile.TemporaryDirectory() as tmpdir:
config2 = merge({"temporary-directory": tmpdir}, config)
with dask.config.set(config2):
workers = []
s = False
for _ in range(60):
try:
s, ws = await start_cluster(
nthreads,
scheduler,
security=security,
Worker=Worker,
scheduler_kwargs=scheduler_kwargs,
worker_kwargs=worker_kwargs,
)
except Exception as e:
logger.error(
"Failed to start gen_cluster: "
f"{e.__class__.__name__}: {e}; retrying",
exc_info=True,
)
await asyncio.sleep(1)
else:
workers[:] = ws
args = [s] + workers
break
if s is False:
raise Exception("Could not start cluster")
if client:
c = await Client(
s.address,
security=security,
asynchronous=True,
**client_kwargs,
)
args = [c] + args
try:
coro = func(*args, *outer_args, **kwargs)
task = asyncio.create_task(coro)
coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
> result = await coro2
distributed\utils_test.py:1111:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Future finished exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
timeout = 60
async def wait_for(fut, timeout, *, loop=None):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().
If the wait is cancelled, the task is also cancelled.
This function is a coroutine.
"""
if loop is None:
loop = events.get_running_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
if timeout is None:
return await fut
if timeout <= 0:
fut = ensure_future(fut, loop=loop)
if fut.done():
return fut.result()
await _cancel_and_wait(fut, loop=loop)
try:
return fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc
waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)
fut = ensure_future(fut, loop=loop)
fut.add_done_callback(cb)
try:
# wait until the future completes or the timeout
try:
await waiter
except exceptions.CancelledError:
if fut.done():
return fut.result()
else:
fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise
if fut.done():
> return fut.result()
C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:479:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:52172', workers: 0, cores: 0, tasks: 0>
a = <Nanny: None, threads: 1>, b = <Nanny: None, threads: 2>
@gen_cluster(client=True, Worker=Nanny, timeout=60)
async def test_restart(c, s, a, b):
futures = c.map(inc, range(20))
await wait(futures)
> await s.restart()
distributed\tests\test_scheduler.py:616:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (<Scheduler 'tcp://127.0.0.1:52172', workers: 0, cores: 0, tasks: 0>,)
kwargs = {}
async def wrapper(*args, **kwargs):
with self:
> return await func(*args, **kwargs)
distributed\utils.py:761:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Scheduler 'tcp://127.0.0.1:52172', workers: 0, cores: 0, tasks: 0>
client = None, timeout = 30
@log_errors
async def restart(self, client=None, timeout=30):
"""Restart all workers. Reset local state."""
stimulus_id = f"restart-{time()}"
n_workers = len(self.workers)
logger.info("Send lost future signal to clients")
for cs in self.clients.values():
self.client_releases_keys(
keys=[ts.key for ts in cs.wants_what],
client=cs.client_key,
stimulus_id=stimulus_id,
)
nannies = {addr: ws.nanny for addr, ws in self.workers.items()}
for addr in list(self.workers):
try:
# Ask the worker to close if it doesn't have a nanny,
# otherwise the nanny will kill it anyway
await self.remove_worker(
address=addr, close=addr not in nannies, stimulus_id=stimulus_id
)
except Exception:
logger.info(
"Exception while restarting. This is normal", exc_info=True
)
self.clear_task_state()
for plugin in list(self.plugins.values()):
try:
plugin.restart(self)
except Exception as e:
logger.exception(e)
logger.debug("Send kill signal to nannies: %s", nannies)
async with contextlib.AsyncExitStack() as stack:
nannies = [
await stack.enter_async_context(
rpc(nanny_address, connection_args=self.connection_args)
)
for nanny_address in nannies.values()
if nanny_address is not None
]
resps = All(
[nanny.restart(close=True, timeout=timeout * 0.8) for nanny in nannies]
)
try:
> resps = await asyncio.wait_for(resps, timeout)
distributed\scheduler.py:5098:
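`restart()` above fans the `restart` RPC out to every nanny and bounds the whole batch with `asyncio.wait_for`. Per its docstring further down, `All()` errs as soon as any task errs, so a single nanny comm closing mid-call fails the whole `await` well before the 30 s timeout. A rough stand-in for that structure, using plain `asyncio.gather` instead of `distributed.utils.All`:

```python
import asyncio

async def fan_out_restart(nannies, timeout=30):
    # Stand-in for: await asyncio.wait_for(All([nanny.restart(...) ...]), timeout).
    # Like All(), gather() without return_exceptions=True propagates the first
    # failure immediately, which is why one CommClosedError aborts the batch.
    calls = [nanny.restart(close=True, timeout=timeout * 0.8) for nanny in nannies]
    return await asyncio.wait_for(asyncio.gather(*calls), timeout)
```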
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Task finished name='Task-62380' coro=<All() done, defined at d:\a\distributed\distributed\distributed\utils.py:201> exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
timeout = 30
async def wait_for(fut, timeout, *, loop=None):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().
If the wait is cancelled, the task is also cancelled.
This function is a coroutine.
"""
if loop is None:
loop = events.get_running_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
if timeout is None:
return await fut
if timeout <= 0:
fut = ensure_future(fut, loop=loop)
if fut.done():
return fut.result()
await _cancel_and_wait(fut, loop=loop)
try:
return fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc
waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)
fut = ensure_future(fut, loop=loop)
fut.add_done_callback(cb)
try:
# wait until the future completes or the timeout
try:
await waiter
except exceptions.CancelledError:
if fut.done():
return fut.result()
else:
fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise
if fut.done():
> return fut.result()
C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:479:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = [<coroutine object rpc.__getattr__.<locals>.send_recv_from_rpc at 0x0000015BE54AE640>, <coroutine object rpc.__getattr__.<locals>.send_recv_from_rpc at 0x0000015BE54AED40>]
quiet_exceptions = ()
async def All(args, quiet_exceptions=()):
"""Wait on many tasks at the same time
Err once any of the tasks err.
See https://github.com/tornadoweb/tornado/issues/1546
Parameters
----------
args: futures to wait for
quiet_exceptions: tuple, Exception
Exception types to avoid logging if they fail
"""
tasks = gen.WaitIterator(*map(asyncio.ensure_future, args))
results = [None for _ in args]
while not tasks.done():
try:
> result = await tasks.next()
distributed\utils.py:218:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>
async def send_recv_from_rpc(**kwargs):
if self.serializers is not None and kwargs.get("serializers") is None:
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
comm = None
try:
comm = await self.live_comm()
comm.name = "rpc." + key
result = await send_recv(comm=comm, op=key, **kwargs)
except (RPCClosed, CommClosedError) as e:
if comm:
> raise type(e)(
f"Exception while trying to call remote method {key!r} before comm was established."
) from e
E distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
distributed\core.py:897: CommClosedError
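The comm that died here is the `rpc.restart` connection to the nanny at `tcp://127.0.0.1:52174`; the captured stderr below shows that nanny closing at 08:43:49, so the stream was presumably torn down while the scheduler was still waiting for the reply (despite the "before comm was established" wording, the comm object shown above had been established and then closed mid-call). A caller that wanted to treat that race as benign could do something like the following (a hypothetical wrapper, not a proposed patch):

```python
from distributed.comm.core import CommClosedError

async def restart_nanny_quietly(nanny_rpc, timeout):
    # Hypothetical helper: interpret a comm dying mid-"restart" as the nanny
    # having gone away to restart its worker, rather than as a hard failure.
    try:
        return await nanny_rpc.restart(close=True, timeout=timeout)
    except CommClosedError:
        return None
```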
During handling of the above exception, another exception occurred:
path = 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space'
onerror = <function TemporaryDirectory._rmtree.<locals>.onerror at 0x0000015BE3E78C10>
def _rmtree_unsafe(path, onerror):
try:
with os.scandir(path) as scandir_it:
entries = list(scandir_it)
except OSError:
onerror(os.scandir, path, sys.exc_info())
entries = []
for entry in entries:
fullname = entry.path
if _rmtree_isdir(entry):
try:
if entry.is_symlink():
# This can only happen if someone replaces
# a directory with a symlink after the call to
# os.scandir or entry.is_dir above.
raise OSError("Cannot call rmtree on a symbolic link")
except OSError:
onerror(os.path.islink, fullname, sys.exc_info())
continue
_rmtree_unsafe(fullname, onerror)
else:
try:
> os.unlink(fullname)
E PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:625: PermissionError
During handling of the above exception, another exception occurred:
func = <built-in function unlink>
path = 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
exc_info = (<class 'PermissionError'>, PermissionError(13, 'The process cannot access the file because it is being used by another process'), <traceback object at 0x0000015BE27AFEC0>)
def onerror(func, path, exc_info):
if issubclass(exc_info[0], PermissionError):
def resetperms(path):
try:
_os.chflags(path, 0)
except AttributeError:
pass
_os.chmod(path, 0o700)
try:
if path != name:
resetperms(_os.path.dirname(path))
resetperms(path)
try:
> _os.unlink(path)
E PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:805: PermissionError
During handling of the above exception, another exception occurred:
args = (), kwds = {}
@wraps(func)
def inner(*args, **kwds):
with self._recreate_cm():
> return func(*args, **kwds)
C:\Miniconda3\envs\dask-distributed\lib\contextlib.py:79:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\utils_test.py:1214: in test_func
return _run_and_close_tornado(async_fn_outer)
distributed\utils_test.py:472: in _run_and_close_tornado
return asyncio.run(inner_fn())
C:\Miniconda3\envs\dask-distributed\lib\asyncio\runners.py:44: in run
return loop.run_until_complete(main)
C:\Miniconda3\envs\dask-distributed\lib\asyncio\base_events.py:647: in run_until_complete
return future.result()
distributed\utils_test.py:469: in inner_fn
return await async_fn(*args, **kwargs)
distributed\utils_test.py:1211: in async_fn_outer
return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py:479: in wait_for
return fut.result()
distributed\utils_test.py:1206: in async_fn
return result
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:830: in __exit__
self.cleanup()
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:834: in cleanup
self._rmtree(self.name)
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:816: in _rmtree
_shutil.rmtree(name, onerror=onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:757: in rmtree
return _rmtree_unsafe(path, onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:622: in _rmtree_unsafe
_rmtree_unsafe(fullname, onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:627: in _rmtree_unsafe
onerror(os.unlink, fullname, sys.exc_info())
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:808: in onerror
cls._rmtree(path)
C:\Miniconda3\envs\dask-distributed\lib\tempfile.py:816: in _rmtree
_shutil.rmtree(name, onerror=onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:757: in rmtree
return _rmtree_unsafe(path, onerror)
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:608: in _rmtree_unsafe
onerror(os.scandir, path, sys.exc_info())
2022-05-26 08:50:34,125 - distributed.utils_perf - WARNING - full garbage collections took 33% CPU time recently (threshold: 10%)
2022-05-26 08:50:35,318 - distributed.utils_perf - WARNING - full garbage collections took 31% CPU time recently (threshold: 10%)
2022-05-26 08:50:36,588 - distributed.utils_perf - WARNING - full garbage collections took 31% CPU time recently (threshold: 10%)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
path = 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
onerror = <function TemporaryDirectory._rmtree.<locals>.onerror at 0x0000015BE576C040>
def _rmtree_unsafe(path, onerror):
try:
> with os.scandir(path) as scandir_it:
E NotADirectoryError: [WinError 267] The directory name is invalid: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
C:\Miniconda3\envs\dask-distributed\lib\shutil.py:605: NotADirectoryError
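The trailing `PermissionError`/`NotADirectoryError` is cleanup fallout rather than a separate bug: the worker that the nanny restarted (`worker-c_b9ylk8`, started at 08:43:55 in the stderr below) apparently still holds its `.dirlock` file open while `gen_cluster`'s `TemporaryDirectory` is being removed, so `shutil.rmtree` hits WinError 32. A generic retry-style workaround for this kind of Windows sharing violation would look roughly like this (a sketch, not distributed's cleanup code):

```python
import shutil
import time

def rmtree_with_retry(path, attempts=10, delay=0.5):
    # Retry removal while another process still has a file in the tree open;
    # on Windows that shows up as PermissionError (WinError 32).
    for attempt in range(attempts):
        try:
            shutil.rmtree(path)
            return
        except PermissionError:
            if attempt == attempts - 1:
                raise
            time.sleep(delay)
```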
---------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump\test_restart.yaml
---------------------------- Captured stderr call -----------------------------
2022-05-26 08:43:35,072 - distributed.scheduler - INFO - State start
2022-05-26 08:43:35,085 - distributed.scheduler - INFO - Clear task state
2022-05-26 08:43:35,086 - distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:52172
2022-05-26 08:43:35,086 - distributed.scheduler - INFO - dashboard at: 127.0.0.1:52171
2022-05-26 08:43:35,116 - distributed.nanny - INFO - Start Nanny at: 'tcp://127.0.0.1:52174'
2022-05-26 08:43:35,117 - distributed.nanny - INFO - Start Nanny at: 'tcp://127.0.0.1:52173'
2022-05-26 08:43:44,624 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:52183
2022-05-26 08:43:44,624 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:52183
2022-05-26 08:43:44,624 - distributed.worker - INFO - dashboard at: 127.0.0.1:52184
2022-05-26 08:43:44,624 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:52172
2022-05-26 08:43:44,625 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:44,625 - distributed.worker - INFO - Threads: 1
2022-05-26 08:43:44,625 - distributed.worker - INFO - Memory: 7.00 GiB
2022-05-26 08:43:44,625 - distributed.worker - INFO - Local Directory: C:\Users\RUNNER~1\AppData\Local\Temp\tmpy7ypw1sz\dask-worker-space\worker-jfalx0k4
2022-05-26 08:43:44,625 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:45,652 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:52183', name: 0, status: init, memory: 0, processing: 0>
2022-05-26 08:43:45,653 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52183
2022-05-26 08:43:45,653 - distributed.core - INFO - Starting established connection
2022-05-26 08:43:45,654 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:52172
2022-05-26 08:43:45,655 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:45,655 - distributed.core - INFO - Starting established connection
2022-05-26 08:43:45,695 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:52191
2022-05-26 08:43:45,696 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:52191
2022-05-26 08:43:45,696 - distributed.worker - INFO - dashboard at: 127.0.0.1:52192
2022-05-26 08:43:45,696 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:52172
2022-05-26 08:43:45,696 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:45,696 - distributed.worker - INFO - Threads: 2
2022-05-26 08:43:45,696 - distributed.worker - INFO - Memory: 7.00 GiB
2022-05-26 08:43:45,696 - distributed.worker - INFO - Local Directory: C:\Users\RUNNER~1\AppData\Local\Temp\tmpy7ypw1sz\dask-worker-space\worker-uayyvx5n
2022-05-26 08:43:45,696 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:46,815 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:52191', name: 1, status: init, memory: 0, processing: 0>
2022-05-26 08:43:46,816 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:52191
2022-05-26 08:43:46,816 - distributed.core - INFO - Starting established connection
2022-05-26 08:43:46,817 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:52172
2022-05-26 08:43:46,817 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:46,818 - distributed.core - INFO - Starting established connection
2022-05-26 08:43:46,850 - distributed.scheduler - INFO - Receive client connection: Client-f4ffd945-dccf-11ec-8608-000d3a345d56
2022-05-26 08:43:46,852 - distributed.core - INFO - Starting established connection
2022-05-26 08:43:48,166 - distributed.scheduler - INFO - Send lost future signal to clients
2022-05-26 08:43:48,167 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:52183', name: 0, status: running, memory: 0, processing: 0>
2022-05-26 08:43:48,168 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:52183
2022-05-26 08:43:48,168 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:52191', name: 1, status: running, memory: 0, processing: 0>
2022-05-26 08:43:48,168 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:52191
2022-05-26 08:43:48,168 - distributed.scheduler - INFO - Lost all workers
2022-05-26 08:43:48,169 - distributed.scheduler - INFO - Clear task state
2022-05-26 08:43:48,182 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:52191'.
2022-05-26 08:43:48,183 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:52191'. Shutting down.
2022-05-26 08:43:48,183 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52191
2022-05-26 08:43:48,189 - distributed.nanny - INFO - Nanny asking worker to close
2022-05-26 08:43:48,197 - distributed.nanny - INFO - Nanny asking worker to close
2022-05-26 08:43:48,200 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52183
2022-05-26 08:43:48,204 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. Status: Status.closing
2022-05-26 08:43:48,211 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. Status: Status.closing
2022-05-26 08:43:48,215 - distributed.nanny - INFO - Worker closed
Traceback (most recent call last):
File "C:\Miniconda3\envs\dask-distributed\lib\multiprocessing\queues.py", line 247, in _feed
send_bytes(obj)
File "C:\Miniconda3\envs\dask-distributed\lib\multiprocessing\connection.py", line 205, in send_bytes
self._send_bytes(m[offset:offset + size])
File "C:\Miniconda3\envs\dask-distributed\lib\multiprocessing\connection.py", line 285, in _send_bytes
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
2022-05-26 08:43:49,178 - distributed.nanny - WARNING - Restarting worker
2022-05-26 08:43:49,183 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:52174'.
2022-05-26 08:43:52,524 - distributed.scheduler - INFO - Remove client Client-f4ffd945-dccf-11ec-8608-000d3a345d56
2022-05-26 08:43:52,525 - distributed.scheduler - INFO - Remove client Client-f4ffd945-dccf-11ec-8608-000d3a345d56
2022-05-26 08:43:52,525 - distributed.scheduler - INFO - Close client connection: Client-f4ffd945-dccf-11ec-8608-000d3a345d56
2022-05-26 08:43:52,527 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:52173'.
2022-05-26 08:43:52,528 - distributed.nanny - INFO - Nanny asking worker to close
2022-05-26 08:43:53,775 - tornado.application - ERROR - Exception in callback functools.partial(<built-in method set_result of _asyncio.Future object at 0x0000015BE55FF040>, None)
Traceback (most recent call last):
File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
ret = callback()
asyncio.exceptions.InvalidStateError: invalid state
2022-05-26 08:43:55,702 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:52212
2022-05-26 08:43:55,703 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:52212
2022-05-26 08:43:55,703 - distributed.worker - INFO - dashboard at: 127.0.0.1:52213
2022-05-26 08:43:55,703 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:52172
2022-05-26 08:43:55,704 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:55,704 - distributed.worker - INFO - Threads: 1
2022-05-26 08:43:55,704 - distributed.worker - INFO - Memory: 7.00 GiB
2022-05-26 08:43:55,704 - distributed.worker - INFO - Local Directory: C:\Users\RUNNER~1\AppData\Local\Temp\tmpy7ypw1sz\dask-worker-space\worker-55fyl_sg
2022-05-26 08:43:55,704 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:55,707 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:52212
2022-05-26 08:43:55,707 - distributed.worker - INFO - Closed worker has not yet started: Status.init
2022-05-26 08:43:55,926 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:52219
2022-05-26 08:43:55,927 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:52219
2022-05-26 08:43:55,927 - distributed.worker - INFO - dashboard at: 127.0.0.1:52220
2022-05-26 08:43:55,927 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:52172
2022-05-26 08:43:55,927 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:55,927 - distributed.worker - INFO - Threads: 2
2022-05-26 08:43:55,927 - distributed.worker - INFO - Memory: 7.00 GiB
2022-05-26 08:43:55,927 - distributed.worker - INFO - Local Directory: C:\Users\RUNNER~1\AppData\Local\Temp\tmpy7ypw1sz\dask-worker-space\worker-c_b9ylk8
2022-05-26 08:43:55,928 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:43:55,933 - distributed.scheduler - INFO - Scheduler closing...
2022-05-26 08:43:55,934 - distributed.scheduler - INFO - Scheduler closing all comms
------------------------------ Captured log call ------------------------------
ERROR asyncio:base_events.py:1753 Future exception was never retrieved
future: <Future finished exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
Traceback (most recent call last):
File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "d:\a\distributed\distributed\distributed\core.py", line 894, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "d:\a\distributed\distributed\distributed\core.py", line 739, in send_recv
response = await comm.read(deserializers=deserializers)
File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 150, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:52203 remote=tcp://127.0.0.1:52173>: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\gen.py", line 769, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "d:\a\distributed\distributed\distributed\utils.py", line 231, in quiet
yield task
File "C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\gen.py", line 762, in run
value = future.result()
File "d:\a\distributed\distributed\distributed\core.py", line 897, in send_recv_from_rpc
raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
ERROR asyncio.events:utils.py:787
Traceback (most recent call last):
File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "d:\a\distributed\distributed\distributed\core.py", line 894, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "d:\a\distributed\distributed\distributed\core.py", line 739, in send_recv
response = await comm.read(deserializers=deserializers)
File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "d:\a\distributed\distributed\distributed\comm\tcp.py", line 150, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:52205 remote=tcp://127.0.0.1:52174>: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "d:\a\distributed\distributed\distributed\utils_test.py", line 1111, in async_fn
result = await coro2
File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 479, in wait_for
return fut.result()
File "D:\a\distributed\distributed\distributed\tests\test_scheduler.py", line 616, in test_restart
await s.restart()
File "d:\a\distributed\distributed\distributed\utils.py", line 761, in wrapper
return await func(*args, **kwargs)
File "d:\a\distributed\distributed\distributed\scheduler.py", line 5098, in restart
resps = await asyncio.wait_for(resps, timeout)
File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 479, in wait_for
return fut.result()
File "d:\a\distributed\distributed\distributed\utils.py", line 218, in All
result = await tasks.next()
File "d:\a\distributed\distributed\distributed\core.py", line 897, in send_recv_from_rpc
raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 625, in _rmtree_unsafe
os.unlink(fullname)
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 805, in onerror
_os.unlink(path)
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\base_events.py", line 647, in run_until_complete
return future.result()
File "d:\a\distributed\distributed\distributed\utils_test.py", line 469, in inner_fn
return await async_fn(*args, **kwargs)
File "d:\a\distributed\distributed\distributed\utils_test.py", line 1211, in async_fn_outer
return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 479, in wait_for
return fut.result()
File "d:\a\distributed\distributed\distributed\utils_test.py", line 1206, in async_fn
return result
File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 830, in __exit__
self.cleanup()
File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 834, in cleanup
self._rmtree(self.name)
File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 816, in _rmtree
_shutil.rmtree(name, onerror=onerror)
File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 757, in rmtree
return _rmtree_unsafe(path, onerror)
File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 622, in _rmtree_unsafe
_rmtree_unsafe(fullname, onerror)
File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 627, in _rmtree_unsafe
onerror(os.unlink, fullname, sys.exc_info())
File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 808, in onerror
cls._rmtree(path)
File "C:\Miniconda3\envs\dask-distributed\lib\tempfile.py", line 816, in _rmtree
_shutil.rmtree(name, onerror=onerror)
File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 757, in rmtree
return _rmtree_unsafe(path, onerror)
File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 608, in _rmtree_unsafe
onerror(os.scandir, path, sys.exc_info())
File "C:\Miniconda3\envs\dask-distributed\lib\shutil.py", line 605, in _rmtree_unsafe
with os.scandir(path) as scandir_it:
NotADirectoryError: [WinError 267] The directory name is invalid: 'C:\\Users\\RUNNER~1\\AppData\\Local\\Temp\\tmpy7ypw1sz\\dask-worker-space\\worker-c_b9ylk8.dirlock'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "d:\a\distributed\distributed\distributed\nanny.py", line 632, in start
await self.running.wait()
File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\locks.py", line 226, in wait
await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "d:\a\distributed\distributed\distributed\utils.py", line 761, in wrapper
return await func(*args, **kwargs)
File "d:\a\distributed\distributed\distributed\nanny.py", line 520, in _on_exit
await self.instantiate()
File "d:\a\distributed\distributed\distributed\nanny.py", line 410, in instantiate
result = await asyncio.wait_for(
File "C:\Miniconda3\envs\dask-distributed\lib\asyncio\tasks.py", line 469, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
----- generated xml file: D:\a\distributed\distributed\reports\pytest.xml -----