
Commit

add functionality to mask clustering (e.g. catchment) and split clustering by groups
paswyss committed Oct 17, 2023
1 parent 0ca229e commit 81298bf
Showing 1 changed file with 101 additions and 34 deletions.
135 changes: 101 additions & 34 deletions TopoPyScale/topoclass.py
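Both new options point to GeoTIFF files and are read from the `sampling.toposub` block of the project YAML config (the key names `clustering_mask` and `clustering_groups` appear in the diff below; leaving them unset disables the behaviour, since the code treats `None`/`{}` as empty). A minimal sketch of how that block might look, with illustrative paths and values:

```yaml
sampling:
  toposub:
    clustering_method: minibatchkmean   # or 'kmean', per the diff
    n_clusters: 50                      # illustrative value
    random_seed: 2                      # illustrative value
    clustering_features: {'x': 1, 'y': 1, 'elevation': 4, 'slope': 1, 'aspect_cos': 1, 'aspect_sin': 1, 'svf': 1}  # illustrative weights
    clustering_mask: inputs/dem/catchment_mask.tif     # illustrative path; cells with value 1 are kept
    clustering_groups: inputs/dem/elevation_bands.tif  # illustrative path; raster of integer group ids
```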
@@ -21,6 +21,7 @@
import numpy as np
import pandas as pd
import xarray as xr
+import rioxarray as rio
from munch import DefaultMunch
from sklearn.preprocessing import StandardScaler

@@ -46,7 +47,7 @@ def __init__(self, config_file):
            with open(config_file, 'r') as f:
                self.config = DefaultMunch.fromYAML(f)

-            if self.config.project.directory is None:
+            if self.config.project.directory in [None, {}]:
                self.config.project.directory = os.getcwd() + '/'

        except IOError:
@@ -288,27 +289,104 @@ def extract_topo_cluster_param(self):
        TODO:
        - try to migrate most code of this function to topo_sub.py as a function itself segment_topo()
        """
+        # ----------handle input----------
+        mask_file = self.config.sampling.toposub.clustering_mask
+        groups_file = self.config.sampling.toposub.clustering_groups
+
+        # ----------proceed----------
+
        # read df param
        df_param = ts.ds_to_indexed_dataframe(self.toposub.ds_param)
-        df_scaled, self.toposub.scaler = ts.scale_df(df_param,
-                                                     features=self.config.sampling.toposub.clustering_features)
-        if self.config.sampling.toposub.clustering_method.lower() == 'kmean':
-            self.toposub.df_centroids, self.toposub.kmeans_obj, df_param['cluster_labels'] = ts.kmeans_clustering(
-                df_scaled,
-                features=self.config.sampling.toposub.clustering_features,
-                n_clusters=self.config.sampling.toposub.n_clusters,
-                seed=self.config.sampling.toposub.random_seed)
-        elif self.config.sampling.toposub.clustering_method.lower() == 'minibatchkmean':
-            self.toposub.df_centroids, self.toposub.kmeans_obj, df_param[
-                'cluster_labels'] = ts.minibatch_kmeans_clustering(
-                df_scaled,
-                n_clusters=self.config.sampling.toposub.n_clusters,
-                features=self.config.sampling.toposub.clustering_features,
-                n_cores=self.config.project.CPU_cores,
-                seed=self.config.sampling.toposub.random_seed)

+        # add mask and cluster feature
+        if mask_file in [None, {}]:
+            mask = [True] * len(df_param)
+        else:
+            # read mask TIFF
+            ds = rio.open_rasterio(mask_file).to_dataset('band').rename({1: 'mask'})
+
+            # check if bounds and resolution match
+            dem = self.toposub.ds_param
+            if ds.rio.bounds() != dem.rio.bounds() or ds.rio.resolution() != dem.rio.resolution():
+                raise ValueError(
+                    'The GeoTIFFs of the DEM and the MASK need to have the same bounds/resolution. Please check.')
+            print(f'---> Only consider grid cells inside mask ({Path(mask_file).name})')
+
+            # get mask
+            mask = ts.ds_to_indexed_dataframe(ds)['mask'] == 1
+
+        # add cluster groups
+        if groups_file in [None, {}]:
+            split_clustering = False
+            df_param['cluster_group'] = 1
+            groups = [1]
        else:
-            print('ERROR: {} clustering method not available'.format(self.config.sampling.toposub.clustering_method))
-        self.toposub.df_centroids = ts.inverse_scale_df(self.toposub.df_centroids, self.toposub.scaler,
-                                                        features=self.config.sampling.toposub.clustering_features)
+            split_clustering = True
+
+            # read cluster TIFF
+            ds = rio.open_rasterio(groups_file).to_dataset('band').rename({1: 'group'})
+
+            # check if bounds and resolution match
+            dem = self.toposub.ds_param
+            if ds.rio.bounds() != dem.rio.bounds() or ds.rio.resolution() != dem.rio.resolution():
+                raise ValueError(
+                    'The GeoTIFFs of the DEM and the GROUPS raster need to have the same bounds/resolution. Please check.')
+
+            # add cluster group
+            df_param['cluster_group'] = ts.ds_to_indexed_dataframe(ds)['group'].astype(int)
+
+            groups = sorted(df_param.cluster_group.unique())
+            print(f'---> Split clustering into {len(groups)} groups.')
+
+        # ----------cluster per group----------
+        self.toposub.df_centroids = pd.DataFrame()
+        total_clusters = 0
+        for group in groups:
+            if split_clustering:
+                print(f'cluster group: {group}')
+            subset_mask = mask & (df_param.cluster_group == group)
+            df_subset = df_param[subset_mask]
+
+            # derive number of clusters
+            relative_cover = np.count_nonzero(subset_mask) / np.count_nonzero(mask)  # relative area of the group
+            n_clusters = int(np.ceil(self.config.sampling.toposub.n_clusters * relative_cover))
+
+            df_scaled, scaler = ts.scale_df(df_subset, features=self.config.sampling.toposub.clustering_features)
+            if self.config.sampling.toposub.clustering_method.lower() == 'kmean':
+                df_centroids, _, cluster_labels = ts.kmeans_clustering(
+                    df_scaled,
+                    features=self.config.sampling.toposub.clustering_features,
+                    n_clusters=n_clusters,
+                    seed=self.config.sampling.toposub.random_seed)
+            elif self.config.sampling.toposub.clustering_method.lower() == 'minibatchkmean':
+                df_centroids, _, cluster_labels = ts.minibatch_kmeans_clustering(
+                    df_scaled,
+                    n_clusters=n_clusters,
+                    features=self.config.sampling.toposub.clustering_features,
+                    n_cores=self.config.project.CPU_cores,
+                    seed=self.config.sampling.toposub.random_seed)
+            else:
+                raise ValueError(
+                    '{} clustering method not available'.format(self.config.sampling.toposub.clustering_method))
+
+            df_centroids = ts.inverse_scale_df(df_centroids, scaler,
+                                               features=self.config.sampling.toposub.clustering_features)
+
+            # re-number cluster labels
+            df_param.loc[subset_mask, 'cluster_labels'] = cluster_labels + total_clusters
+            df_centroids.index = df_centroids.index + total_clusters
+
+            # merge df centroids
+            self.toposub.df_centroids = pd.concat([self.toposub.df_centroids, df_centroids])
+
+            # update total clusters
+            total_clusters += n_clusters
+
+        if not split_clustering:
+            # drop cluster_group
+            df_param = df_param.drop(columns=['cluster_group'])
+        else:
+            print(f'-----> Created in total {total_clusters} clusters in {len(groups)} groups.')

        # logic to add variables not used as clustering predictors into df_centroids
        feature_list = self.config.sampling.toposub.clustering_features.keys()
@@ -321,6 +399,8 @@ def extract_topo_cluster_param(self):
        n_digits = len(str(self.toposub.df_centroids.index.max()))
        self.toposub.df_centroids['point_id'] = self.toposub.df_centroids.index.astype(int).astype(str).str.zfill(
            n_digits)
+
+        # Build the final cluster map
        self.toposub.ds_param['cluster_labels'] = (
            ["y", "x"], np.reshape(df_param.cluster_labels.values, self.toposub.ds_param.slope.shape))

Expand All @@ -333,7 +413,7 @@ def extract_topo_cluster_param(self):

    def extract_topo_param(self):
        """
        Function to select which
        """
        if os.path.isfile(self.config.project.directory + 'outputs/' + self.config.outputs.file.df_centroids):
            self.toposub.df_centroids = pd.read_pickle(
@@ -512,19 +592,6 @@ def downscale_climate(self):
            for file in flist:
                os.remove(file)

-            # concatenate ds solar
-            print('concatenating solar files')
-            out_solar_name = Path(self.config.outputs.file.ds_solar)
-            solar_pattern = f'{out_solar_name.stem}_*{out_solar_name.suffix}'
-            solar_flist = sorted(self.config.outputs.path.glob(solar_pattern))
-            ds_solar_list = [xr.open_dataset(file, engine='h5netcdf') for file in solar_flist]
-            fout = Path(self.config.outputs.path, out_solar_name)
-            ds = xr.concat(ds_solar_list, dim='time')
-            ds.to_netcdf(fout, engine='h5netcdf')
-            [f.unlink() for f in solar_flist]  # delete tmp solar files
-            print('solar files concatenated')
-            del ds

        else:
            ta.downscale_climate(self.config.project.directory,
                                 self.config.climate.path,
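The new checks above refuse to run unless the mask or groups GeoTIFF shares the DEM's exact bounds and resolution. One way to satisfy that is to snap an existing raster onto the DEM grid with rioxarray's `reproject_match`; a minimal sketch, with illustrative file paths:

```python
import rioxarray as rio
from rasterio.enums import Resampling

dem = rio.open_rasterio('inputs/dem/dem.tif')                 # project DEM (illustrative path)
raw_mask = rio.open_rasterio('inputs/dem/catchment_raw.tif')  # mask on an arbitrary grid/CRS (illustrative path)

# Snap the mask onto the DEM grid so bounds and resolution match exactly;
# nearest-neighbour resampling keeps the 0/1 mask values categorical.
aligned = raw_mask.rio.reproject_match(dem, resampling=Resampling.nearest)

# The clustering keeps cells where the mask equals 1
aligned = (aligned == 1).astype('uint8')
aligned.rio.to_raster('inputs/dem/catchment_mask.tif')
```

The same recipe works for the groups raster; keep the integer group ids instead of binarising.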

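On the cluster budget: the loop assigns each group a share of the configured `n_clusters` proportional to its fraction of the masked area, rounded up with `np.ceil`. A tiny worked example (numbers illustrative) showing why the rounding can overshoot the configured total:

```python
import numpy as np

n_clusters_cfg = 50                 # illustrative config value
shares = [0.33, 0.33, 0.34]         # fraction of masked cells per group (illustrative)
per_group = [int(np.ceil(n_clusters_cfg * s)) for s in shares]
print(per_group, sum(per_group))    # [17, 17, 17] 51 -> one more than configured
```

The running `total_clusters` offset then keeps cluster labels unique across groups, which is why `df_centroids.index` is shifted before concatenation.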
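For context, a sketch of how the new clustering would be triggered from a project script; the `Topoclass` class name and the prerequisite that DEM parameters already exist are assumptions, only `extract_topo_cluster_param` itself is shown in this diff:

```python
from TopoPyScale import topoclass as tc

my_project = tc.Topoclass('config.yml')   # class name assumed; config path illustrative
# assumes toposub.ds_param (DEM parameters) was computed earlier in the pipeline
my_project.extract_topo_cluster_param()   # runs the masked / grouped clustering above
```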