
LocalCluster should run in a clean interpreter #3555

@crusaderky

Description

I'm using distributed in an asyncio application on Linux.
I just lost three days debugging a deadlock that only occurred when running under pytest; the issue vanished completely when I ran the application by hand against a remote scheduler, or even when I changed unrelated details. To make things worse, I could not get the logging module - or print() to stdout, for that matter - to produce any output inside the dask workers, although both worked fine when running everything outside of pytest.

My tests:

@pytest.fixture(scope="session")
def event_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()

@pytest.fixture(scope="session")
async def dask_client(event_loop):
    async with distributed.LocalCluster(
        n_workers=4,
        threads_per_worker=1,
        host="localhost:0",
        dashboard_address=None,
        asynchronous=True,
    ) as cluster:
        async with distributed.Client(
            cluster, asynchronous=True, set_as_default=False
        ) as client:
            yield client

@pytest.mark.asyncio
async def test1(client):
    assert await client.submit(lambda: 123) == 123

Sadly, the proof of concept above does not reproduce the deadlock; because of the missing output described above, I never figured out exactly where the problem was.

The root cause of the issue is that, on Python 3.8 on Linux, LocalCluster creates new processes using the fork multiprocessing context. This causes a myriad of issues and, in the best case, makes unit tests behave substantially differently from production, because forked processes inherit all global variables of the parent process (see the sketch after the list below).

In my specific case:

  • pytest tampers with the logging module and with stdout and stderr
  • pytest-asyncio tampers with the asyncio module
  • my application has its own global variables, which are expected to be uninitialised when a function first runs on a dask worker
  • among the global variables I have a motor MongoDB client and an asyncio_redis Redis client. Both internally hold a reference to the asyncio event loop that was current when they were initialised. The asyncio event loop is not designed to survive a fork (a known limitation); if you run it in a forked process, weird things happen.
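
The difference is easy to demonstrate outside of dask. Here is a minimal sketch (not from my application, and assuming a Linux interpreter where the fork context is available): a module-level global mutated by the parent is visible in a forked child, but not in a spawned one, because spawn re-imports the module from scratch.

import multiprocessing

STATE = "initial"

def report(method):
    # A forked child sees the parent's memory; a spawned child
    # re-imports the module, so STATE is back to "initial".
    print(method, "->", STATE)

if __name__ == "__main__":
    STATE = "mutated by parent"
    for method in ("fork", "spawn"):
        ctx = multiprocessing.get_context(method)
        p = ctx.Process(target=report, args=(method,))
        p.start()
        p.join()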

To make things worse, Python 3.8 on macOS and Windows (used on many dev boxes) defaults to the spawn context, whereas on Linux (typically CI and production) it defaults to the fork context. This can cause problems that appear for the first time in CI or production and are very hard to reproduce in development.
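
You can check what your platform will do with a one-liner (assuming CPython 3.8+):

import multiprocessing

# "fork" on Linux, "spawn" on Windows and (since Python 3.8) on macOS
print(multiprocessing.get_start_method())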

Expected behaviour

LocalCluster should always create subprocesses using the 'spawn' method.
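
If the start method were exposed through dask's configuration system, forcing spawn could look like the sketch below. The config key distributed.worker.multiprocessing-method is an assumption about a possible config surface, not something the version this issue was filed against is known to support.

import dask
import distributed

# Assumed/hypothetical config key; adjust to whatever distributed exposes.
dask.config.set({"distributed.worker.multiprocessing-method": "spawn"})

cluster = distributed.LocalCluster(n_workers=4, threads_per_worker=1)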

Workaround

Run dask-scheduler and dask-worker from the command line, or start LocalCluster from a clean interpreter.
Changing my pytest fixture to the following fixed all my issues:

import multiprocessing

import distributed
import pytest

def run_cluster(init_lock, shutdown_lock):
    # Runs in a clean, spawned interpreter.
    cluster = distributed.LocalCluster(
        n_workers=4,
        threads_per_worker=1,
        host="localhost:12345",
        dashboard_address=None,
    )
    # Signal the parent that the cluster is up, then block until
    # the parent asks us to shut down.
    init_lock.release()
    shutdown_lock.acquire()
    cluster.close()

@pytest.fixture(scope="session")
async def dask_client(event_loop):
    ctx = multiprocessing.get_context("spawn")
    # Pre-acquired locks used as a handshake with the child process
    init_lock = ctx.Lock()
    init_lock.acquire()
    shutdown_lock = ctx.Lock()
    shutdown_lock.acquire()
    cluster = ctx.Process(target=run_cluster, args=(init_lock, shutdown_lock))
    cluster.start()
    # Wait until the child has started the cluster
    init_lock.acquire()

    async with distributed.Client(
        "localhost:12345", asynchronous=True, set_as_default=False
    ) as client:
        yield client

    # Tell the child to close the cluster, then wait for it to exit
    shutdown_lock.release()
    cluster.join()
