Flaky distributed/dashboard/tests/test_scheduler_bokeh.py::test_simple #6843

Open · gjoseph92 (Collaborator) opened this issue Aug 5, 2022 · 0 comments · Labels: flaky test (Intermittent failures on CI)
_________________________________ test_simple __________________________________

args = (), kwds = {}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

../../../miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.8/contextlib.py:75: in inner
    return func(*args, **kwds)
distributed/utils_test.py:1074: in test_func
    return _run_and_close_tornado(async_fn_outer)
distributed/utils_test.py:376: in _run_and_close_tornado
    return asyncio.run(inner_fn())
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/runners.py:44: in run
    return loop.run_until_complete(main)
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/base_events.py:616: in run_until_complete
    return future.result()
distributed/utils_test.py:373: in inner_fn
    return await async_fn(*args, **kwargs)
distributed/utils_test.py:1071: in async_fn_outer
    return await asyncio.wait_for(async_fn(), timeout=timeout * 2)
../../../miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    async def async_fn():
        result = None
        with dask.config.set(config):
            workers = []
            s = False
    
            for _ in range(60):
                try:
                    s, ws = await start_cluster(
                        nthreads,
                        scheduler,
                        security=security,
                        Worker=Worker,
                        scheduler_kwargs=scheduler_kwargs,
                        worker_kwargs=worker_kwargs,
                    )
                except Exception as e:
                    logger.error(
                        "Failed to start gen_cluster: "
                        f"{e.__class__.__name__}: {e}; retrying",
                        exc_info=True,
                    )
                    await asyncio.sleep(1)
                else:
                    workers[:] = ws
                    args = [s] + workers
                    break
            if s is False:
                raise Exception("Could not start cluster")
            if client:
                c = await Client(
                    s.address,
                    security=security,
                    asynchronous=True,
                    **client_kwargs,
                )
                args = [c] + args
    
            try:
                coro = func(*args, *outer_args, **kwargs)
                task = asyncio.create_task(coro)
                coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
                result = await coro2
                validate_state(s, *workers)
    
            except asyncio.TimeoutError:
                assert task
                buffer = io.StringIO()
                # This stack indicates where the coro/test is suspended
                task.print_stack(file=buffer)
    
                if cluster_dump_directory:
                    await dump_cluster_state(
                        s,
                        ws,
                        output_dir=cluster_dump_directory,
                        func_name=func.__name__,
                    )
    
                task.cancel()
                while not task.cancelled():
                    await asyncio.sleep(0.01)
    
                # Hopefully, the hang has been caused by inconsistent
                # state, which should be much more meaningful than the
                # timeout
                validate_state(s, *workers)
    
                # Remove as much of the traceback as possible; it's
                # uninteresting boilerplate from utils_test and asyncio
                # and not from the code being tested.
>               raise asyncio.TimeoutError(
                    f"Test timeout after {timeout}s.\n"
                    "========== Test stack trace starts here ==========\n"
                    f"{buffer.getvalue()}"
                ) from None
E               asyncio.exceptions.TimeoutError: Test timeout after 30s.
E               ========== Test stack trace starts here ==========
E               Stack for <Task pending name='Task-19374' coro=<test_simple() running at /Users/runner/work/distributed/distributed/distributed/dashboard/tests/test_scheduler_bokeh.py:75> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe615435820>()]>> (most recent call last):
E                 File "/Users/runner/work/distributed/distributed/distributed/dashboard/tests/test_scheduler_bokeh.py", line 75, in test_simple
E                   response = await http_client.fetch(

distributed/utils_test.py:1002: TimeoutError
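
For context, the "Test stack trace starts here" output above comes from the gen_cluster harness: it shields the test task from wait_for's cancellation so that, on timeout, it can print the stack of the still-suspended test coroutine before cancelling it. A minimal, self-contained sketch of that pattern (names like run_with_timeout are illustrative, not the actual utils_test code):

```python
import asyncio
import io


async def run_with_timeout(coro, timeout):
    task = asyncio.create_task(coro)
    try:
        # shield() keeps `task` alive when wait_for times out,
        # so its stack is still inspectable afterwards
        return await asyncio.wait_for(asyncio.shield(task), timeout)
    except asyncio.TimeoutError:
        buffer = io.StringIO()
        # This stack shows where the coro/test is suspended
        task.print_stack(file=buffer)
        task.cancel()
        raise asyncio.TimeoutError(
            f"Test timeout after {timeout}s.\n"
            "========== Test stack trace starts here ==========\n"
            f"{buffer.getvalue()}"
        ) from None


async def hangs_forever():
    await asyncio.sleep(3600)  # stands in for the stuck http_client.fetch


# asyncio.run(run_with_timeout(hangs_forever(), timeout=1))
```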
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_simple.yaml
----------------------------- Captured stderr call -----------------------------
2022-08-05 18:57:30,388 - distributed.utils_perf - WARNING - full garbage collections took 85% CPU time recently (threshold: 10%)
2022-08-05 18:57:30,568 - distributed.utils_perf - WARNING - full garbage collections took 97% CPU time recently (threshold: 10%)
2022-08-05 18:57:30,745 - distributed.utils_perf - WARNING - full garbage collections took 97% CPU time recently (threshold: 10%)
2022-08-05 18:57:31,274 - distributed.utils_perf - WARNING - full garbage collections took 93% CPU time recently (threshold: 10%)
2022-08-05 18:57:31,732 - distributed.utils_perf - WARNING - full garbage collections took 84% CPU time recently (threshold: 10%)
2022-08-05 18:57:32,727 - distributed.utils_perf - WARNING - full garbage collections took 71% CPU time recently (threshold: 10%)
2022-08-05 18:57:33,460 - distributed.utils_perf - WARNING - full garbage collections took 64% CPU time recently (threshold: 10%)
2022-08-05 18:57:33,808 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7fe69076d880>>, <Task finished name='Task-19538' coro=<Hardware.__init__.<locals>.f() done, defined at /Users/runner/work/distributed/distributed/distributed/dashboard/components/scheduler.py:672> exception=CommClosedError('in <TCP (closed) ConnectionPool.benchmark_network local=tcp://127.0.0.1:52705 remote=tcp://127.0.0.1:52647>: Stream is closed')>)
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/runner/work/distributed/distributed/distributed/dashboard/components/scheduler.py", line 673, in f
    result = await self.scheduler.benchmark_hardware()
  File "/Users/runner/work/distributed/distributed/distributed/scheduler.py", line 6508, in benchmark_hardware
    responses = await asyncio.gather(*futures)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 1154, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 919, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.benchmark_network local=tcp://127.0.0.1:52705 remote=tcp://127.0.0.1:52647>: Stream is closed

https://github.com/dask/distributed/runs/7696512528?check_suite_focus=true#step:11:1962
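
The CommClosedError in the captured stderr is the usual translation distributed applies when a tornado stream dies mid-read: convert_stream_closed_error re-raises StreamClosedError as CommClosedError with `raise ... from exc`, which is why the traceback shows the stream error as the direct cause. A simplified sketch of that chaining (the CommClosedError class here is a stand-in, not the real distributed.comm.core one):

```python
from tornado.iostream import StreamClosedError


class CommClosedError(IOError):
    """Stand-in for distributed.comm.core.CommClosedError."""


def convert_stream_closed_error(obj, exc):
    # Chain the low-level stream error onto the comm-level error,
    # preserving it as __cause__ for tracebacks like the one above.
    raise CommClosedError(f"in {obj}: {exc}") from exc


try:
    raise StreamClosedError()
except StreamClosedError as e:
    try:
        convert_stream_closed_error("<TCP (closed) example comm>", e)
    except CommClosedError as err:
        assert err.__cause__ is e
```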
