diff --git a/clisops/utils/testing.py b/clisops/utils/testing.py index 9efef348..c7caf3c7 100644 --- a/clisops/utils/testing.py +++ b/clisops/utils/testing.py @@ -1,22 +1,63 @@ +import importlib.resources as ilr import os +import warnings from pathlib import Path -from typing import Union +from shutil import copytree +from sys import platform +from typing import Optional, Union +from urllib.error import HTTPError, URLError +from urllib.parse import urlparse +from urllib.request import urlretrieve +from filelock import FileLock from jinja2 import Template -from platformdirs import user_cache_dir +from loguru import logger +from xarray import Dataset +from xarray import open_dataset as _open_dataset + +try: + import pooch +except ImportError: + warnings.warn( + "The `pooch` library is not installed. " + "The default cache directory for testing data will not be set." + ) + pooch = None __all__ = [ - "MINI_ESGF_CACHE_DIR", "ContextLogger", - "write_roocs_cfg", + "ESGF_TEST_DATA_CACHE_DIR", + "ESGF_TEST_DATA_REPO_URL", + "XCLIM_TEST_DATA_CACHE_DIR", + "XCLIM_TEST_DATA_REPO_URL", + "default_mini_esgf_cache", + "default_xclim_testdata_cache", "get_esgf_file_paths", + "load_registry", + "open_dataset", + "stratus", + "write_roocs_cfg", ] -MINI_ESGF_CACHE_DIR = user_cache_dir(".mini-esgf-data") +try: + default_mini_esgf_cache = pooch.os_cache("mini-esgf-data") + default_xclim_testdata_cache = pooch.os_cache("xclim-testdata") +except AttributeError: + default_mini_esgf_cache = None + default_xclim_testdata_cache = None +ESGF_TEST_DATA_REPO_URL = "https://github.com/roocs/mini-esgf-data" +ESGF_TEST_DATA_VERSION = "master" +ESGF_TEST_DATA_CACHE_DIR = os.getenv("MINI_ESGF_CACHE", default_mini_esgf_cache) + +XCLIM_TEST_DATA_REPO_URL = "https://github.com/Ouranosinc/xclim-testdata" +XCLIM_TEST_DATA_VERSION = "v2024.8.23" +XCLIM_TEST_DATA_CACHE_DIR = os.getenv( + "XCLIM_TESTDATA_CACHE", default_xclim_testdata_cache +) -def write_roocs_cfg(cache_dir: Union[str, Path]): +def 
write_roocs_cfg(cache_dir: Union[str, Path]): cfg_templ = """ [project:cmip5] base_dir = {{ base_dir }}/test_data/badc/cmip5/data/cmip5 @@ -36,15 +77,15 @@ def write_roocs_cfg(cache_dir: Union[str, Path]): [project:c3s-cordex] base_dir = {{ base_dir }}/test_data/pool/data/CORDEX/data/cordex """ - roocs_config = Path(cache_dir, "roocs.ini").as_posix() + roocs_config = Path(cache_dir, "roocs.ini") cfg = Template(cfg_templ).render( - base_dir=Path(MINI_ESGF_CACHE_DIR).joinpath("master") + base_dir=Path(ESGF_TEST_DATA_CACHE_DIR).joinpath(ESGF_TEST_DATA_VERSION) ) with open(roocs_config, "w") as fp: fp.write(cfg) # point to roocs cfg in environment - os.environ["ROOCS_CONFIG"] = roocs_config + os.environ["ROOCS_CONFIG"] = roocs_config.as_posix() def get_esgf_file_paths(esgf_cache_dir): @@ -264,3 +305,275 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.logger.remove() except ValueError: pass + + +def load_registry(branch: str, repo: str): + """Load the registry file for the test data. + + Returns + ------- + dict + Dictionary of filenames and hashes. + """ + remote_registry = audit_url(f"{repo}/raw/{branch}/data/registry.txt") + + if repo == ESGF_TEST_DATA_REPO_URL: + project = "mini-esgf-data" + default_testdata_version = ESGF_TEST_DATA_VERSION + default_testdata_repo_url = ESGF_TEST_DATA_REPO_URL + elif repo == XCLIM_TEST_DATA_REPO_URL: + project = "xclim-testdata" + default_testdata_version = XCLIM_TEST_DATA_VERSION + default_testdata_repo_url = XCLIM_TEST_DATA_REPO_URL + else: + raise ValueError( + f"Repository URL {repo} not recognized. 
" + f"Please use one of {ESGF_TEST_DATA_REPO_URL} or {XCLIM_TEST_DATA_REPO_URL}" + ) + + if branch != default_testdata_version: + custom_registry_folder = Path( + str(ilr.files("clisops").joinpath(f"utils/registries/{branch}")) + ) + custom_registry_folder.mkdir(parents=True, exist_ok=True) + registry_file = custom_registry_folder.joinpath(f"{project}_registry.txt") + urlretrieve(remote_registry, registry_file) # noqa: S310 + + elif repo != default_testdata_repo_url: + registry_file = Path( + str(ilr.files("clisops").joinpath(f"utils/{project}_registry.txt")) + ) + urlretrieve(remote_registry, registry_file) # noqa: S310 + + registry_file = Path( + str(ilr.files("clisops").joinpath(f"utils/{project}_registry.txt")) + ) + if not registry_file.exists(): + raise FileNotFoundError(f"Registry file not found: {registry_file}") + + # Load the registry file + with registry_file.open() as f: + registry = {line.split()[0]: line.split()[1] for line in f} + return registry + + +def stratus( # noqa: PR01 + repo: str, + branch: str, + cache_dir: Union[str, Path], + data_updates: bool = True, +): + """Pooch registry instance for xclim test data. + + Parameters + ---------- + repo : str + URL of the repository to use when fetching testing datasets. + branch : str + Branch of repository to use when fetching testing datasets. + cache_dir : str or Path + The path to the directory where the data files are stored. + data_updates : bool + If True, allow updates to the data files. Default is True. + + Returns + ------- + pooch.Pooch + The Pooch instance for accessing the testing data. + + Examples + -------- + Using the registry to download a file: + + .. code-block:: python + + import xarray as xr + from clisops.utils.testing import stratus + + s = stratus(data_dir=..., repo=..., branch=...) + example_file = s.fetch("example.nc") + data = xr.open_dataset(example_file) + """ + if pooch is None: + raise ImportError( + "The `pooch` package is required to fetch the xclim testing data. 
" + "You can install it with `pip install pooch` or `pip install clisops[dev]`." + ) + + if repo.endswith("xclim-testdata"): + _version = XCLIM_TEST_DATA_VERSION + elif repo.endswith("mini-esgf-data"): + _version = ESGF_TEST_DATA_VERSION + else: + raise ValueError( + f"Repository URL {repo} not recognized. " + f"Please use one of {ESGF_TEST_DATA_REPO_URL} or {XCLIM_TEST_DATA_REPO_URL}" + ) + + remote = audit_url(f"{repo}/raw/{branch}/data") + return pooch.create( + path=cache_dir, + base_url=remote, + version=_version, + version_dev=branch, + allow_updates=data_updates, + registry=load_registry(branch=branch, repo=repo), + ) + + +# idea copied from raven that it borrowed from xclim that borrowed it from xarray that was borrowed from Seaborn +def open_dataset( + name: Union[str, os.PathLike[str]], + branch: str, + repo: str, + cache_dir: Union[str, os.PathLike[str]], + **kwargs, +) -> Dataset: + r"""Open a dataset from the online GitHub-like repository. + + If a local copy is found then always use that to avoid network traffic. + + Parameters + ---------- + name : str + Name of the file containing the dataset. + branch : str + Branch of the repository to use when fetching datasets. + repo: str + URL of the repository to use when fetching testing datasets. + cache_dir : Path + The directory in which to search for and write cached data. + \*\*kwargs + For NetCDF files, keywords passed to :py:func:`xarray.open_dataset`. + + Returns + ------- + Union[Dataset, Path] + + Raises + ------ + OSError + If the file is not found in the cache directory or cannot be read. + + See Also + -------- + xarray.open_dataset + """ + if cache_dir is None: + raise ValueError( + "The cache directory must be set. " + "Please set the `cache_dir` parameter or the `XCLIM_DATA_DIR` environment variable." 
+ ) + + local_file = Path(cache_dir).joinpath(name) + if not local_file.exists(): + try: + local_file = stratus(branch=branch, repo=repo, cache_dir=cache_dir).fetch( + name + ) + except OSError as e: + raise OSError( + f"File not found locally. Verify that the testing data is available in remote: {local_file}" + ) from e + try: + ds = _open_dataset(local_file, **kwargs) + return ds + except OSError: + raise + + +def populate_testing_data( + repo: str, + branch: str, + cache_dir: Path, +) -> None: + """Populate the local cache with the testing data. + + Parameters + ---------- + repo : str, optional + URL of the repository to use when fetching testing datasets. + branch : str, optional + Branch of xclim-testdata to use when fetching testing datasets. + cache_dir : Path + The path to the local cache. Defaults to the location set by the platformdirs library. + The testing data will be downloaded to this local cache. + + Returns + ------- + None + """ + # Create the Pooch instance + n = stratus(cache_dir=cache_dir, repo=repo, branch=branch) + + # Download the files + errored_files = [] + for file in load_registry(branch=branch, repo=repo): + try: + n.fetch(file) + except HTTPError: + msg = f"File `{file}` not accessible in remote repository." 
+ logger.error(msg) + errored_files.append(file) + else: + logger.info("Files were downloaded successfully.") + + if errored_files: + logger.error( + "The following files were unable to be downloaded: %s", + errored_files, + ) + + +def gather_testing_data( + worker_cache_dir: Union[str, os.PathLike[str], Path], + worker_id: str, + branch: str, + repo: str, + cache_dir: Union[str, os.PathLike[str], Path], +): + """Gather testing data across workers.""" + cache_dir = Path(cache_dir) + + if worker_id == "master": + populate_testing_data(branch=branch, repo=repo, cache_dir=cache_dir) + else: + if platform == "win32": + if not cache_dir.joinpath(branch).exists(): + raise FileNotFoundError( + "Testing data not found and UNIX-style file-locking is not supported on Windows. " + "Consider running `populate_testing_data()` to download testing data beforehand." + ) + else: + cache_dir.mkdir(exist_ok=True, parents=True) + lockfile = cache_dir.joinpath(".lock") + test_data_being_written = FileLock(lockfile) + with test_data_being_written: + # This flag prevents multiple calls from re-attempting to download testing data in the same pytest run + populate_testing_data(branch=branch, repo=repo, cache_dir=cache_dir) + cache_dir.joinpath(".data_written").touch() + with test_data_being_written.acquire(): + if lockfile.exists(): + lockfile.unlink() + copytree(cache_dir.joinpath(branch), worker_cache_dir) + + +def audit_url(url: str, context: Optional[str] = None) -> str: + """Check if the URL is well-formed. + + Raises + ------ + URLError + If the URL is not well-formed. 
+ """ + msg = "" + result = urlparse(url) + if result.scheme == "http": + msg = f"{context if context else ''} URL is not using secure HTTP: '{url}'".strip() + if not all([result.scheme, result.netloc]): + msg = f"{context if context else ''} URL is not well-formed: '{url}'".strip() + + if msg: + logger.error(msg) + raise URLError(msg) + return url diff --git a/clisops/utils/tutorial.py b/clisops/utils/tutorial.py deleted file mode 100644 index 209d945a..00000000 --- a/clisops/utils/tutorial.py +++ /dev/null @@ -1,242 +0,0 @@ -"""Testing and tutorial utilities module.""" - -# Most of this code copied and adapted from xarray, xclim, and raven -import hashlib -import re -from pathlib import Path -from typing import List, Optional, Sequence, Union -from urllib.error import HTTPError -from urllib.parse import urljoin -from urllib.request import urlretrieve - -import platformdirs -import requests -from loguru import logger -from xarray import Dataset -from xarray import open_dataset as _open_dataset - -_default_cache_dir = Path(platformdirs.user_cache_dir("clisops", "roocs")) - - -__all__ = ["get_file", "open_dataset", "query_folder"] - - -def file_md5_checksum(fname): - hash_md5 = hashlib.md5() - with open(fname, "rb") as f: - hash_md5.update(f.read()) - return hash_md5.hexdigest() - - -def _get( - fullname: Path, - github_url: str, - branch: str, - suffix: str, - cache_dir: Path, -) -> Path: - cache_dir = cache_dir.absolute() - local_file = cache_dir / branch / fullname - md5name = fullname.with_suffix(f"{suffix}.md5") - md5file = cache_dir / branch / md5name - - if not local_file.is_file(): - # This will always leave this directory on disk. - # We may want to add an option to remove it. 
- local_file.parent.mkdir(parents=True, exist_ok=True) - - url = "/".join((github_url, "raw", branch, fullname.as_posix())) - logger.info("Fetching remote file: %s" % fullname.as_posix()) - urlretrieve(url, local_file) - - try: - url = "/".join((github_url, "raw", branch, md5name.as_posix())) - logger.info("Fetching remote file md5: %s" % md5name.as_posix()) - urlretrieve(url, md5file) - except HTTPError as e: - msg = f"{md5name.as_posix()} not found. Aborting file retrieval." - local_file.unlink() - raise FileNotFoundError(msg) from e - - localmd5 = file_md5_checksum(local_file) - - try: - with open(md5file) as f: - remotemd5 = f.read() - if localmd5.strip() != remotemd5.strip(): - local_file.unlink() - msg = """ - MD5 checksum does not match, try downloading dataset again. - """ - raise OSError(msg) - except OSError as e: - logger.error(e) - return local_file - - -# idea copied from xclim that borrowed it from xarray that was borrowed from Seaborn -def get_file( - name: Union[str, Sequence[str]], - github_url: str = "https://github.com/Ouranosinc/xclim-testdata", - branch: str = "main", - cache_dir: Path = _default_cache_dir, -) -> Union[Path, List[Path]]: - """ - Return a file from an online GitHub-like repository. - If a local copy is found then always use that to avoid network traffic. - - Parameters - ---------- - name : Union[str, Sequence[str]] - Name of the file or list/tuple of names of files containing the dataset(s) including suffixes. - github_url : str - URL to GitHub repository where the data is stored. - branch : str, optional - For GitHub-hosted files, the branch to download from. - cache_dir : Path - The directory in which to search for and write cached data. 
- - Returns - ------- - Union[Path, List[Path]] - """ - if isinstance(name, str): - name = [name] - - files = list() - for n in name: - fullname = Path(n) - suffix = fullname.suffix - files.append( - _get( - fullname=fullname, - github_url=github_url, - branch=branch, - suffix=suffix, - cache_dir=cache_dir, - ) - ) - if len(files) == 1: - return files[0] - return files - - -# Credits to Anselme https://stackoverflow.com/a/62003257/7322852 (CC-BY-SA 4.0) -def query_folder( - folder: Optional[str] = None, - pattern: Optional[str] = None, - github_url: str = "https://github.com/Ouranosinc/xclim-testdata", - branch: str = "main", -) -> List[str]: - """ - Lists the files available for retrieval from a remote git repository with get_file. - If provided a folder name, will perform a globbing-like filtering operation for parent folders. - - Parameters - ---------- - folder : str, optional - Relative pathname of the sub-folder from the top-level. - pattern : str, optional - Regex pattern to identify a file. - github_url : str - URL to GitHub repository where the data is stored. - branch : str, optional - For GitHub-hosted files, the branch to download from. 
- - Returns - ------- - List[str] - """ - repo_name = github_url.strip("https://github.com/") - - url = f"https://api.github.com/repos/{repo_name}/git/trees/{branch}?recursive=1" - r = requests.get(url) - res = r.json() - - md5_files = [f["path"] for f in res["tree"] if f["path"].endswith(".md5")] - if folder: - folder = "/".join("/".split(folder)) if "/" in folder else folder - md5_files = [f for f in md5_files if folder in Path(f).parent.as_posix()] - files = [re.sub(".md5$", "", f) for f in md5_files] - - if pattern: - regex = re.compile(pattern) - files = [string for string in files if re.search(regex, string)] - - return files - - -# idea copied from xclim that borrowed it from xarray that was borrowed from Seaborn -def open_dataset( - name: str, - suffix: Optional[str] = None, - dap_url: Optional[str] = None, - github_url: str = "https://github.com/Ouranosinc/xclim-testdata", - branch: str = "main", - cache: bool = True, - cache_dir: Path = _default_cache_dir, - **kwds, -) -> Dataset: - """ - Open a dataset from the online GitHub-like repository. - If a local copy is found then always use that to avoid network traffic. - - Parameters - ---------- - name : str - Name of the file containing the dataset. If no suffix is given, assumed - to be netCDF ('.nc' is appended). - suffix : str, optional - If no suffix is given, assumed to be netCDF ('.nc' is appended). For no suffix, set "". - dap_url : str, optional - URL to OPeNDAP folder where the data is stored. If supplied, supersedes github_url. - github_url : str - URL to GitHub repository where the data is stored. - branch : str, optional - For GitHub-hosted files, the branch to download from. - cache_dir : Path - The directory in which to search for and write cached data. - cache : bool - If True, then cache data locally for use on subsequent calls. - kwds : dict, optional - For NetCDF files, **kwds passed to xarray.open_dataset. 
- - Returns - ------- - Union[Dataset, Path] - - See Also - -------- - xarray.open_dataset - """ - name = Path(name) - if suffix is None: - suffix = ".nc" - fullname = name.with_suffix(suffix) - - if dap_url is not None: - dap_file = urljoin(dap_url, str(name)) - try: - ds = _open_dataset(dap_file, **kwds) - return ds - except OSError: - msg = "OPeNDAP file not read. Verify that service is available." - logger.error(msg) - raise - - local_file = _get( - fullname=fullname, - github_url=github_url, - branch=branch, - suffix=suffix, - cache_dir=cache_dir, - ) - - try: - ds = _open_dataset(local_file, **kwds) - if not cache: - ds = ds.load() - local_file.unlink() - return ds - except OSError: - raise diff --git a/tests/conftest.py b/tests/conftest.py index 4c1beefe..ee04e520 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,15 +11,9 @@ from filelock import FileLock from git import Repo -from clisops.utils import get_file as _get_file -from clisops.utils import open_dataset as _open_dataset -from clisops.utils.testing import MINI_ESGF_CACHE_DIR, get_esgf_file_paths - -ESGF_TEST_DATA_REPO_URL = "https://github.com/roocs/mini-esgf-data" -ESGF_TEST_DATA_VERSION = "master" - -XCLIM_TEST_DATA_REPO_URL = "https://github.com/Ouranosinc/xclim-testdata" -XCLIM_TEST_DATA_VERSION = "v2023.12.14" +from clisops.utils import testing +from clisops.utils.testing import open_dataset as _open_dataset +from clisops.utils.testing import stratus as _stratus REAL_C3S_CMIP5_ARCHIVE_BASE = "/gws/nopw/j04/cp4cds1_vol1/data/" @@ -263,34 +257,59 @@ def _ps_series(values, start="7/1/2000"): @pytest.fixture(scope="session", autouse=True) def threadsafe_data_dir(tmp_path_factory): - yield tmp_path_factory.getbasetemp().joinpath("data") + return tmp_path_factory.getbasetemp().joinpath("data") + + +@pytest.fixture(scope="session") +def stratus(threadsafe_data_dir, worker_id): + return _stratus( + repo=testing.ESGF_TEST_DATA_REPO_URL, + branch=testing.ESGF_TEST_DATA_VERSION, + cache_dir=( + 
testing.ESGF_TEST_DATA_CACHE_DIR + if worker_id == "master" + else threadsafe_data_dir + ), + ) -@pytest.fixture(scope="session", autouse=True) -def get_file(threadsafe_data_dir): - def _get_session_scoped_file(file: str, branch: str = XCLIM_TEST_DATA_VERSION): - return _get_file( +@pytest.fixture(scope="session") +def nimbus(threadsafe_data_dir, worker_id): + return _stratus( + repo=testing.XCLIM_TEST_DATA_REPO_URL, + branch=testing.XCLIM_TEST_DATA_VERSION, + cache_dir=( + testing.XCLIM_TEST_DATA_CACHE_DIR + if worker_id == "master" + else threadsafe_data_dir + ), + ) + + +@pytest.fixture(scope="session") +def open_esgf_dataset(stratus): + def _open_session_scoped_file(file: Union[str, os.PathLike], **xr_kwargs): + xr_kwargs.setdefault("cache", True) + return _open_dataset( file, - github_url=XCLIM_TEST_DATA_REPO_URL, - cache_dir=threadsafe_data_dir, - branch=branch, + branch=testing.ESGF_TEST_DATA_VERSION, + repo=testing.ESGF_TEST_DATA_REPO_URL, + cache_dir=stratus.path, + **xr_kwargs, ) - return _get_session_scoped_file + return _open_session_scoped_file -@pytest.fixture(scope="session", autouse=True) -def open_dataset(threadsafe_data_dir): - def _open_session_scoped_file( - file: Union[str, os.PathLike], - branch: str = XCLIM_TEST_DATA_VERSION, - **xr_kwargs, - ): +@pytest.fixture(scope="session") +def open_xclim_dataset(nimbus): + def _open_session_scoped_file(file: Union[str, os.PathLike], **xr_kwargs): + xr_kwargs.setdefault("cache", True) return _open_dataset( file, - github_url=XCLIM_TEST_DATA_REPO_URL, - cache_dir=threadsafe_data_dir, - branch=branch, + branch=testing.XCLIM_TEST_DATA_VERSION, + repo=testing.XCLIM_TEST_DATA_REPO_URL, + cache_dir=nimbus.path, **xr_kwargs, ) @@ -311,41 +330,42 @@ def _check_output_nc(result, fname="output_001.nc", time=None): return _check_output_nc -# Fixture to load mini-esgf-data repository used by roocs tests @pytest.fixture(scope="session", autouse=True) def load_esgf_test_data(worker_id): """ This fixture ensures that 
the required test data repository has been cloned to the cache directory within the home directory. """ - target = Path(MINI_ESGF_CACHE_DIR).joinpath(ESGF_TEST_DATA_VERSION) + target = Path(testing.ESGF_TEST_DATA_CACHE_DIR).joinpath( + testing.ESGF_TEST_DATA_VERSION + ) if not target.exists(): if (platform == "win32" and worker_id == "gw0") or worker_id == "master": if not target.is_dir(): - Path(MINI_ESGF_CACHE_DIR).mkdir(exist_ok=True) - repo = Repo.clone_from(ESGF_TEST_DATA_REPO_URL, target) - repo.git.checkout(ESGF_TEST_DATA_VERSION) + Path(testing.ESGF_TEST_DATA_CACHE_DIR).mkdir(exist_ok=True) + repo = Repo.clone_from(testing.ESGF_TEST_DATA_REPO_URL, target) + repo.git.checkout(testing.ESGF_TEST_DATA_VERSION) elif ( os.environ.get("ROOCS_AUTO_UPDATE_TEST_DATA", "true").lower() != "false" ): repo = Repo(target) - repo.git.checkout(ESGF_TEST_DATA_VERSION) + repo.git.checkout(testing.ESGF_TEST_DATA_VERSION) repo.remotes[0].pull() else: - lockfile = Path(MINI_ESGF_CACHE_DIR).joinpath(".lock") + lockfile = Path(testing.ESGF_TEST_DATA_CACHE_DIR).joinpath(".lock") test_data_being_written = FileLock(lockfile) with test_data_being_written: if not target.is_dir(): - repo = Repo.clone_from(ESGF_TEST_DATA_REPO_URL, target) - repo.git.checkout(ESGF_TEST_DATA_VERSION) + repo = Repo.clone_from(testing.ESGF_TEST_DATA_REPO_URL, target) + repo.git.checkout(testing.ESGF_TEST_DATA_VERSION) elif ( os.environ.get("ROOCS_AUTO_UPDATE_TEST_DATA", "true").lower() != "false" ): repo = Repo(target) - repo.git.checkout(ESGF_TEST_DATA_VERSION) + repo.git.checkout(testing.ESGF_TEST_DATA_VERSION) repo.remotes[0].pull() target.joinpath(".data_written").touch() @@ -399,8 +419,8 @@ def cmip6_archive_base(): @pytest.fixture(scope="session", autouse=True) def mini_esgf_data(): - return get_esgf_file_paths( - Path(MINI_ESGF_CACHE_DIR).joinpath(ESGF_TEST_DATA_VERSION) + return testing.get_esgf_file_paths( + Path(testing.ESGF_TEST_DATA_CACHE_DIR).joinpath(testing.ESGF_TEST_DATA_VERSION) )