Description
I'm using distributed in an asyncio application on Linux.
I just lost three days debugging a deadlock when running pytest; the issue completely vanished when I ran the application by hand with a remote scheduler, or even when I slightly changed unrelated things. To make things worse, there was apparently no way to make the logging module (or print() to stdout, for that matter) work inside the dask workers; again, everything worked fine when running outside of pytest.
My tests:
import asyncio

import distributed
import pytest


@pytest.fixture(scope="session")
def event_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="session")
async def dask_client(event_loop):
    async with distributed.LocalCluster(
        n_workers=4,
        threads_per_worker=1,
        host="localhost:0",
        dashboard_address=None,
        asynchronous=True,
    ) as cluster:
        async with distributed.Client(
            cluster, asynchronous=True, set_as_default=False
        ) as client:
            yield client


@pytest.mark.asyncio
async def test1(dask_client):
    assert await dask_client.submit(lambda: 123) == 123

Sadly, the POC above does not allow reproducing the deadlock; I never figured out exactly where the problem was, due to said lack of output.
The root cause of the issue is that, with Python 3.8 on Linux, LocalCluster creates new processes using the fork multiprocessing context. This causes a myriad of issues and, in the best case, makes the unit tests behave substantially differently than in production, because the forked processes inherit all global variables of the parent process (see the sketch after the list below).
In my specific case:
- pytest tampers with the logging module and with stdout and stderr
- pytest-asyncio tampers with the asyncio module
- my application has its own global variables, which are expected not to be initialised when a function first runs on a dask worker
- among those global variables I have a motor MongoDB client and an asyncio_redis Redis client. Both internally hold a reference to the asyncio event loop that was current when they were initialised. The asyncio event loop is not designed to survive a fork (a known limitation); if you try running it in a forked process, weird things will happen.
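As a minimal, dask-free sketch of the difference: a forked child inherits the parent's globals in whatever state they were in at fork time, whereas a spawned child re-imports the module and starts from a clean slate (note that the fork context is only available on POSIX systems):

import multiprocessing

GLOBAL_STATE = "initial"


def show_state():
    # fork: prints the value mutated by the parent below
    # spawn: the module is re-imported, so this prints "initial"
    print(GLOBAL_STATE)


if __name__ == "__main__":
    GLOBAL_STATE = "mutated by the parent"
    for method in ("fork", "spawn"):  # fork exists only on POSIX
        p = multiprocessing.get_context(method).Process(target=show_state)
        p.start()
        p.join()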
To make things worse, Python 3.8 on macOS and Windows (used on many dev boxes) uses the spawn context by default, whereas on Linux (typically CI and production) it uses the fork context. This can cause problems that appear for the first time in CI or production and are very hard to reproduce in development.
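You can check what your interpreter does with a one-liner from the standard library:

import multiprocessing

# "fork" on Linux; "spawn" on Windows and, since Python 3.8, on macOS
print(multiprocessing.get_start_method())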
Expected behaviour
LocalCluster should always create subprocesses using the 'spawn' method.
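In the meantime, a hedged sketch: recent versions of distributed appear to expose the start method through the dask config key distributed.worker.multiprocessing-method (check the distributed.yaml shipped with your version before relying on this):

import dask

# Assumption: this config key is honoured when worker processes are created;
# verify it exists in your installed distributed version.
dask.config.set({"distributed.worker.multiprocessing-method": "spawn"})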
Workaround
Run dask-scheduler and dask-worker from the command line, or start LocalCluster from a virgin interpreter.
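For the command-line variant, dask-scheduler listens on port 8786 by default and dask-worker connects to it; the test process then only needs a client pointed at that address. A minimal sketch, assuming the scheduler runs on the same host:

import distributed

# Connect to a scheduler started separately, e.g. via the dask-scheduler CLI,
# on its default port 8786; the workers live in clean, non-forked interpreters.
client = distributed.Client("tcp://127.0.0.1:8786")
assert client.submit(lambda: 123).result() == 123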
Changing my pytest fixture to the below fixed all my issues:
import multiprocessing

import distributed
import pytest


def run_cluster(init_lock, shutdown_lock):
    cluster = distributed.LocalCluster(
        n_workers=4,
        threads_per_worker=1,
        host="localhost:12345",
        dashboard_address=None,
    )
    # Tell the parent process that the cluster is ready...
    init_lock.release()
    # ...then block until the parent asks for a shutdown
    shutdown_lock.acquire()
    cluster.close()


@pytest.fixture(scope="session")
async def dask_client(event_loop):
    ctx = multiprocessing.get_context("spawn")
    init_lock = ctx.Lock()
    init_lock.acquire()
    shutdown_lock = ctx.Lock()
    shutdown_lock.acquire()
    cluster = ctx.Process(target=run_cluster, args=(init_lock, shutdown_lock))
    cluster.start()
    # Block until the cluster process has finished starting up
    init_lock.acquire()
    async with distributed.Client(
        "localhost:12345", asynchronous=True, set_as_default=False
    ) as client:
        yield client
    shutdown_lock.release()
    cluster.join()