Multiprocessing reprojection #661
Conversation
Hi @vschaffn @adebardo, nice to see the progress! Upon reading the code, one main comment (see geoutils/geoutils/raster/delayed.py, line 783 at commit b4da49b): all the projected grid-matching calculations that map the input chunks to output chunks written above that line (using the GeoGrid classes I mentioned) can simply stay the same 😉
Also note that Rasterio reprojection errors can appear (especially when comparing a tiled reprojection to a reprojection of the full raster), due to internal inconsistencies...
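To make this concrete, here is a minimal, self-contained sketch (synthetic data and made-up grids, not code from this PR) that can surface such tiled-versus-full discrepancies by reprojecting the full array and then a single source tile onto the same destination grid:
import numpy as np
import rasterio as rio
from rasterio.enums import Resampling
from rasterio.transform import from_origin
from rasterio.warp import calculate_default_transform, reproject

# Synthetic source raster on a UTM-like grid (zone 33N, around 45N)
src = np.random.default_rng(42).random((200, 200)).astype("float32")
src_transform = from_origin(500000, 5000000, 30, 30)
src_crs = rio.crs.CRS.from_epsg(32633)
dst_crs = rio.crs.CRS.from_epsg(4326)

# Destination grid covering the full source extent
left, bottom, right, top = rio.transform.array_bounds(200, 200, src_transform)
dst_transform, dst_w, dst_h = calculate_default_transform(
    src_crs, dst_crs, 200, 200, left=left, bottom=bottom, right=right, top=top
)

# Reproject the full array, then a single source tile, onto the same grid
full = np.zeros((dst_h, dst_w), dtype="float32")
reproject(src, full, src_transform=src_transform, src_crs=src_crs,
          dst_transform=dst_transform, dst_crs=dst_crs, resampling=Resampling.bilinear)
tile = np.zeros((dst_h, dst_w), dtype="float32")
reproject(src[:120, :120].copy(), tile, src_transform=src_transform, src_crs=src_crs,
          dst_transform=dst_transform, dst_crs=dst_crs, resampling=Resampling.bilinear)

# Differences at the tile border are expected (the resampling support is cut);
# differences well inside the tile footprint are the internal inconsistencies
diff = full - tile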
To clarify one more remark: due to shape deformations, the mapping of input to output tiles between any two CRS requires more than an overlap value, and also depends on the input/output chunksizes. You can essentially turn the list comprehension at geoutils/geoutils/raster/delayed.py, line 784 (commit b4da49b) into your loop, and let reproject_block stick the pieces of input tiles in the right places (you might not have a perfect square with all the tiles you opened) and give you the output: see geoutils/geoutils/raster/delayed.py, line 644 (commit b4da49b).
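As a toy illustration of that mapping, it boils down to footprint intersections between destination and source tiles, something like this sketch (hypothetical shapely boxes in a common CRS; the real code derives footprints with the GeoGrid classes):
import shapely.geometry

# One destination tile and a 2x2 grid of source tiles
dst_tile = shapely.geometry.box(0, 0, 10, 10)
src_tiles = [shapely.geometry.box(x, y, x + 6, y + 6) for x in (0, 6) for y in (0, 6)]
# Indices of source tiles intersecting this destination tile (here: all four)
dest2source = [i for i, t in enumerate(src_tiles) if t.intersects(dst_tile)]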
I should have thought about this last week (I was mostly off work and only popped in for the comment above, so I didn't think about the big picture, sorry!): it is only ...
I looked at the Dask code again, and it didn't look too hard to adapt for Multiprocessing when already familiar with the underlying functions, so I gave it a go to save you time 😉. It's only about 10 lines of new or adapted code (ignoring the writing part of the Multiprocessing, which we write the same way as in #669). The rest is an exact copy of the Dask code. The most critical aspect was to call icrop in the wrapper, so that each task loads only the source blocks it needs. Here it is, with comments explaining the changes relative to the Dask code:
### (NO CHANGE) BLOCK FUNCTION FULLY COPIED FROM THE DASK ONE ABOVE, JUST WITHOUT THE @DELAYED DECORATOR
def _multiproc_reproject_per_block(
*src_arrs: tuple[NDArrayNum], block_ids: list[dict[str, int]], combined_meta: dict[str, Any], **kwargs: Any
) -> NDArrayNum:
"""
    Reprojection per destination block (also rebuilds a square array combined from intersecting source blocks).
"""
# If no source chunk intersects, we return a chunk of destination nodata values
if len(src_arrs) == 0:
# We can use float32 to return NaN, will be cast to other floating type later if that's not source array dtype
dst_arr = np.zeros(combined_meta["dst_shape"], dtype=np.dtype("float32"))
dst_arr[:] = kwargs["dst_nodata"]
return dst_arr
# First, we build an empty array with the combined shape, only with nodata values
comb_src_arr = np.ones((combined_meta["src_shape"]), dtype=src_arrs[0].dtype)
comb_src_arr[:] = kwargs["src_nodata"]
# Then fill it with the source chunks values
for i, arr in enumerate(src_arrs):
bid = block_ids[i]
comb_src_arr[bid["rys"] : bid["rye"], bid["rxs"] : bid["rxe"]] = arr
# Now, we can simply call Rasterio!
# We build the combined transform from tuple
src_transform = rio.transform.Affine(*combined_meta["src_transform"])
dst_transform = rio.transform.Affine(*combined_meta["dst_transform"])
# Reproject
dst_arr = np.zeros(combined_meta["dst_shape"], dtype=comb_src_arr.dtype)
_ = rio.warp.reproject(
comb_src_arr,
dst_arr,
src_transform=src_transform,
src_crs=kwargs["src_crs"],
dst_transform=dst_transform,
dst_crs=kwargs["dst_crs"],
resampling=kwargs["resampling"],
src_nodata=kwargs["src_nodata"],
dst_nodata=kwargs["dst_nodata"],
num_threads=1, # Force the number of threads to 1 to avoid Dask/Rasterio conflicting on multi-threading
)
return dst_arr
### (NEW WRAPPER) TO RUN BLOCK FUNCTION FOR MULTIPROC CALL: ADD READING FOR RASTER BLOCK + RETURN DESTINATION BLOCK ID
def _wrapper_multiproc_reproject_per_block(rst, src_block_ids, dst_block_id, idx_d2s, block_ids, combined_meta, kwargs):
"""Wrapper to use reproject_per_block for multiprocessing."""
# Get source array block for each destination block
s = src_block_ids
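    # icrop below loads only the required source window for each intersecting block, keeping per-task memory low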
src_arrs = (rst.icrop(bbox=(s[idx]["xs"], s[idx]["ys"], s[idx]["xe"], s[idx]["ye"])).data for idx in idx_d2s)
# Call reproject per block
dst_block_arr = _multiproc_reproject_per_block(*src_arrs, block_ids=block_ids, combined_meta=combined_meta, **kwargs)
return dst_block_arr, dst_block_id
### (SMALL CHANGES, MOSTLY AT THE END) FINAL REPROJECT FUNCTION
def reproject_multiproc(
rst: Any, # NEW INPUT FOR MULTIPROC (INSTEAD OF DASK ARRAY)
config: Any, # NEW INPUT FOR MULTIPROC
src_chunksizes: tuple[int, int], # NEW INPUT FOR MULTIPROC (INSTEAD OF BEING SAVED WITHIN DASK ARRAY)
src_transform: rio.transform.Affine,
src_crs: rio.crs.CRS,
dst_transform: rio.transform.Affine,
dst_shape: tuple[int, int],
dst_crs: rio.crs.CRS,
resampling: rio.enums.Resampling,
src_nodata: int | float | None = None,
dst_nodata: int | float | None = None,
dst_chunksizes: tuple[int, int] | None = None,
**kwargs: Any,
) -> None:
"""
Reproject georeferenced raster on out-of-memory chunks with multiprocessing
"""
# 1/ Define source and destination chunked georeferenced grid through simple classes storing CRS/transform/shape,
# which allow to consistently derive shape/transform for each block and their CRS-projected footprints
# Define georeferenced grids for source/destination array
src_geogrid = GeoGrid(transform=src_transform, shape=rst.shape, crs=src_crs)
dst_geogrid = GeoGrid(transform=dst_transform, shape=dst_shape, crs=dst_crs)
# Add the chunking
# For source, we can use the .chunks attribute
# SMALL CHANGE: GENERATE THE TUPLES OF CHUNKS SIMILARLY AS FOR DASK, BASED ON CHUNKSIZE INPUT
    chunks_x = tuple((src_chunksizes[0] if i <= rst.shape[0] else rst.shape[0] % src_chunksizes[0])
                     for i in np.arange(src_chunksizes[0], rst.shape[0] + src_chunksizes[0], src_chunksizes[0]))
    chunks_y = tuple((src_chunksizes[1] if i <= rst.shape[1] else rst.shape[1] % src_chunksizes[1])
                     for i in np.arange(src_chunksizes[1], rst.shape[1] + src_chunksizes[1], src_chunksizes[1]))
src_chunks = (chunks_x, chunks_y)
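    # e.g., for rst.shape == (500, 450) and src_chunksizes == (200, 200), this gives ((200, 200, 100), (200, 200, 50))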
src_geotiling = ChunkedGeoGrid(grid=src_geogrid, chunks=src_chunks)
# For destination, we need to create the chunks based on destination chunksizes
if dst_chunksizes is None:
dst_chunksizes = src_chunksizes
dst_chunks = _chunks2d_from_chunksizes_shape(chunksizes=dst_chunksizes, shape=dst_shape)
dst_geotiling = ChunkedGeoGrid(grid=dst_geogrid, chunks=dst_chunks)
# 2/ Get footprints of tiles in CRS of destination array, with a buffer of 2 pixels for destination ones to ensure
# overlap, then map indexes of source blocks that intersect a given destination block
src_footprints = src_geotiling.get_block_footprints(crs=dst_crs)
dst_footprints = dst_geotiling.get_block_footprints().buffer(2 * max(dst_geogrid.res))
dest2source = [np.where(dst.intersects(src_footprints).values)[0] for dst in dst_footprints]
# 3/ To reconstruct a square source array during chunked reprojection, we need to derive the combined shape and
# transform of each tuples of source blocks
src_block_ids = np.array(src_geotiling.get_block_locations())
meta_params = [
(
_combined_blocks_shape_transform(sub_block_ids=src_block_ids[sbid], src_geogrid=src_geogrid) # type: ignore
if len(sbid) > 0
else ({}, [])
)
for sbid in dest2source
]
# We also add the output transform/shape for this destination chunk in the combined meta
# (those are the only two that are chunk-specific)
dst_block_geogrids = dst_geotiling.get_blocks_as_geogrids()
for i, (c, _) in enumerate(meta_params):
c.update({"dst_shape": dst_block_geogrids[i].shape, "dst_transform": tuple(dst_block_geogrids[i].transform)})
# 4/ Call a delayed function that uses rio.warp to reproject the combined source block(s) to each destination block
# Add fixed arguments to keywords
kwargs.update(
{
"src_nodata": src_nodata,
"dst_nodata": dst_nodata,
"resampling": resampling,
"src_crs": src_crs,
"dst_crs": dst_crs,
}
)
### FROM HERE: ADAPTED CODE FOR MULTIPROC
# SMALL CHANGE: RETURN BLOCK LOCATIONS TO EASILY WRITE OUTPUT
# (WASN'T NEEDED FOR DASK THAT JUST CONCATENATED BLOCKS IN THE RIGHT ORDER)
# Get location of destination blocks to write file
dst_block_ids = np.array(dst_geotiling.get_block_locations())
# MODIFY DASK LIST COMPREHENSION INTO LOOP TO LAUNCH TASKS
# ADDING SRC BLOCK IDS TO THE WRAPPER CALL TO LOAD WITH ICROP
# Create tasks for multiprocessing
tasks = []
for i in range(len(dest2source)):
tasks.append(
config.cluster.launch_task(
fun=_wrapper_multiproc_reproject_per_block, args=[rst, src_block_ids, dst_block_ids[i], dest2source[i], meta_params[i][1], meta_params[i][0], kwargs],
)
)
result_list = []
# get first tile to retrieve dtype and nodata
result_tile0, _ = config.cluster.get_res(tasks[0])
# WRITE OUTPUT AS IN GENERIC MULTIPROC PR
# Create a new raster file to save the processed results
with rio.open(
config.outfile,
"w",
driver="GTiff",
height=dst_shape[0],
width=dst_shape[1],
count=1,
dtype=rst.dtype,
crs=dst_crs,
transform=dst_transform,
nodata=dst_nodata,
) as dst:
try:
# Iterate over the tasks and retrieve the processed tiles
for results in tasks:
dst_block_arr, dst_block_id = config.cluster.get_res(results)
# Define the window in the output file where the tile should be written
dst_window = rio.windows.Window(
col_off=dst_block_id["xs"],
row_off=dst_block_id["ys"],
width=dst_block_id["xe"] - dst_block_id["xs"],
height=dst_block_id["ye"] - dst_block_id["ys"],
)
# Cast to 3D
dst_block_arr = dst_block_arr[np.newaxis, :, :]
# Write the processed tile to the appropriate location in the output file
dst.write(dst_block_arr, window=dst_window)
except Exception as e:
            raise RuntimeError(f"Error retrieving data from multiprocessing tasks: {e}") from e
It runs but I didn't test it thoroughly, so hopefully I didn't add any bugs and it works well quickly 😉. Also, I should justify why the chunked functions take the already-derived reprojection parameters as input.
This way, they don't have to go through the _user_input_reproject and _get_reproj_params steps, which stay consistent no matter whether the method runs on the full array or on chunks of the array.
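A generic sketch of that layering (hypothetical names, not the geoutils code): inputs are normalized once at the top, and every backend consumes only the derived parameters:
from typing import Any

def _normalize_inputs(crs: str, res: float | None) -> dict[str, Any]:
    # Stand-in for the _user_input_reproject/_get_reproj_params steps
    return {"dst_crs": crs, "dst_res": res if res is not None else 30.0}

def _run_full_array(params: dict[str, Any]) -> str:
    return f"full-array run to {params['dst_crs']} at {params['dst_res']} m"

def _run_chunked(params: dict[str, Any]) -> str:
    return f"chunked run to {params['dst_crs']} at {params['dst_res']} m"

def reproject(crs: str, res: float | None = None, chunked: bool = False) -> str:
    params = _normalize_inputs(crs, res)  # done once, identical for both paths
    return _run_chunked(params) if chunked else _run_full_array(params)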
I think we should do the same with the final ...
Also, if you want to test the internals of this, see geoutils/tests/test_raster/test_delayed.py, line 409 (commit 4d38e65). This will define all the arguments needed to run the ...
@rhugonnet many thanks for your work and your explanations. Here is my test code:
import matplotlib.pyplot as plt
import rasterio as rio
import geoutils as gu
from geoutils import Raster
from geoutils.raster.distributed_computing import MultiprocConfig
from geoutils.raster.distributed_computing.delayed_multiproc import reproject_multiproc
example = gu.examples.get_path("exploradores_aster_dem")
outfile = "test.tif"
config = MultiprocConfig(chunk_size=200, outfile=outfile)
r = Raster(example)
# - Test reprojection with CRS change -
out_crs = rio.crs.CRS.from_epsg(4326)
# Single-process reprojection
r_single = r.reproject(crs=out_crs)
# Multiprocessing reprojection
reproject_multiproc(r, config, crs=out_crs)
r_multi = Raster(outfile)
plt.figure()
r_single.plot()
plt.figure()
r_multi.plot()
plt.figure()
diff1 = r_single - r_multi
diff1.plot()
plt.show()
Here is the adapted reproject_multiproc:
### (SMALL CHANGES, MOSTLY AT THE END) FINAL REPROJECT FUNCTION
def reproject_multiproc(
rst: RasterType,
config: MultiprocConfig,
ref: RasterType | str | None = None,
crs: CRS | str | int | None = None,
res: float | abc.Iterable[float] | None = None,
grid_size: tuple[int, int] | None = None,
bounds: rio.coords.BoundingBox | None = None,
nodata: int | float | None = None,
dtype: DTypeLike | None = None,
resampling: Resampling | str = Resampling.bilinear,
force_source_nodata: int | float | None = None,
**kwargs: Any,
) -> None:
"""
Reproject georeferenced raster on out-of-memory chunks with multiprocessing
"""
# Process user inputs
dst_crs, dst_dtype, src_nodata, dst_nodata, dst_res, dst_bounds = _user_input_reproject(
source_raster=rst,
ref=ref,
crs=crs,
bounds=bounds,
res=res,
nodata=nodata,
dtype=dtype,
force_source_nodata=force_source_nodata,
)
# Retrieve transform and grid_size
dst_transform, dst_grid_size = _get_target_georeferenced_grid(
rst, crs=dst_crs, grid_size=grid_size, res=dst_res, bounds=dst_bounds
)
dst_width, dst_height = dst_grid_size
dst_shape = (dst_height, dst_width)
# 1/ Define source and destination chunked georeferenced grid through simple classes storing CRS/transform/shape,
# which allow to consistently derive shape/transform for each block and their CRS-projected footprints
# Define georeferenced grids for source/destination array
src_geogrid = GeoGrid(transform=rst.transform, shape=rst.shape, crs=rst.crs)
dst_geogrid = GeoGrid(transform=dst_transform, shape=dst_shape, crs=dst_crs)
# Add the chunking
# For source, we can use the .chunks attribute
# SMALL CHANGE: GENERATE THE TUPLES OF CHUNKS SIMILARLY AS FOR DASK, BASED ON CHUNKSIZE INPUT
    chunks_x = tuple((config.chunk_size if i <= rst.shape[0] else rst.shape[0] % config.chunk_size)
                     for i in np.arange(config.chunk_size, rst.shape[0] + config.chunk_size, config.chunk_size))
    chunks_y = tuple((config.chunk_size if i <= rst.shape[1] else rst.shape[1] % config.chunk_size)
                     for i in np.arange(config.chunk_size, rst.shape[1] + config.chunk_size, config.chunk_size))
src_chunks = (chunks_x, chunks_y)
src_geotiling = ChunkedGeoGrid(grid=src_geogrid, chunks=src_chunks)
# For destination, we need to create the chunks based on destination chunksizes
dst_chunks = _chunks2d_from_chunksizes_shape(chunksizes=(config.chunk_size, config.chunk_size), shape=dst_shape)
dst_geotiling = ChunkedGeoGrid(grid=dst_geogrid, chunks=dst_chunks)
# 2/ Get footprints of tiles in CRS of destination array, with a buffer of 2 pixels for destination ones to ensure
# overlap, then map indexes of source blocks that intersect a given destination block
src_footprints = src_geotiling.get_block_footprints(crs=dst_crs)
dst_footprints = dst_geotiling.get_block_footprints().buffer(2 * max(dst_geogrid.res))
dest2source = [np.where(dst.intersects(src_footprints).values)[0] for dst in dst_footprints]
# 3/ To reconstruct a square source array during chunked reprojection, we need to derive the combined shape and
# transform of each tuples of source blocks
src_block_ids = np.array(src_geotiling.get_block_locations())
meta_params = [
(
_combined_blocks_shape_transform(sub_block_ids=src_block_ids[sbid], src_geogrid=src_geogrid) # type: ignore
if len(sbid) > 0
else ({}, [])
)
for sbid in dest2source
]
# We also add the output transform/shape for this destination chunk in the combined meta
# (those are the only two that are chunk-specific)
dst_block_geogrids = dst_geotiling.get_blocks_as_geogrids()
for i, (c, _) in enumerate(meta_params):
c.update({"dst_shape": dst_block_geogrids[i].shape, "dst_transform": tuple(dst_block_geogrids[i].transform)})
# 4/ Call a delayed function that uses rio.warp to reproject the combined source block(s) to each destination block
# Add fixed arguments to keywords
kwargs.update(
{
"src_nodata": src_nodata,
"dst_nodata": dst_nodata,
"resampling": resampling,
"src_crs": rst.crs,
"dst_crs": dst_crs,
}
)
### FROM HERE: ADAPTED CODE FOR MULTIPROC
# SMALL CHANGE: RETURN BLOCK LOCATIONS TO EASILY WRITE OUTPUT
# (WASN'T NEEDED FOR DASK THAT JUST CONCATENATED BLOCKS IN THE RIGHT ORDER)
# Get location of destination blocks to write file
dst_block_ids = np.array(dst_geotiling.get_block_locations())
# MODIFY DASK LIST COMPREHENSION INTO LOOP TO LAUNCH TASKS
# ADDING SRC BLOCK IDS TO THE WRAPPER CALL TO LOAD WITH ICROP
# Create tasks for multiprocessing
tasks = []
for i in range(len(dest2source)):
tasks.append(
config.cluster.launch_task(
fun=_wrapper_multiproc_reproject_per_block, args=[rst, src_block_ids, dst_block_ids[i], dest2source[i], meta_params[i][1], meta_params[i][0], kwargs],
)
)
result_list = []
# get first tile to retrieve dtype and nodata
result_tile0, _ = config.cluster.get_res(tasks[0])
# WRITE OUTPUT AS IN GENERIC MULTIPROC PR
# Create a new raster file to save the processed results
with rio.open(
config.outfile,
"w",
driver="GTiff",
height=dst_height,
width=dst_width,
count=1,
dtype=rst.dtype,
crs=dst_crs,
transform=dst_transform,
nodata=dst_nodata,
) as dst:
try:
# Iterate over the tasks and retrieve the processed tiles
for results in tasks:
dst_block_arr, dst_block_id = config.cluster.get_res(results)
# Define the window in the output file where the tile should be written
dst_window = rio.windows.Window(
col_off=dst_block_id["xs"],
row_off=dst_block_id["ys"],
width=dst_block_id["xe"] - dst_block_id["xs"],
height=dst_block_id["ye"] - dst_block_id["ys"],
)
# Cast to 3D
dst_block_arr = dst_block_arr[np.newaxis, :, :]
# Write the processed tile to the appropriate location in the output file
dst.write(dst_block_arr, window=dst_window)
except Exception as e:
            raise RuntimeError(f"Error retrieving data from multiprocessing tasks: {e}") from e
@vschaffn I can't reproduce this: my diff plot shows differences as large as the DEMs themselves, so it looks like the tiles might be inverted. Did you fix something in my code that is not shown above?
@rhugonnet it certainly comes from the ...
Thanks, I forgot, that fixed it! 😉 OK, so I've investigated a bit. First, I ran tests between the ... Then, I ran tests between the ... So I think the cause is very likely the Rasterio errors I mention above in #661 (comment), even though they didn't take this shape last time I tested it; maybe it's because I used random placeholder data and not DEM data. Here it looks dependent on the gradient... To test if this is indeed the case, or an error in our implementation, we should try to run the same reprojection with GDAL.
Looking at it right now... |
It's indeed an error in Rasterio; here's the same difference when running GDAL instead. Here's the code to do a 1-on-1 replacement between the two reprojections:
def _reproject_gdal(src_arr, src_transform, src_crs, dst_transform, dst_shape, dst_crs, resampling, src_nodata, dst_nodata):
from osgeo import gdal, gdalconst
resampling_mapping = {"nearest": gdalconst.GRA_NearestNeighbour, "bilinear": gdalconst.GRA_Bilinear,
"cubic": gdalconst.GRA_Cubic, "cubic_spline": gdalconst.GRA_CubicSpline}
gdal_resampling = resampling_mapping[resampling]
def _rio_to_gdal_geotransform(gt: rio.Affine) -> tuple[float, ...]:
dx, b, xmin, d, dy, ymax = list(gt)[:6]
gdal_gt = (xmin, dx, 0, ymax, 0, dy)
return gdal_gt
# Create input raster from array shape
drv = gdal.GetDriverByName('MEM')
src_ds = drv.Create('', src_arr.shape[1], src_arr.shape[0], 1, gdal.GDT_Float32)
# Create, define and set projection
src_ds.SetProjection(src_crs.to_wkt())
# Convert and set geotransform
src_gt = _rio_to_gdal_geotransform(src_transform)
src_ds.SetGeoTransform(src_gt)
# Write array and nodata value on first band
src_band = src_ds.GetRasterBand(1)
src_band.WriteArray(src_arr)
src_band.SetNoDataValue(src_nodata)
# Create output raster from destination shape
dst_ds = drv.Create("", dst_shape[1], dst_shape[0], 1, gdal.GDT_Float32)
# Create output projection
dst_ds.SetProjection(dst_crs.to_wkt())
dst_gt = _rio_to_gdal_geotransform(dst_transform)
dst_ds.SetGeoTransform(dst_gt)
# Copy the raster metadata of the source to dest
dst_ds.GetRasterBand(1).SetNoDataValue(dst_nodata)
dst_ds.GetRasterBand(1).Fill(dst_nodata)
# Reproject with resampling
gdal.ReprojectImage(src_ds, dst_ds, src_crs.to_wkt(), dst_crs.to_wkt(), gdal_resampling)
# Extract reprojected array
array = dst_ds.GetRasterBand(1).ReadAsArray().astype("float32")
array[array == dst_nodata] = np.nan
return array
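As a usage sketch (hypothetical, roughly overlapping grids in UTM zone 33N and EPSG:4326; requires the GDAL Python bindings), the helper above can be swapped in one-for-one where rio.warp.reproject was called:
import numpy as np
import rasterio as rio

src_arr = np.random.default_rng(0).random((100, 100)).astype("float32")
dst_arr = _reproject_gdal(
    src_arr,
    src_transform=rio.transform.from_origin(500000, 5000000, 30, 30),
    src_crs=rio.crs.CRS.from_epsg(32633),
    dst_transform=rio.transform.from_origin(15.0, 45.152, 0.0003, 0.0003),
    dst_shape=(120, 120),
    dst_crs=rio.crs.CRS.from_epsg(4326),
    resampling="bilinear",  # a key of resampling_mapping above
    src_nodata=-9999,
    dst_nodata=-9999,
)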
Funnily, there's a tiny area at the bottom left that still has some differences...
Actually, the artefact above appears for the destination chunks after (600, XXX). I wonder if it's possible to test GDAL against itself (tiled versus the full raster at once), to ensure that these last errors are from us. Maybe the issue is the precision of the ... (see geoutils/geoutils/raster/delayed.py, line 502 at commit 1bbfddd).
To be sure of this, we could try with placeholder data where we control the transform...
Tried to tweak floating point precision, without success... It got me thinking: in the tiled Rasterio vs full Rasterio comparison, maybe one result is correct, and not the other. Interestingly, it's the Rasterio tiled reprojection (Multiprocessing or Dask) that is very close to GDAL, and the full-array Rasterio reprojection that is very different.
Some potential reasons: OSGeo/gdal#2810. We could use ... We could also open an issue in GDAL to get feedback.
HALLELUJAH, GOT IT! 🥳 🎈 Two issues were the cause:
1/ ...
2/ GDAL uses different PROJ transformations based on the error threshold passed by the user. When using ...
Now, combining the two issues above, we can solve all of our problems:
warp_opts = {"XSCALE": 1, "YSCALE": 1}
opts = gdal.WarpOptions(srcSRS=src_crs.to_wkt(), dstSRS=dst_crs.to_wkt(),
resampleAlg=gdal_resampling, errorThreshold=0, warpOptions=warp_opts)
gdal.Warp(dst_ds, src_ds, options=opts)
Now with this, we have a perfect match for both EPSG:4326 (which had the lower-left corner wrong) and EPSG:3412 (which was completely off) shown above. And, for our purpose, we need to do the same with Rasterio.
Looks like OpenDataCube came to the same conclusion: opendatacube/datacube-core#1456
OK, so passing XSCALE and YSCALE to GDAL through Rasterio works well. I'll open a PR to make it user-defined.
@rhugonnet Many thanks for your investigations and your work; it would have taken me weeks to track down the source of the problem 😄. I saw that you opened the PR on Rasterio (rasterio/rasterio#3325); I hope it will be validated soon 🤞
Does that sound right?
@vschaffn Yes, exactly! 😁
kwargs = {"XSCALE": 1, "YSCALE": 1}
import rasterio
from packaging.version import Version

if Version(rasterio.__version__) > Version("1.4.3"):
    kwargs.update({"tolerance": 0})
_reproject_rio(..., **kwargs)
For the tests: we can probably mirror a lot of stuff from here. And it's a good idea to add real data like you did (it's only synthetic in the Dask tests, I think). For the GDAL code: as Rasterio directly calls GDAL's Warp, it's supposed to be a 1-on-1 match (were it not for that option!), so it's maybe less useful here than for other cases like in xDEM, where we compare GDAL to SciPy or to our own terrain function. Yet GeoUtils does still use GDAL for one other test: ... And I also see these minor changes: ...
Ah, and I just thought that we can't make the tests ...
@rhugonnet As discussed, I have adapted the structure to reduce duplication between multiprocessing, Dask and reproject as much as possible, and the multiprocessing reprojection is now available in Raster.reproject.
Amazing!! 🥳 🎈 Great idea to add support for multi-band, this is extremely useful for reprojecting large raster stacks that are increasingly common 😉 (we might even have to allow the user to specify a "band" chunksize at some point! this can come later), and to verify this thoroughly in the tests!
The work you both have done is impressive; it's truly a privilege to witness it live. I don't feel comfortable merging this ticket myself. @rhugonnet, it's up to you. Do it whenever you feel ready.
I think it's almost ready to merge! 😄 My last thought is that, like for #669, we should probably wait to show the changes in the documentation until we have finished converging on an API internally, so that it remains stable for users. For instance, we just realized now about the ... We can keep the nice additions to the ...
A new release documenting all of these multiprocessing features would deserve bumping the package to a new minor version: GeoUtils 0.2!
For instance, I've been thinking that it might be practical to return a Raster object pointing to the written output file. This would allow doing this:
rst = Raster(starting_file)
rst_reproj = rst.reproject(config=config1)
rst_reproj_prox = rst_reproj.proximity(config=config2)
Instead of having to do this:
rst = Raster(starting_file)
rst.reproject(config=...)
rst_reproj = Raster(config1.outfile)
rst_reproj.proximity(config=config2)
rst_reproj_prox = Raster(config2.outfile)
I'll open a separate issue where we can list all our ideas towards a final API linked to the multiprocessing calls! 😉
Are we commenting the new doc lines (or moving them to a single ...
@rhugonnet I agree, especially after having manipulated the multiproc reprojection a few times. I made some quick changes: ...
Which doc lines do you want to comment?
Great! 😄
@rhugonnet all right, then I have commented the changes in ...
Perfect, thanks! I'll merge after the CI run 😄. FYI: I noticed an error randomly failing in CI for a couple of days now. It's linked to Multiprocessing, and seems to be happening only on "ubuntu-latest" with Python 3.12. See here: https://github.com/GlacioHack/geoutils/actions/runs/14429620562/job/40462822308#step:13:760. You can find the same error in other Action runs of the past days.
I think it is the tests that use a cluster: sometimes the cluster crashes (I don't know why unfortunately, I'm not a multiprocessing expert), hence the try/except block used with a time limit in functions using a cluster. I will try to investigate anyway.
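For reference, a generic stdlib sketch of that pattern (not the geoutils cluster API): fetching the worker result with a time limit makes a crashed worker raise instead of hanging the run:
from multiprocessing import Pool, TimeoutError

def _square(x: int) -> int:
    return x * x

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        async_res = pool.apply_async(_square, (3,))
        try:
            result = async_res.get(timeout=30)  # raises TimeoutError rather than blocking forever
        except TimeoutError as err:
            raise RuntimeError("Task timed out; a worker may have crashed") from err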
Ah yes, now that you say this, I've seen some CI runs be stuck for 2 hours a couple of times recently too. Actually, it's the case for the one I sent you above: it lasted 1h45min!
Resolves #648.
Summary
This PR introduces multiprocessing support for raster reprojection, improving memory efficiency and processing speed by leveraging tiled processing.
Features
- _multiproc_reproject for tiled raster reprojection with multiprocessing and direct disk writing.
- _wrapper_multiproc_reproject_per_block and _reproject_per_block to enable independent per-block reprojection using Dask structures.
- distributed_computing in raster.__init__.
Changes
- XSCALE=1, YSCALE=1, tolerance=0 in _multiproc_reproject.
- map_overlap_multiproc_save for reuse in _multiproc_reproject.
- multiproc_reproject into Raster.reproject and _reproject, reorganizing internal logic to better support multiprocessing.
- multiproc.py.
Tests
- map_multiproc_collect with tile coordinate return.
- test_multiproc_reproject (pending next rasterio release).
Documentation
- raster_class.md.
- MultiprocConfig in the API.
- multiprocessing.md.