Skip to content

speeding up windowed writes #439

@artttt

Description

@artttt

Ive been writing COGs using the windowed option
da.rio.to_raster(...... driver="COG",windowed=True)

It works fine but is rather slow.

my array is on a set of distributed dask workers that dont share a file system so as far is i can tell using a lock isnt an option.

It looks to me that it is slow because of the windows being written are very small as they match the cog internal tiling.

I did a bit of testing and for instance if you use extra options such as:

        tiled=True,
        driver="GTiff",
        blockxsize =  10000,
        blockysize = 10000,

the writing is much faster and approaching the speed of the non windowed write and makes better use of the available ram and CPU for compression. Obviously its no longer a COG though.

However if i change the raster_writer code with the following I get a COG and I get fast writes and as far as ive tested it works for arbitrarily large arrays.

def to_raster(self, xarray_dataarray, tags, windowed, lock, compute, **kwargs):
    """
    This method writes to the raster on disk.
    xarray_dataarray: xarray.DataArray
        The input data array to write to disk.
    tags: dict, optional
        A dictionary of tags to write to the raster.
    windowed: bool
        If True and the data array is not a dask array, it will write
        the data to disk using rasterio windows.
    lock: boolean or Lock, optional
        Lock to use to write data using dask.
        If not supplied, it will use a single process.
    compute: bool
        If True (default) and data is a dask array, then compute and save
        the data immediately. If False, return a dask Delayed object.
        Call ".compute()" on the Delayed object to compute the result
        later. Call ``dask.compute(delayed1, delayed2)`` to save
        multiple delayed files at once.
    dtype: np.dtype
        Numpy-compliant dtype used to save raster. If data is not already
        represented by this dtype in memory it is recast. dtype='complex_int16'
        is a special case to write in-memory np.complex64 to CInt16.
    **kwargs
        Keyword arguments to pass into writing the raster.
    """
    kwargs["dtype"], numpy_dtype = _get_dtypes(
        kwargs["dtype"],
        xarray_dataarray.encoding.get("rasterio_dtype"),
        xarray_dataarray.encoding.get("dtype", str(xarray_dataarray.dtype)),
    )

    if kwargs["nodata"] is not None:
        # Ensure dtype of output data matches the expected dtype.
        # This check is added here as the dtype of the data is
        # converted right before writing.
        kwargs["nodata"] = _ensure_nodata_dtype(kwargs["nodata"], numpy_dtype)

    with rasterio.open(self.raster_path, "w", **kwargs) as rds:
        _write_metatata_to_raster(rds, xarray_dataarray, tags)
        if not (lock and is_dask_collection(xarray_dataarray.data)):
            # write data to raster immmediately if not dask array
            if windowed:
                #window_iter = rds.block_windows(1)
                #a bit of a hack to get block windows that are much larger then the cog block windows
                #I wonder how best to align these.
                #maybe to some multiple of the cog blocks (is there an order that is better to write cog blocks?)
                #maybe align to the dask blocks like is done if using a lock
                #to look at : could the encode_cf_variable step occur on the dask workers
                temp_kwargs = kwargs.copy()
                temp_kwargs['driver'] = 'GTiff'
                temp_kwargs['tiled'] = True
                temp_kwargs['blockxsize'] = 512 * 20
                temp_kwargs['blockysize'] =  512 * 20
                with rasterio.io.MemoryFile() as memfile:
                    with memfile.open(**temp_kwargs) as temp_rds:     
                        window_iter = list(temp_rds.block_windows(1))
            else:
                window_iter = [(None, None)]
            for _, window in window_iter:
                if window is not None:
                    out_data = xarray_dataarray.rio.isel_window(window)
                else:
                    out_data = xarray_dataarray
                data = encode_cf_variable(out_data).values.astype(numpy_dtype)
                if data.ndim == 2:
                    rds.write(data, 1, window=window)
                else:
                    rds.write(data, window=window)

    if lock and is_dask_collection(xarray_dataarray.data):
        return dask.array.store(
            encode_cf_variable(xarray_dataarray).data.astype(numpy_dtype),
            self,
            lock=lock,
            compute=compute,
        )
    return None

Basically im decoupling the block_windows used for writing from the internal block_windows of the COG. Obviously done in a slightly hacky way and it would need to be refined so that the windows are suitable for the ram available on the system.

I think if refined enough there is no reason why not to make this the standard way of writing (no need for a windowed=False option, It would also be interesting to see how it performs compared to using a lock

Thanks

Metadata

Metadata

Assignees

No one assigned

    Labels

    daskDask related issue.proposalIdea for a new feature.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions