Skip to content

Flaky distributed/dashboard/tests/test_scheduler_bokeh.py::test_simple #6843

Description

@gjoseph92
_________________________________ test_simple __________________________________

args = (), kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

../../../miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:75: in inner
    return func(*args, **kwds)
distributed/utils_test.py:1074: in test_func
    return _run_and_close_tornado(async_fn_outer)
distributed/utils_test.py:376: in _run_and_close_tornado
    return asyncio.run(inner_fn())
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/runners.py:44: in run
    return loop.run_until_complete(main)
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/base_events.py:616: in run_until_complete
    return future.result()
distributed/utils_test.py:373: in inner_fn
    return await async_fn(*args, **kwargs)
distributed/utils_test.py:1071: in async_fn_outer
    return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    async def async_fn():
        result = None
        with dask.config.set(config):
            workers = []
            s = False
    
            for _ in range(60):
                try:
                    s, ws = await start_cluster(
                        nthreads,
                        scheduler,
                        security=security,
                        Worker=Worker,
                        scheduler_kwargs=scheduler_kwargs,
                        worker_kwargs=worker_kwargs,
                    )
                except Exception as e:
                    logger.error(
                        "Failed to start gen_cluster: "
                        f"{e.__class__.__name__}: {e}; retrying",
                        exc_info=True,
                    )
                    await asyncio.sleep(1)
                else:
                    workers[:] = ws
                    args = [s] + workers
                    break
            if s is False:
                raise Exception("Could not start cluster")
            if client:
                c = await Client(
                    s.address,
                    security=security,
                    asynchronous=True,
                    **client_kwargs,
                )
                args = [c] + args
    
            try:
                coro = func(*args, *outer_args, **kwargs)
                task = asyncio.create_task(coro)
                coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
                result = await coro2
                validate_state(s, *workers)
    
            except asyncio.TimeoutError:
                assert task
                buffer = io.StringIO()
                # This stack indicates where the coro/test is suspended
                task.print_stack(file=buffer)
    
                if cluster_dump_directory:
                    await dump_cluster_state(
                        s,
                        ws,
                        output_dir=cluster_dump_directory,
                        func_name=func.__name__,
                    )
    
                task.cancel()
                while not task.cancelled():
                    await asyncio.sleep(0.01)
    
                # Hopefully, the hang has been caused by inconsistent
                # state, which should be much more meaningful than the
                # timeout
                validate_state(s, *workers)
    
                # Remove as much of the traceback as possible; it's
                # uninteresting boilerplate from utils_test and asyncio
                # and not from the code being tested.
>               raise asyncio.TimeoutError(
                    f"Test timeout after {timeout}s.\n"
                    "========== Test stack trace starts here ==========\n"
                    f"{buffer.getvalue()}"
                ) from None
E               asyncio.exceptions.TimeoutError: Test timeout after 30s.
E               ========== Test stack trace starts here ==========
E               Stack for <Task pending name='Task-19374' coro=<test_simple() running at /Users/runner/work/distributed/distributed/distributed/dashboard/tests/test_scheduler_bokeh.py:75> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe615435820>()]>> (most recent call last):
E                 File "/Users/runner/work/distributed/distributed/distributed/dashboard/tests/test_scheduler_bokeh.py", line 75, in test_simple
E                   response = await http_client.fetch(

distributed/utils_test.py:1002: TimeoutError
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_simple.yaml
----------------------------- Captured stderr call -----------------------------
2022-08-05 18:57:30,388 - distributed.utils_perf - WARNING - full garbage collections took 85% CPU time recently (threshold: 10%)
2022-08-05 18:57:30,568 - distributed.utils_perf - WARNING - full garbage collections took 97% CPU time recently (threshold: 10%)
2022-08-05 18:57:30,745 - distributed.utils_perf - WARNING - full garbage collections took 97% CPU time recently (threshold: 10%)
2022-08-05 18:57:31,274 - distributed.utils_perf - WARNING - full garbage collections took 93% CPU time recently (threshold: 10%)
2022-08-05 18:57:31,732 - distributed.utils_perf - WARNING - full garbage collections took 84% CPU time recently (threshold: 10%)
2022-08-05 18:57:32,727 - distributed.utils_perf - WARNING - full garbage collections took 71% CPU time recently (threshold: 10%)
2022-08-05 18:57:33,460 - distributed.utils_perf - WARNING - full garbage collections took 64% CPU time recently (threshold: 10%)
2022-08-05 18:57:33,808 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fe69076d880>>, <Task finished name='Task-19538' coro=<Hardware.__init__.<locals>.f() done, defined at /Users/runner/work/distributed/distributed/distributed/dashboard/components/scheduler.py:672> exception=CommClosedError('in <TCP (closed) ConnectionPool.benchmark_network local=tcp://127.0.0.1:52705 remote=tcp://127.0.0.1:52647>: Stream is closed')>)
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/runner/work/distributed/distributed/distributed/dashboard/components/scheduler.py", line 673, in f
    result = await self.scheduler.benchmark_hardware()
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 6508, in benchmark_hardware
    responses = await asyncio.gather(*futures)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 1154, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 919, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.benchmark_network local=tcp://127.0.0.1:52705 remote=tcp://127.0.0.1:52647>: Stream is closed

https://github.com/dask/distributed/runs/7696512528?check_suite_focus=true#step:11:1962

Metadata

Metadata

Assignees

No one assigned

    Labels

    flaky test: Intermittent failures on CI.

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions