Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PointCloud subclass of Vector #492

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'upstream/main' into pointcloud_class
  • Loading branch information
rhugonnet committed Nov 29, 2024
commit 0a1a9f52ac3aff1d9312c6bf51575e120e04d6a8
295 changes: 1 addition & 294 deletions geoutils/vector/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pdal
import rasterio as rio
from geopandas.testing import assert_geodataframe_equal
from mpl_toolkits.axes_grid1 import make_axes_locatable
Expand Down Expand Up @@ -1485,296 +1484,4 @@ def buffer_without_overlap(self, buffer_size: int | float, metric: bool = True,
>>> plt.plot() # doctest: +SKIP
"""

# Project in local UTM if metric is True
if metric:
crs_utm_ups = _get_utm_ups_crs(df=self.ds)
gdf = self.ds.to_crs(crs=crs_utm_ups)
else:
gdf = self.ds

# Dissolve all geometries into one
merged = gdf.dissolve()

# Add buffer around geometries
merged_buffer = merged.buffer(buffer_size)

# Extract only the buffered area
buffer = merged_buffer.difference(merged)

# Crop Voronoi polygons to bound geometry and add missing polygons
bound_poly = bounds2poly(gdf)
bound_poly = bound_poly.buffer(buffer_size)
voronoi_all = generate_voronoi_with_bounds(gdf, bound_poly)
if plot:
plt.figure(figsize=(16, 4))
ax1 = plt.subplot(141)
voronoi_all.plot(ax=ax1)
gdf.plot(fc="none", ec="k", ax=ax1)
ax1.set_title("Voronoi polygons, cropped")

# Extract Voronoi polygons only within the buffer area
voronoi_diff = voronoi_all.intersection(buffer.geometry[0])

# Split all polygons, and join attributes of original geometries into the Voronoi polygons
# Splitting, i.e. explode, is needed when Voronoi generate MultiPolygons that may extend over several features.
voronoi_gdf = gpd.GeoDataFrame(geometry=voronoi_diff.explode(index_parts=True)) # requires geopandas>=0.10
joined_voronoi = gpd.tools.sjoin(gdf, voronoi_gdf, how="right")

# Plot results -> some polygons are duplicated
if plot:
ax2 = plt.subplot(142, sharex=ax1, sharey=ax1)
joined_voronoi.plot(ax=ax2, column="index_left", alpha=0.5, ec="k")
gdf.plot(ax=ax2, column=gdf.index.values)
ax2.set_title("Buffer with duplicated polygons")

# Find non unique Voronoi polygons, and retain only first one
_, indexes = np.unique(joined_voronoi.index, return_index=True)
unique_voronoi = joined_voronoi.iloc[indexes]

# Plot results -> unique polygons only
if plot:
ax3 = plt.subplot(143, sharex=ax1, sharey=ax1)
unique_voronoi.plot(ax=ax3, column="index_left", alpha=0.5, ec="k")
gdf.plot(ax=ax3, column=gdf.index.values)
ax3.set_title("Buffer with unique polygons")

# Dissolve all polygons by original index
merged_voronoi = unique_voronoi.dissolve(by="index_left")

# Plot
if plot:
ax4 = plt.subplot(144, sharex=ax1, sharey=ax1)
gdf.plot(ax=ax4, column=gdf.index.values)
merged_voronoi.plot(column=merged_voronoi.index.values, ax=ax4, alpha=0.5)
ax4.set_title("Final buffer")
plt.show()

# Reverse-project to the original CRS if metric is True
if metric:
merged_voronoi = merged_voronoi.to_crs(crs=self.crs)

return Vector(merged_voronoi)


######################
# Point cloud subclass
######################

def _read_pdal(filename: str, **kwargs: Any) -> gpd.GeoDataFrame:
"""Read a point cloud dataset with PDAL."""

# Basic json command to read an entire file
json_string = f"""
[
"{filename}"
]
"""

# Run and extract array as dataframe
pipeline = pdal.Pipeline(json_string)
pipeline.execute()
df = pd.DataFrame(pipeline.arrays[0])

# Build geodataframe from 2D points and data table
crs = CRS.from_wkt(pipeline.srswkt2)
gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy(x=df["X"], y=df["Y"], crs=crs), data=df.iloc[:, 2:])

return gdf

def _write_pdal(filename: str, **kwargs):
"""Write a point cloud dataset with PDAL."""

return


class PointCloud(Vector):
"""
The georeferenced point cloud.

A point cloud is a vector of 2D point geometries associated to values from a specific data column.

Main attributes:
ds: :class:`geopandas.GeoDataFrame`
Geodataframe of the point cloud.
data_column: str
Name of point cloud data column.
crs: :class:`pyproj.crs.CRS`
Coordinate reference system of the point cloud.
bounds: :class:`rio.coords.BoundingBox`
Coordinate bounds of the point cloud.


All other attributes are derivatives of those attributes, or read from the file on disk.
See the API for more details.
"""

data_column: str | None

def __init__(self,
filename_or_dataset: str | pathlib.Path | gpd.GeoDataFrame | gpd.GeoSeries | BaseGeometry,
data_column: str | None = None,):
"""
Instantiate a point cloud from either a data column name and a vector (filename, GeoPandas dataframe or series,
or a Shapely geometry), or only with a point cloud file type.

:param filename_or_dataset: Path to file, or GeoPandas dataframe or series, or Shapely geometry.
:param data_column: Name of data column defining the point cloud.
"""

# If PointCloud is passed, simply point back to PointCloud
if isinstance(filename_or_dataset, PointCloud):
for key in filename_or_dataset.__dict__:
setattr(self, key, filename_or_dataset.__dict__[key])
return
# Else rely on parent Raster class options (including raised errors)
else:
super().__init__(filename_or_dataset)

if data_column not in self.ds.columns():
raise ValueError(f"Data column {data_column} not found in vector file, available columns "
f"are: {', '.join(self.ds.columns)}.")
self.data_column = data_column

@classmethod
def from_array(cls, array: np.ndarray, crs: CRS, data_column: str | None = None) -> PointCloud:
"""Create point cloud from a 3 x N or N x 3 array of X coordinate, Y coordinates and Z values."""

# Check shape
if array.shape[0] != 3 and array.shape[1] != 3:
raise ValueError("Array must be of shape 3xN or Nx3.")

# Make the first axis the one with size 3
if array.shape[0] != 3:
array_in = array.T
else:
array_in = array

# Build geodataframe
gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy(x=array_in[0, :], y=array_in[1, : ], crs=crs),
data={"z": array[2, :]})

return cls(filename_or_dataset=gdf, data_column=data_column)


@classmethod
def from_tuples(self, tuples: Iterable[tuple[Number, Number, Number]], crs: CRS, data_column: str | None = None):
"""Create point cloud from a N-size list of tuples (X coordinate, Y coordinate, Z value)."""

array_in = np.array(zip(*tuples))


def to_array(self):
"""Convert point cloud to a 3 x N array of X coordinates, Y coordinates and Z values."""


return

def to_tuples(self):
"""Convert point cloud to a N-size list of tuples (X coordinate, Y coordinate, Z value)."""

return

def interp_raster(self, raster: gu.Raster):
"""Interpolate raster at point cloud coordinates."""

return

# -----------------------------------------
# Additional stand-alone utility functions
# -----------------------------------------


def extract_vertices(gdf: gpd.GeoDataFrame) -> list[list[tuple[float, float]]]:
r"""
Function to extract the exterior vertices of all shapes within a gpd.GeoDataFrame.

:param gdf: The GeoDataFrame from which the vertices need to be extracted.

:returns: A list containing a list of (x, y) positions of the vertices. The length of the primary list is equal
to the number of geometries inside gdf, and length of each sublist is the number of vertices in the geometry.
"""
vertices = []
# Loop on all geometries within gdf
for geom in gdf.geometry:
# Extract geometry exterior(s)
if geom.geom_type == "MultiPolygon":
exteriors = [p.exterior for p in geom.geoms]
elif geom.geom_type == "Polygon":
exteriors = [geom.exterior]
elif geom.geom_type == "LineString":
exteriors = [geom]
elif geom.geom_type == "MultiLineString":
exteriors = list(geom.geoms)
else:
raise NotImplementedError(f"Geometry type {geom.geom_type} not implemented.")

vertices.extend([list(ext.coords) for ext in exteriors])

return vertices


def generate_voronoi_polygons(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Generate Voronoi polygons (tessellation) from the vertices of all geometries in a GeoDataFrame.

Uses scipy.spatial.voronoi.

:param: The GeoDataFrame from whose vertices are used for the Voronoi polygons.

:returns: A GeoDataFrame containing the Voronoi polygons.
"""
# Extract the coordinates of the vertices of all geometries in gdf
vertices = extract_vertices(gdf)
coords = np.concatenate(vertices)

# Create the Voronoi diagram and extract ridges
vor = Voronoi(coords)
lines = [shapely.geometry.LineString(vor.vertices[line]) for line in vor.ridge_vertices if -1 not in line]
polys = list(shapely.ops.polygonize(lines))
if len(polys) == 0:
raise ValueError("Invalid geometry, cannot generate finite Voronoi polygons")

# Convert into GeoDataFrame
voronoi = gpd.GeoDataFrame(geometry=gpd.GeoSeries(polys))
voronoi.crs = gdf.crs

return voronoi


def generate_voronoi_with_bounds(gdf: gpd.GeoDataFrame, bound_poly: Polygon) -> gpd.GeoDataFrame:
"""
Generate Voronoi polygons that are bounded by the polygon bound_poly, to avoid Voronoi polygons that extend \
far beyond the original geometry.

Voronoi polygons are created using generate_voronoi_polygons, cropped to the extent of bound_poly and gaps \
are filled with new polygons.

:param: The GeoDataFrame from whose vertices are used for the Voronoi polygons.
:param: A shapely Polygon to be used for bounding the Voronoi diagrams.

:returns: A GeoDataFrame containing the Voronoi polygons.
"""
# Create Voronoi polygons
voronoi = generate_voronoi_polygons(gdf)

# Crop Voronoi polygons to input bound_poly extent
voronoi_crop = voronoi.intersection(bound_poly)
voronoi_crop = gpd.GeoDataFrame(geometry=voronoi_crop) # convert to DataFrame

# Dissolve all Voronoi polygons and subtract from bounds to get gaps
voronoi_merged = voronoi_crop.dissolve()
bound_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(bound_poly))
bound_gdf.crs = gdf.crs
gaps = bound_gdf.difference(voronoi_merged)

# Merge cropped Voronoi with gaps, if not empty, otherwise return cropped Voronoi
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "Geometry is in a geographic CRS. Results from 'area' are likely incorrect.")
tot_area = np.sum(gaps.area.values)

if not tot_area == 0:
voronoi_all = gpd.GeoDataFrame(geometry=list(voronoi_crop.geometry) + list(gaps.geometry))
voronoi_all.crs = gdf.crs
return voronoi_all
else:
return voronoi_crop
return _buffer_without_overlap(self.ds, buffer_size=buffer_size, metric=metric, plot=plot)
You are viewing a condensed version of this merge commit. You can view the full changes here.