Description
What is your issue?
Hi,
I have an embarrassingly parallel function that I am applying along the time dimension. It needs some extra constant data (in this case `some_big_constant`) that I pass through `kwargs`. I find that unmanaged memory keeps increasing, apparently because the kwargs get attached to each task, and the problem gets worse when I have more chunks along the time dimension.
My doubts are:

- I would expect this computation to proceed in a streaming fashion: process one chunk, write the result to the corresponding zarr region, release the memory for the finished task, and move on to the next chunk.
- I am okay with the extra memory for `some_big_constant` being allocated for each task, but I am surprised that it is not released. Does Dask keep all the task info in memory until the end of the computation? I guess `some_big_constant` gets baked into the partial for `add_one` (see the rough size arithmetic after this list).
- Any suggestion on how to remediate the issue?
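
For scale, here is my back-of-the-envelope size arithmetic for the example below (assuming float64 throughout): a single copy of the constant is already much bigger than an input chunk, so if every task carries its own reference that is never released, the growth adds up quickly.

```python
import numpy as np

# Back-of-the-envelope sizes for the example below (float64 = 8 bytes per element).
some_big_constant = np.ones((10000, 5000))
print(some_big_constant.nbytes / 1e6)  # ~400 MB: the constant passed through kwargs

chunk_bytes = 25000 * 300 * 8          # one (25000, 300) chunk of the input
print(chunk_bytes / 1e6)               # ~60 MB per chunk

n_chunks = 250000 // 25000
print(n_chunks)                        # 10 chunks along the time dimension
```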
EDIT: dask-2025.1.0 distributed-2025.1.0 xarray-2025.1.1 zarr-2.18.3
Full example:
```python
import time

import numpy as np
import xarray as xr
import dask.array as da
from dask.distributed import Client
from dask.diagnostics import ResourceProfiler

client = Client(n_workers=1, threads_per_worker=1, memory_limit="20GB")
print(client.dashboard_link)

times = np.arange(250000)
other = np.arange(300)
some_big_constant = np.ones((10000, 5000))  # ~400 MB of float64

data = da.random.random((len(times), len(other)), chunks=(25000, 300))  # 10 chunks along time
da = xr.DataArray(data, coords={"time": times, "y": other}, dims=["time", "y"])


def add_one(x, b):
    time.sleep(1)  # stand-in for some op using b
    return x + 1


with ResourceProfiler() as rprof:
    result = xr.apply_ufunc(
        add_one,
        da,
        dask="parallelized",
        kwargs={"b": some_big_constant},
    )
    result.to_zarr("test_zarr.zarr", mode="w")
```
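
One remediation I have been considering (a minimal, untested sketch continuing the example above; `get_big_constant` is just a placeholder name of mine) is to not pass the array through `kwargs` at all and instead build or load it on the worker, cached once per process, so it never gets embedded in the tasks:

```python
import functools

import numpy as np


@functools.lru_cache(maxsize=1)
def get_big_constant():
    # Recreate (or load from disk) the constant on the worker itself,
    # once per process, instead of shipping it inside every task's kwargs.
    return np.ones((10000, 5000))


def add_one(x):
    b = get_big_constant()  # cached after the first call in each worker process
    time.sleep(1)           # some op using b
    return x + 1


result = xr.apply_ufunc(add_one, da, dask="parallelized")
result.to_zarr("test_zarr.zarr", mode="w")
```

Is something like this the recommended pattern, or is there a way to keep passing the constant through `kwargs` without the memory piling up?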
The dask graph looks good and parallel:
On the Dask dashboard, I see the unmanaged memory increasing as the computation proceeds. I see that `store_map` proceeds well, which is comforting.
With the profiler, I see the memory increasing too. It looks like there is roughly one step up for every chunk (some chunks are probably loaded in memory at the same time).
In the Dask dashboard profile, I only see the zarr calls at the very end of the computation (the tall column). I would have expected to see some calls during the computation too (like how `store_map` proceeds), but I'm not overly concerned about this.
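
For reference, this is a rough way I imagine one could check whether the constant really ends up embedded in the graph (an assumption on my part, run before `to_zarr` triggers the computation):

```python
import cloudpickle

# Materialize the lazy result's task graph and measure its serialized size.
# If it is on the order of some_big_constant (~400 MB), the constant is embedded
# in the graph and gets shipped along with the tasks.
graph = dict(result.__dask_graph__())
print(len(cloudpickle.dumps(graph)) / 1e6, "MB of serialized graph")
```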