cache attr and ts of caravan in cache dir
OuyangWenyu committed Oct 19, 2023
1 parent e4c4702 commit f0b8657
Showing 2 changed files with 78 additions and 13 deletions.
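In short, the commit makes cache_xrdataset write the Caravan attribute and per-region time-series NetCDF caches under CACHE_DIR. A minimal usage sketch, mirroring the test change below; the Caravan constructor arguments are an assumption and not part of this commit:

# Hedged sketch: write the Caravan attribute and per-region time-series caches to CACHE_DIR.
from hydrodataset.caravan import Caravan

caravan = Caravan(region="Global")  # assumed constructor call; only "Global" is accepted for caching
caravan.cache_xrdataset(checkregion=None)  # matches the updated call in tests/test_caravan.py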
85 changes: 75 additions & 10 deletions hydrodataset/caravan.py
@@ -12,7 +12,7 @@
import numpy as np
from timezonefinder import TimezoneFinder
from hydroutils import hydro_file
from hydrodataset import HydroDataset
from hydrodataset import CACHE_DIR, HydroDataset


class Caravan(HydroDataset):
@@ -632,17 +632,82 @@ def cache_xrdataset(self, checkregion="hysets"):
warnings.warn("Check your units of all variables")
if self.region != "Global":
raise ValueError("Only Global region is supported.")
ds_attr = self.cache_attributes_xrdataset()
ds_attr.to_netcdf(
os.path.join(
self.data_source_description["ATTR_DIR"],
"caravan_attributes.nc",
)
cache_attr_file = os.path.join(
CACHE_DIR,
"caravan_attributes.nc",
)
if not os.path.isfile(cache_attr_file):
ds_attr = self.cache_attributes_xrdataset()
ds_attr.to_netcdf(cache_attr_file)

if checkregion is not None:
regions = [checkregion]
else:
regions = self.region_data_name
if checkregion == "all":
regions = self.region_data_name
else:
regions = [checkregion]
self._check_data(regions)

for region in self.region_data_name:
region_ts_file = CACHE_DIR.joinpath(f"caravan{region}_timeseries.nc")
if os.path.isfile(region_ts_file):
continue

# all files are too large to read in memory, hence we read them region by region
site_file = os.path.join(
self.data_source_description["ATTR_DIR"],
region,
"attributes_caravan_" + region + ".csv",
)
sites_region = pd.read_csv(site_file, sep=",")
gage_id_lst = sites_region["gauge_id"].values

# forcing dir is same as flow dir
ts_dir = self.data_source_description["FORCING_DIR"]

# Find matching file paths
file_paths = []
for file_name in gage_id_lst:
file_path = os.path.join(ts_dir, region, file_name) + ".nc"
if os.path.isfile(file_path):
file_paths.append(file_path)

if len(file_paths) > 1000:
# hysets is too large; split it into several parts
split_num = 5
file_path_lsts = np.array_split(file_paths, split_num)
gage_id_lsts = np.array_split(gage_id_lst, split_num)
for i in range(len(file_path_lsts)):
save_file = CACHE_DIR.joinpath(f"caravan{region}_timeseries_{i}.nc")
if os.path.isfile(save_file):
continue
datasets_i = [
xr.open_dataset(path, chunks={}).assign_coords(gauge_id=name)
for path, name in zip(file_path_lsts[i], gage_id_lsts[i])
]
# Concatenate the datasets along the new dimension
combined_ds_i = xr.concat(datasets_i, dim="gauge_id")
region_ts_file_i = CACHE_DIR.joinpath(
f"caravan{region}_timeseries_{i}.nc"
)
combined_ds_i.to_netcdf(
region_ts_file_i,
mode="w",
format="NETCDF4",
)
else:
datasets = [
xr.open_dataset(path, chunks={}).assign_coords(gauge_id=name)
for path, name in zip(file_paths, gage_id_lst)
]
# Concatenate the datasets along the new dimension
combined_ds = xr.concat(datasets, dim="gauge_id")
combined_ds.to_netcdf(
region_ts_file,
mode="w",
format="NETCDF4",
)

def _check_data(self, regions):
pbar = tqdm(regions, desc="Start Checking Data...")
for region in pbar:
pbar.set_description(f"Processing Region-{region}")
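A minimal sketch, assuming the attribute cache has already been written, of reading back the file produced by the code above (CACHE_DIR and the file name come from the diff; xarray's open_dataset is a standard call):

import os
import xarray as xr
from hydrodataset import CACHE_DIR

# Open the attribute cache that cache_xrdataset now writes under CACHE_DIR.
attrs = xr.open_dataset(os.path.join(CACHE_DIR, "caravan_attributes.nc"))
print(list(attrs.data_vars))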
6 changes: 3 additions & 3 deletions tests/test_caravan.py
@@ -1,10 +1,10 @@
"""
Author: Wenyu Ouyang
Date: 2023-07-18 11:45:25
LastEditTime: 2023-10-18 21:42:28
LastEditTime: 2023-10-19 10:08:35
LastEditors: Wenyu Ouyang
Description: Test for caravan dataset reading
FilePath: \hydrodataset\tests\test_caravan.py
FilePath: /hydrodataset/tests/test_caravan.py
Copyright (c) 2023-2024 Wenyu Ouyang. All rights reserved.
"""
import os
@@ -149,7 +149,7 @@ def test_read_caravan(caravan):


def test_cache_caravan(caravan):
caravan.cache_xrdataset()
caravan.cache_xrdataset(checkregion=None)


def test_read_ts_xrdataset(caravan):
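Since test_read_ts_xrdataset reads the time series back, here is a minimal sketch of recombining the split hysets caches into one dataset; the file-name pattern follows the f-strings in the diff above, and CACHE_DIR is assumed to be a pathlib.Path, as its joinpath usage suggests:

import xarray as xr
from hydrodataset import CACHE_DIR

# hysets was written as numbered parts (caravanhysets_timeseries_0.nc, ...); reassemble along gauge_id.
parts = sorted(CACHE_DIR.glob("caravanhysets_timeseries_*.nc"))
ts = xr.open_mfdataset(parts, combine="nested", concat_dim="gauge_id")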
