Description
What happened:
Running an embarrassingly parallel map_overlap workload may be causing a memory leak in the scheduler. Upon completion, releasing the tasks and restarting the client does not reclaim the memory. The example below, with 200k tasks, shows a jump in scheduler memory from 100MB to 1.3GB while running the graph. After client.restart
, it remains at 1.1GB.
In addition, the memory of the workers climb into the "yellow", where I believe swapping to disk begins to happen. Given the parallel nature of this workload, workers ought to be able discard pieces when they are done with them.
From a performance perspective, during client.compute
, the scheduler gets unresponsive (it takes 20ish seconds to start), presumably because its loading a large graph. I have seen this cause already running computations to start erroring. I've seen lost keys and KilledWorkers.
And finally, anecdotally, it sometimes happens that one worker runs hot, getting 10x the tasks of other workers. Eventually, forward progress halts. I now watch for this, and then kill that worker, which redistributes the work and finishes the job. (I'm using dask-gateway on K8s).
What you expected to happen:
- The scheduler should not use up additional memory once a computation is done.
- Workers should shard a parallel job so that each shard can be discarded when done, keeping a low worker memory profile
- Loading a graph should not disrupt ongoing computation
Minimal Complete Verifiable Example:
import dask.array as da
import distributed
client = distributed.Client(n_workers=4, threads_per_worker=1, memory_limit='10GB')
arr = da.zeros((50, 2, 8192, 8192), chunks=(1, -1, 512, 512))
result = arr.map_overlap(lambda x: x, depth=(0,0,200,200))
store = result.to_zarr('/media/ssd/test/memory_scheduler.zarr', compute=False, overwrite=True)
future = client.compute(store)
Environment:
- Dask version: 2.18.1
- Distributed version: 2.18.0
- Python version: 3.7
- Operating System: Ubuntu 18.04
- Install method (conda, pip, source): conda