
High unmanaged memory using kwargs in apply_ufunc with Dask #9981

Open
@abiasiol

Description


What is your issue?

Hi,
I have an embarrassingly parallel function that I am applying along the time dimension. It needs some extra constant data (in this case some_big_constant) that I pass through kwargs. I find that unmanaged memory keeps increasing because the kwargs are associated with each task, and the problem gets worse the more chunks there are along the time dimension.

My doubts are:

  • I would expect this computation to proceed in a streaming fashion: process one chunk, write the result to the corresponding zarr region, move on to the next chunk, and release the memory for the finished task.
  • I am okay with the extra memory for some_big_constant being allocated for each task, but I am surprised that the memory is not released.
  • Does Dask keep all the task information in memory until the end of the computation? I guess that some_big_constant gets baked into the partial for add_one.
  • Any suggestions on how to mitigate the issue? (A possible workaround is sketched after the full example below.)

EDIT: dask-2025.1.0 distributed-2025.1.0 xarray-2025.1.1 zarr-2.18.3

Full example:

import time
import xarray as xr
import numpy as np
import dask.array as da
from dask.distributed import Client
from dask.diagnostics import ResourceProfiler

client = Client(n_workers=1, threads_per_worker=1, memory_limit="20GB")
print(client.dashboard_link)

times = np.arange(250000)
other = np.arange(300)
some_big_constant = np.ones((10000, 5000))

data = da.random.random((len(times), len(other)), chunks=(25000, 300))  # 10 chunks along time
arr = xr.DataArray(data, coords={"time": times, "y": other}, dims=["time", "y"])


def add_one(x, b):
    time.sleep(1)  # some op using b
    return x + 1


with ResourceProfiler() as rprof:
    result = xr.apply_ufunc(
        add_one,
        arr,
        dask="parallelized",
        kwargs={"b": some_big_constant},  # ~400 MB (10000 × 5000 float64)
    )
    result.to_zarr("test_zarr.zarr", mode="w")
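
A possible mitigation (untested sketch, reusing arr, some_big_constant, and add_one from above): pass the constant as a positional argument backed by a single-chunk dask array, so it enters the graph once as its own task instead of being serialized into every add_one task. The core dimension names bx/by are made up for illustration.

big = xr.DataArray(
    da.from_array(some_big_constant, chunks=-1),  # a single chunk -> a single task
    dims=["bx", "by"],
)

result = xr.apply_ufunc(
    add_one,
    arr,
    big,
    input_core_dims=[[], ["bx", "by"]],  # each block receives the whole constant as b
    dask="parallelized",
)

client.scatter(some_big_constant, broadcast=True) might be another option, but I have not verified whether apply_ufunc resolves Futures passed through kwargs.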

The task graph for the original (kwargs) version looks good and parallel:

[screenshot: Dask task graph]

On the Dask dashboard, I see the unmanaged memory increasing as the computation proceeds. I see that store_map proceeds well, which is comforting.

[screenshot: Dask dashboard memory plot showing unmanaged memory growth]
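
A rough way to confirm where the memory sits (untested sketch; cloudpickle is assumed available, since distributed depends on it): measure the serialized size of every task in the materialized graph. If some_big_constant is baked into the per-chunk tasks, many payloads should be around 400 MB.

import cloudpickle

graph = dict(result.__dask_graph__())  # materialize the high-level graph
sizes = sorted(len(cloudpickle.dumps(task)) for task in graph.values())
print(f"{len(graph)} tasks; largest payload: {sizes[-1] / 1e6:.1f} MB")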

With the profiler, I see the memory increasing too. There is roughly one step up for every chunk (several chunks are probably in memory at the same time).

[screenshot: ResourceProfiler memory trace]
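
In case it helps with reproduction: the resource trace collected in rprof can be plotted with dask.diagnostics.visualize (needs bokeh installed):

from dask.diagnostics import visualize

visualize([rprof])  # renders an interactive CPU/memory timeline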

In the Dask dashboard profile, I see the zarr calls only at the very end of the computation (the tall column). I would have expected to see some calls during the computation too (like how store_map proceeds), but I am not overly concerned about this.

[screenshot: Dask dashboard task-stream profile]
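
One idea for forcing the streaming behaviour I described above (untested sketch, using to_zarr's region argument; the y coordinate is dropped because region writes only accept variables that overlap the region):

step = 25000  # one chunk along time

result.to_zarr("test_zarr.zarr", mode="w", compute=False)  # metadata and coords only

for start in range(0, len(times), step):
    sel = slice(start, start + step)
    block = result.isel(time=sel).compute()  # load one chunk's worth of output
    block.drop_vars("y").to_zarr("test_zarr.zarr", region={"time": sel})

This trades the scheduler's pipelining for an explicit loop, so it is more a diagnostic than a fix.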
