Description
I have been benchmarking our current spatial join algorithm, based on the benchmarks from https://github.com/Quansight/scipy2020_spatial_algorithms_at_scale (Scipy 2020 talk showing benchmarks with spatialpandas, pdf slides of the presentation are included in the repo, and there is a link to the youtube video as well).
In my fork of that repo, I added some extra cases using dask-geopandas instead of spatialpandas, and did some additional experiments: https://github.com/jorisvandenbossche/scipy2020_spatial_algorithms_at_scale
Benchmark setup
The benchmark code downloads GPS data points from OpenStreetMap (2.9 billion points, but clipped to US -> 113 million points). The point dataset is spatially sorted (with Hilbert curve) and saved as a partitioned Parquet dataset.
Those points are joined to a zip codes polygon dataset (tl_2019_us_zcta510). For the benchmarks, different random subsets of increasing size are created for the polygon zip layer (1-10-100-1000-10000 polygons).
So basically we do a spatial join (point-in-polygon) of a Parquet-backed partitioned point dataset with an in-memory (single partition) polygon layer. The left points dataset is always the same size (113M points), while the right polygons GeoDataFrame is incrementally larger (these are the different rows in the tables below).
The results are from my laptop, using a distributed scheduler started with 1 process with 4 threads (and a limit of 8GB memory).
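For reference, a minimal sketch of what this setup looks like in code (paths and variable names are illustrative, not the actual benchmark code):

```python
# Minimal sketch of the setup described above (paths are hypothetical)
import dask_geopandas
import geopandas
from dask.distributed import Client

# distributed scheduler: 1 process with 4 threads and an 8GB memory limit
client = Client(n_workers=1, threads_per_worker=4, memory_limit="8GB")

# spatially sorted, partitioned point dataset
points = dask_geopandas.read_parquet("osm_gps_points_sorted/")

# zip codes polygon layer as a plain, in-memory GeoDataFrame
polygons = geopandas.read_file("tl_2019_us_zcta510.shp")

# spatial join of each point partition with the polygon layer
joined = dask_geopandas.sjoin(points, polygons)
result = joined.compute()
```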
Initial run
I created an equivalent notebook of the benchmark but using dask-geopandas instead of spatialpandas: https://github.com/jorisvandenbossche/scipy2020_spatial_algorithms_at_scale/blob/master/02_compared_cases/dask_geopandas/spatially_sorted_geopandas.ipynb
First, I also ran this using a plain GeoPandas GeoDataFrame as the polygon layer. But then we don't make use of the spatial partitioning information (we just naively do a sjoin of each partition of the points dataset with the polygons geodataframe), because currently `dask_geopandas.sjoin` will not calculate spatial partitioning information, but only makes use of it if already available. So for those cases, the spatial sorting doesn't add much value, and the size of the polygon dataset doesn't matter much. Spatialpandas, on the other hand, does calculate the spatial partitioning information, so for an equivalent benchmark, I converted the polygon geopandas.GeoDataFrame into a dask_geopandas version (with 1 partition) and calculated the spatial partitioning (see the sketch below). At that point you clearly see that for the small polygon dataset (only a few polygons), this helps a lot, as we only need to read in / join a few of the point partitions.
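A rough sketch of that conversion (variable names are illustrative; the actual code is in the notebook linked above):

```python
# wrap the polygons in a single-partition dask-geopandas GeoDataFrame
# and compute its spatial partitioning
polygons_ddf = dask_geopandas.from_geopandas(polygons, npartitions=1)
polygons_ddf.calculate_spatial_partitions()

# with spatial partitions available, the sjoin can skip point partitions
# whose bounds don't overlap the polygons
joined = dask_geopandas.sjoin(points, polygons_ddf)
```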
For the smaller data, spatialpandas and dask-geopandas performed very similarly, but for a larger polygon dataset (for a point-in-polygon join), the dask-geopandas version started to be a lot slower.
However, while looking into what the reason of this slowdown could be, I noticed that we were using the default `predicate="intersects"` of `sjoin` (that's the only predicate that spatialpandas supports). But for a point-in-polygon join, I can explicitly pass `predicate="within"`, and it seems that for the larger case, this makes it quite a bit faster:
Now it is actually very similar to the spatialpandas result.
(I was actually assuming I was doing a "within" join, and for that case we switch left/right internally in the sjoin algorithm, so I was thinking that might play a role, only to notice I wasn't actually doing a proper point-in-polygon sjoin. Of course, for points/polygons "intersects" gives essentially the same result, but it doesn't have the same performance characteristics.)
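For reference, the only change needed is passing the predicate explicitly (assuming the same variables as in the sketches above):

```python
# same join, but explicitly requesting a point-in-polygon predicate
joined = dask_geopandas.sjoin(points, polygons_ddf, predicate="within")
```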
Partitioning / spatially sorting the right polygon layer
Something I thought could further improve the performance was to also split and spatially shuffle the right (polygon) layer. I tried this for the largest case of 10,000 polygons, cutting it into 10 partitions and shuffling it based on Hilbert distance. See https://nbviewer.jupyter.org/github/jorisvandenbossche/scipy2020_spatial_algorithms_at_scale/blob/master/02_compared_cases/dask_geopandas/spatially_sorted_geopandas.ipynb#Joining-with-a-partitioned-/-spatially-sorted-GeoDataFrame
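Roughly, that repartitioning looked like this (a sketch; the exact code is in the linked notebook and may differ in the details):

```python
# split the polygon layer into 10 partitions and shuffle it on the Hilbert
# distance of the geometries, so spatially close polygons share a partition
polygons_ddf = dask_geopandas.from_geopandas(polygons, npartitions=10)
polygons_ddf["hilbert"] = polygons_ddf.geometry.hilbert_distance()
polygons_sorted = polygons_ddf.set_index("hilbert")
polygons_sorted.calculate_spatial_partitions()

joined = dask_geopandas.sjoin(points, polygons_sorted, predicate="within")
```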
However, this didn't actually improve much (in this case) ..
Some thoughts on why this could be the case:
- Note that the left (points) layer is already spatially sorted
- By splitting the polygons, each underlying `geopandas.sjoin` call works with fewer polygons and should be faster. But we also slightly increase the number of sjoin calls (because the spatial partitions of left/right don't match exactly), creating more overhead. And the time spent in `bulk_query` is in the end also relatively limited, so it might be that the speed-up in the actual query isn't sufficient to overcome the added overhead.
- The final resulting number of matches the tree has to find is the same, so maybe it's quite efficient at short-circuiting when there is no overlap. Meaning, if you do a query with 1000 polygons that should give 1000 matches, or with 10_000 polygons that also gives 1000 matches, it's maybe not that much slower in the second case (wild guess here, didn't actually check this).
Note that the above was actually from running it on a subset of the point data, because with the full dataset I ran into memory issues.
Memory issues
While trying out the above case of using a partitioned dataset (with multiple partitions) for both the left and right dataframe in the `sjoin`, I couldn't complete the largest benchmark on my laptop with the 8GB memory limit I set for the process. Watching the dashboard, the memory usage increased steadily, at some point it started to spill data to disk (dask has a threshold for that (a percentage of total available memory) that can be configured, see the sketch below; the default is already around 50% I think), things started to generally run slow (tasks took a long time to start), and a bit later I interrupted the computation.
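As an aside, those spill thresholds live in the dask config; a sketch of how they can be adjusted (the values here are examples, and the actual default fractions depend on the distributed version):

```python
import dask

# worker memory thresholds, as fractions of the worker's memory limit
dask.config.set({
    "distributed.worker.memory.target": 0.60,  # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.70,   # spill based on process memory
    "distributed.worker.memory.pause": 0.80,   # pause new tasks on the worker
})
```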
From looking at the task stream, it seems that in this case it is loading too many of the point partitions into memory upfront, exploding memory. In "theory", it should only load 4 partitions into memory at a time, since there are 4 threads that can then do a spatial join with them. But while the computation is running, the number of in-memory "read-parquet" tasks continuously increases; at 60 it starts to spill to disk and run slowly, and at 90 I interrupted the process.
I ran it again to make a screencast. This time it didn't actually start spilling to disk, but it still showed the large increase in memory usage / many in-memory "read-parquet" tasks: https://www.dropbox.com/s/q1t2ly6x2f2gwb2/Peek%202021-09-17%2021-58.gif?dl=0 (a gif of 100MB is maybe not the best way to do this, an actual video file might be easier ;))
So when the right dataframe is not a single partition, there is something going on in executing the graph that is not fully optimal. I don't really understand why the scheduler decides to read that many parquet files into memory already (many more than can be processed in the subsequent sjoin step with 4 threads).
Some observations:
- There is a lot of "unmanaged memory" according to dask. See also this blogpost about this topic: https://coiled.io/blog/tackling-unmanaged-memory-with-dask/ (and docs: https://distributed.dask.org/en/latest/worker.html#memory-not-released-back-to-the-os)
- It might be that dask doesn't really know how "big" the GeoDataFrames are. And for the geometry column, we indeed give a gross underestimation at the moment in geopandas (https://github.com/geopandas/geopandas/blob/9e0770b470d02b8db2887e6fd22076be03b934e4/geopandas/array.py#L1092-L1094, i.e. only the bytes of the pointers to the Python objects, so not even the size of the pygeos Python objects, let alone the memory of the actual coordinates in GEOS). We could maybe give a better estimate here (see the sketch after this list), although estimating it e.g. based on the number of coordinates is also costly to compute.
- ... (further to investigate)
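To illustrate the point about the size estimate, a rough sketch of what a coordinate-based estimate could look like (assuming pygeos; the per-geometry overhead is a guess, and computing this is itself not free):

```python
import numpy as np
import pygeos

def estimate_geometry_nbytes(geometries):
    """Rough size estimate for an array of pygeos geometries.

    Counts 16 bytes (2 doubles) per coordinate plus a guessed fixed
    per-geometry overhead; counting the coordinates already requires a
    full pass over the data, so this estimate is not free to compute.
    """
    geometries = np.asarray(geometries, dtype=object)
    n_coords = pygeos.count_coordinates(geometries)
    per_geom_overhead = 100  # guess for GEOS + Python object overhead
    return n_coords * 16 + len(geometries) * per_geom_overhead
```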
Parallelization of the spatial join
While doing many tests trying to understand what was going on, I actually also noticed that when running it with the single-threaded scheduler (easier for debugging errors), it was not actually much slower ... In other words, running this spatial join benchmark with 4 threads in parallel didn't give much parallelization benefit.
Testing some different setups of the local cluster, it seemed that running this benchmark with 4 workers, each with 1 thread, was actually faster than with 1 worker with 4 threads (so basically using processes instead of threads).
While the STRtree `query_bulk` releases the GIL (and thus can benefit from multithreading), it seems that the `sjoin` as a whole doesn't release the GIL sufficiently to be efficient with multiple threads.
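Concretely, the two local cluster setups that were compared were roughly the following (memory_limit is per worker, so the 4 x 2GB split shown here is an assumption to keep the same total budget):

```python
from dask.distributed import Client

# 1 worker process with 4 threads (the original setup)
# client = Client(n_workers=1, threads_per_worker=4, memory_limit="8GB")

# 4 worker processes with 1 thread each (faster for this benchmark)
client = Client(n_workers=4, threads_per_worker=1, memory_limit="2GB")
```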
Profiling a single `geopandas.sjoin` call on one of the partitions gives the following snakeviz profile:
Interactive version: https://jorisvandenbossche.github.io/host/sjoin_snakeviz_profile_static2.html
So for this specific one (for each partition of the dataset it will of course give a somewhat different result), you can see that the actual `bulk_query` is only 19% of the time, calculating the bounds is another 10%, and the actual cython join algo from pandas (`get_join_indexers`) is only 2%. And AFAIK, those are the most relevant parts of this function that release the GIL.
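For reference, a profile like the one above can be produced along these lines (a sketch; the partition file name is illustrative, and `polygons` is the polygon GeoDataFrame from the earlier sketches):

```python
import cProfile

import geopandas

# one partition of the point dataset
points_part = geopandas.read_parquet("osm_gps_points_sorted/part.0.parquet")

profiler = cProfile.Profile()
profiler.enable()
geopandas.sjoin(points_part, polygons, predicate="within")
profiler.disable()
profiler.dump_stats("sjoin.prof")  # view with: snakeviz sjoin.prof
```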
Things we could look into to improve this:
- Improve the general basic `geopandas.sjoin` performance:
  - We could try to remove some Python overhead: avoid unnecessary copy, (re)set_index, improve the performance of `set_geometry`, ...
  - Investigate if it is actually beneficial to calculate the total_bounds, instead of creating the tree and letting the tree see that there is nothing in common (reason: I noticed that the calculation of the total bounds actually takes a considerable part of the sjoin, more than the actual bulk_query). Or, have an option to skip the check (which could be useful for the dask-geopandas version in case we have spatial partitioning information, where we basically already did a total_bounds check)
- Release the GIL in the STRtree creation (now we only release it in the query_bulk)
- Work with larger partitions? (with the assumption that doing an sjoin on larger data increases the time in the actual query algo, and relatively reduces the python overhead)
The last item is something that I tried, creating a version of the partitioned Parquet point dataset with 100 partitions (each partition ~1 million points) instead of the original dataset (from the upstream benchmarks) with 1218 partitions (each partition ~100k points). Link to notebook
This didn't improve things for the smaller benchmark cases (because we need to load larger partitions into memory, even when only needing a small subset), but for the case with the largest polygon layer, it improved things slightly (3.1 min instead of 3.4 min).
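Creating that coarser dataset is essentially a repartition and rewrite (a sketch with illustrative paths):

```python
# repartition the 1218 small partitions into ~100 larger ones and write
# the result back out as a new Parquet dataset
points = dask_geopandas.read_parquet("osm_gps_points_sorted/")
points_large = points.repartition(npartitions=100)
points_large.to_parquet("osm_gps_points_sorted_100parts/")
```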
Summarizing notes
A too long post (I also spent too much time diving into this .. :)), but a few things we learnt:
- Never forget to check the predicate you are using (and experiment with it, as it can probably depend on your exact dataset)
- Dask-geopandas and spatialpandas in the end perform more or less the same
- When your right dataframe is also partitioned, there still seem to be some memory issues that are worth investigating further
- Consider whether to use more processes vs more threads in your cluster (it will also depend on the exact operations you are doing, but for a spatial join, multithreading is not ideal at the moment)
- There is room to improve the performance of the basic `geopandas.sjoin`