diff --git a/TopoPyScale/topoclass.py b/TopoPyScale/topoclass.py
index d3c53bd..1e82baa 100644
--- a/TopoPyScale/topoclass.py
+++ b/TopoPyScale/topoclass.py
@@ -21,6 +21,7 @@
 import numpy as np
 import pandas as pd
 import xarray as xr
+import rioxarray as rio
 from munch import DefaultMunch
 from sklearn.preprocessing import StandardScaler
 
@@ -46,7 +47,7 @@ def __init__(self, config_file):
             with open(config_file, 'r') as f:
                 self.config = DefaultMunch.fromYAML(f)
 
-            if self.config.project.directory is None:
+            if self.config.project.directory in [None, {}]:
                 self.config.project.directory = os.getcwd() + '/'
 
         except IOError:
@@ -288,27 +289,104 @@ def extract_topo_cluster_param(self):
         TODO:
         - try to migrate most code of this function to topo_sub.py as a function itself segment_topo()
         """
+        # ----------handle input----------
+        mask_file = self.config.sampling.toposub.clustering_mask
+        groups_file = self.config.sampling.toposub.clustering_groups
+
+        # ----------proceed----------
+
+        # read df param
         df_param = ts.ds_to_indexed_dataframe(self.toposub.ds_param)
-        df_scaled, self.toposub.scaler = ts.scale_df(df_param,
-                                                     features=self.config.sampling.toposub.clustering_features)
-        if self.config.sampling.toposub.clustering_method.lower() == 'kmean':
-            self.toposub.df_centroids, self.toposub.kmeans_obj, df_param['cluster_labels'] = ts.kmeans_clustering(
-                df_scaled,
-                features=self.config.sampling.toposub.clustering_features,
-                n_clusters=self.config.sampling.toposub.n_clusters,
-                seed=self.config.sampling.toposub.random_seed)
-        elif self.config.sampling.toposub.clustering_method.lower() == 'minibatchkmean':
-            self.toposub.df_centroids, self.toposub.kmeans_obj, df_param[
-                'cluster_labels'] = ts.minibatch_kmeans_clustering(
-                df_scaled,
-                n_clusters=self.config.sampling.toposub.n_clusters,
-                features=self.config.sampling.toposub.clustering_features,
-                n_cores=self.config.project.CPU_cores,
-                seed=self.config.sampling.toposub.random_seed)
+
+        # add mask and cluster feature
+        if mask_file in [None, {}]:
+            mask = [True] * len(df_param)
+        else:
+            # read mask TIFF
+            ds = rio.open_rasterio(mask_file).to_dataset('band').rename({1: 'mask'})
+
+            # check if bounds and resolution match
+            dem = self.toposub.ds_param
+            if not ds.rio.bounds() == dem.rio.bounds() or not ds.rio.resolution() == dem.rio.resolution():
+                raise ValueError(
+                    'The GeoTIFFs of the DEM and the MASK need to have the same bounds/resolution. Please check.')
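+            # NOTE: only bounds and resolution are compared above; the mask
+            # raster is assumed to already be on the same CRS and grid as the DEM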
+            print(f'---> Only considering grid cells inside mask ({Path(mask_file).name})')
+
+            # get mask
+            mask = ts.ds_to_indexed_dataframe(ds)['mask'] == 1
+
+        # add cluster groups
+        if groups_file in [None, {}]:
+            split_clustering = False
+            df_param['cluster_group'] = 1
+            groups = [1]
         else:
-            print('ERROR: {} clustering method not available'.format(self.config.sampling.toposub.clustering_method))
-        self.toposub.df_centroids = ts.inverse_scale_df(self.toposub.df_centroids, self.toposub.scaler,
-                                                        features=self.config.sampling.toposub.clustering_features)
+            split_clustering = True
+
+            # read cluster TIFF
+            ds = rio.open_rasterio(groups_file).to_dataset('band').rename({1: 'group'})
+
+            # check if bounds and resolution match
+            dem = self.toposub.ds_param
+            if not ds.rio.bounds() == dem.rio.bounds() or not ds.rio.resolution() == dem.rio.resolution():
+                raise ValueError(
+                    'The GeoTIFFs of the DEM and the GROUPS need to have the same bounds/resolution. Please check.')
+
+            # add cluster group
+            df_param['cluster_group'] = ts.ds_to_indexed_dataframe(ds)['group'].astype(int)
+
+            groups = sorted(df_param.cluster_group.unique())
+            print(f'---> Split clustering into {len(groups)} groups.')
+
+        # ----------cluster per group----------
+        self.toposub.df_centroids = pd.DataFrame()
+        total_clusters = 0
+        for group in groups:
+            if split_clustering:
+                print(f'cluster group: {group}')
+            subset_mask = mask & (df_param.cluster_group == group)
+            df_subset = df_param[subset_mask]
+
+            # derive number of clusters
+            relative_cover = np.count_nonzero(subset_mask) / np.count_nonzero(mask)  # relative area of the group
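+            # each group receives a share of the total cluster budget proportional
+            # to its areal cover; np.ceil below guarantees at least one cluster
+            # per non-empty group, so the grand total can slightly exceed n_clusters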
+            n_clusters = int(np.ceil(self.config.sampling.toposub.n_clusters * relative_cover))
+
+            df_scaled, scaler = ts.scale_df(df_subset, features=self.config.sampling.toposub.clustering_features)
+            if self.config.sampling.toposub.clustering_method.lower() == 'kmean':
+                df_centroids, _, cluster_labels = ts.kmeans_clustering(
+                    df_scaled,
+                    features=self.config.sampling.toposub.clustering_features,
+                    n_clusters=n_clusters,
+                    seed=self.config.sampling.toposub.random_seed)
+            elif self.config.sampling.toposub.clustering_method.lower() == 'minibatchkmean':
+                df_centroids, _, cluster_labels = ts.minibatch_kmeans_clustering(
+                    df_scaled,
+                    n_clusters=n_clusters,
+                    features=self.config.sampling.toposub.clustering_features,
+                    n_cores=self.config.project.CPU_cores,
+                    seed=self.config.sampling.toposub.random_seed)
+            else:
+                raise ValueError(
+                    '{} clustering method not available'.format(self.config.sampling.toposub.clustering_method))
+
+            df_centroids = ts.inverse_scale_df(df_centroids, scaler,
+                                               features=self.config.sampling.toposub.clustering_features)
+
+            # re-number cluster labels
+            df_param.loc[subset_mask, 'cluster_labels'] = cluster_labels + total_clusters
+            df_centroids.index = df_centroids.index + total_clusters
+
+            # merge df centroids
+            self.toposub.df_centroids = pd.concat([self.toposub.df_centroids, df_centroids])
+
+            # update total clusters
+            total_clusters += n_clusters
+
+        if not split_clustering:
+            # drop cluster_group
+            df_param = df_param.drop(columns=['cluster_group'])
+        else:
+            print(f'-----> Created a total of {total_clusters} clusters in {len(groups)} groups.')
 
         # logic to add variables not used as clustering predictors into df_centroids
         feature_list = self.config.sampling.toposub.clustering_features.keys()
@@ -321,6 +399,8 @@ def extract_topo_cluster_param(self):
         n_digits = len(str(self.toposub.df_centroids.index.max()))
         self.toposub.df_centroids['point_id'] = self.toposub.df_centroids.index.astype(int).astype(str).str.zfill(
             n_digits)
+
+        # build the final cluster map
         self.toposub.ds_param['cluster_labels'] = (
             ["y", "x"], np.reshape(df_param.cluster_labels.values, self.toposub.ds_param.slope.shape))
@@ -333,7 +413,7 @@ def extract_topo_cluster_param(self):
 
     def extract_topo_param(self):
         """
-        Function to select which 
+        Function to select which
         """
         if os.path.isfile(self.config.project.directory + 'outputs/' + self.config.outputs.file.df_centroids):
             self.toposub.df_centroids = pd.read_pickle(
@@ -512,19 +592,6 @@ def downscale_climate(self):
                 for file in flist:
                     os.remove(file)
 
-            # concatenate ds solar
-            print('concatenating solar files')
-            out_solar_name = Path(self.config.outputs.file.ds_solar)
-            solar_pattern = f'{out_solar_name.stem}_*{out_solar_name.suffix}'
-            solar_flist = sorted(self.config.outputs.path.glob(solar_pattern))
-            ds_solar_list = [xr.open_dataset(file, engine='h5netcdf') for file in solar_flist]
-            fout = Path(self.config.outputs.path, out_solar_name)
-            ds = xr.concat(ds_solar_list, dim='time')
-            ds.to_netcdf(fout, engine='h5netcdf')
-            [f.unlink() for f in solar_flist]  # delete tmp solar files
-            print('solar files concatenated')
-            del ds
-
         else:
             ta.downscale_climate(self.config.project.directory,
                                  self.config.climate.path,
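
Note on the per-group cluster budget: each group's `n_clusters` is the global `sampling.toposub.n_clusters` scaled by the group's share of the masked area. A minimal sketch of that arithmetic, with hypothetical cover fractions (the numbers below are made up, not taken from the patch):

```python
import numpy as np

# hypothetical: a 100-cluster budget split across three groups
n_clusters_total = 100  # sampling.toposub.n_clusters
group_cover = {1: 0.333, 2: 0.333, 3: 0.334}  # relative area of each group

for group, relative_cover in group_cover.items():
    # same formula as in extract_topo_cluster_param()
    n_clusters = int(np.ceil(n_clusters_total * relative_cover))
    print(f'group {group}: {n_clusters} clusters')
# prints 34, 34, 34 -> 102 clusters in total: np.ceil rounds each group up,
# so the grand total can slightly exceed the nominal budget while making sure
# every non-empty group gets at least one cluster.
```

Both new config entries, `sampling.toposub.clustering_mask` and `sampling.toposub.clustering_groups`, are optional; when set, each must point to a GeoTIFF with the same bounds and resolution as the DEM.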