Retry #25

Merged
merged 13 commits on Jan 29, 2024
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -33,6 +33,7 @@ jobs:

- name: Test with pytest
run: pytest --cov uscrn --cov-report xml --cov-report term
timeout-minutes: 10

- name: Upload coverage to Codecov
if: ${{ matrix.python-version == '3.11' }}
37 changes: 37 additions & 0 deletions tests/test_attrs.py
@@ -1,12 +1,16 @@
import inspect
from contextlib import contextmanager
from pathlib import Path
from typing import get_args

import pytest

import uscrn
from uscrn.attrs import (
_ALL_WHICHS,
DEFAULT_WHICH,
WHICHS,
_get_docs,
_map_dtype,
expand_str,
expand_strs,
@@ -94,6 +98,39 @@ def test_load_attrs():
}


@contextmanager
def change_cache_dir(p: Path):
default = uscrn.attrs._CACHE_DIR
uscrn.attrs._CACHE_DIR = p
try:
yield
finally:
uscrn.attrs._CACHE_DIR = default


def test_get_docs_dl(tmp_path, caplog):
which = "daily"
d = tmp_path

p_headers = d / f"{which}_headers.txt"
p_readme = d / f"{which}_readme.txt"
assert not p_headers.exists()
assert not p_readme.exists()

with change_cache_dir(d), caplog.at_level("INFO"):
_get_docs(which)
assert "downloading" in caplog.text

assert p_headers.is_file()
assert p_readme.is_file()

# Now should load from disk instead of web
caplog.clear()
with change_cache_dir(d), caplog.at_level("INFO"):
_get_docs(which)
assert "downloading" not in caplog.text


def test_load_col_info():
get_col_info("subhourly")
get_col_info("daily")
23 changes: 23 additions & 0 deletions tests/test_util.py
@@ -0,0 +1,23 @@
from uscrn._util import retry


def test_retry(caplog):
from requests.exceptions import ConnectionError

n = 0

@retry
def func():
nonlocal n
if n < 2:
n += 1
raise ConnectionError
return "result"

with caplog.at_level("INFO"):
res = func()

for r in caplog.records:
assert r.getMessage() == "Retrying func in 1 s after connection error"

assert res == "result"
40 changes: 40 additions & 0 deletions uscrn/_util.py
@@ -0,0 +1,40 @@
import logging

logger = logging.getLogger("uscrn")


def retry(func):
"""Decorator to retry a function on web connection error.
Up to 60 s, with Fibonacci backoff (1, 1, 2, 3, ...).
"""
import urllib.error
from functools import wraps

import requests

max_time = 60_000_000_000 # 60 s (in ns)

@wraps(func)
def wrapper(*args, **kwargs):
from time import perf_counter_ns, sleep

t0 = perf_counter_ns()
a, b = 1, 1
while True:
try:
return func(*args, **kwargs)
except (
urllib.error.URLError,
requests.exceptions.ConnectionError,
requests.exceptions.ReadTimeout,
):
if perf_counter_ns() - t0 > max_time: # pragma: no cover
raise
logger.info(
f"Retrying {func.__name__} in {a} s after connection error",
stacklevel=2,
)
sleep(a)
a, b = b, a + b # Fibonacci backoff

return wrapper
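
For reference, a minimal usage sketch of the retry decorator defined above (the fetch_listing function and URL are hypothetical, for illustration only): a wrapped function that hits a transient connection error is retried after sleeps of 1, 1, 2, 3, 5, ... seconds until roughly 60 s have elapsed, after which the error propagates.

import requests

from uscrn._util import retry

@retry
def fetch_listing(url: str) -> str:
    # Hypothetical caller: a ConnectionError or ReadTimeout raised here is caught
    # by the decorator, which sleeps (Fibonacci backoff) and calls the function again.
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    return r.text

# e.g. text = fetch_listing("https://example.com/listing/")  # placeholder URL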
11 changes: 8 additions & 3 deletions uscrn/attrs.py
@@ -8,6 +8,7 @@
import numpy as np

HERE = Path(__file__).parent
_CACHE_DIR = HERE / "cache"


def expand_str(s: str) -> list[str]:
@@ -289,15 +290,18 @@ def _get_docs(
import pandas as pd
import requests

from ._util import logger, retry

validate_which(which)

stored_attrs = load_attrs()
base_url = stored_attrs[which]["base_url"]

@retry
def get(url: str, fp: Path) -> str:
needs_update: bool
if fp.is_file():
r = requests.head(url)
r = requests.head(url, timeout=10)
r.raise_for_status()
last_modified_url = pd.Timestamp(r.headers["Last-Modified"])
last_modified_local = pd.Timestamp(fp.stat().st_mtime, unit="s", tz="UTC")
@@ -306,7 +310,8 @@ def get(url: str, fp: Path) -> str:
needs_update = True

if needs_update:
r = requests.get(url)
logger.info(f"downloading {url} to {fp}")
r = requests.get(url, timeout=10)
r.raise_for_status()
with open(fp, "w") as f:
f.write(r.text)
@@ -316,7 +321,7 @@ def get(url: str, fp: Path) -> str:

return text

cache_dir = HERE / "cache"
cache_dir = _CACHE_DIR
cache_dir.mkdir(exist_ok=True)

headers_txt = get(f"{base_url}/headers.txt", cache_dir / f"{which}_headers.txt")
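
The caching in _get_docs above turns on whether the remote copy is newer than the cached file. A standalone sketch of that freshness check, following the same HEAD request and Last-Modified comparison (the is_stale helper name is illustrative, not part of the package):

from pathlib import Path

import pandas as pd
import requests

def is_stale(url: str, fp: Path) -> bool:
    # True if there is no cached file, or the server copy is newer than it,
    # mirroring the check inside _get_docs.get.
    if not fp.is_file():
        return True
    r = requests.head(url, timeout=10)
    r.raise_for_status()
    last_modified_url = pd.Timestamp(r.headers["Last-Modified"])
    last_modified_local = pd.Timestamp(fp.stat().st_mtime, unit="s", tz="UTC")
    return last_modified_url > last_modified_local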
26 changes: 21 additions & 5 deletions uscrn/data.py
@@ -12,10 +12,13 @@
import pandas as pd
import xarray as xr

from ._util import retry

_GET_CAP: int | None = None
"""Restrict how many files to load, for testing purposes."""


@retry
def load_meta(*, cat: bool = False) -> pd.DataFrame:
"""Load the station metadata table.

@@ -98,6 +101,7 @@ def parse_url(url: str) -> _ParseRes:
return parse_fp(urlsplit(url).path)


@retry
def read_subhourly(fp, *, cat: bool = False) -> pd.DataFrame:
"""Read a subhourly USCRN file.

@@ -148,6 +152,7 @@ def read_subhourly(fp, *, cat: bool = False) -> pd.DataFrame:
return df


@retry
def read_hourly(fp, *, cat: bool = False) -> pd.DataFrame:
"""Read an hourly USCRN file.

@@ -194,6 +199,7 @@ def read_hourly(fp, *, cat: bool = False) -> pd.DataFrame:
return df


@retry
def read_daily(fp, *, cat: bool = False) -> pd.DataFrame:
"""Read a daily USCRN file.

@@ -236,6 +242,7 @@ def read_daily(fp, *, cat: bool = False) -> pd.DataFrame:
return df


@retry
def read_monthly(fp, *, cat: bool = False) -> pd.DataFrame:
"""Read a monthly USCRN file.

@@ -367,19 +374,27 @@ def get_data(

# Get available years from the main page
# e.g. `>2000/<`
# TODO: could cache available years and files like the docs pages
print("Discovering files...")
r = requests.get(f"{base_url}/")
r.raise_for_status()

@retry
def get_main_list_page():
r = requests.get(f"{base_url}/", timeout=10)
r.raise_for_status()
return r.text

main_list_page = get_main_list_page()

urls: list[str]
if which == "monthly":
# No year subdirectories
fns = re.findall(r">(CRN[a-zA-Z0-9\-_]*\.txt)<", r.text)
fns = re.findall(r">(CRN[a-zA-Z0-9\-_]*\.txt)<", main_list_page)
urls = [f"{base_url}/{fn}" for fn in fns]
else:
# Year subdirectories
from multiprocessing.pool import ThreadPool

available_years: list[int] = [int(s) for s in re.findall(r">([0-9]{4})/?<", r.text)]
available_years: list[int] = [int(s) for s in re.findall(r">([0-9]{4})/?<", main_list_page)]

years_: list[int]
if isinstance(years, int):
@@ -391,6 +406,7 @@ def get_data(
if len(years_) == 0:
raise ValueError("years should not be empty")

@retry
def get_year_urls(year):
if year not in available_years:
raise ValueError(
@@ -400,7 +416,7 @@ def get_year_urls(year):
# Get filenames from the year page
# e.g. `>CRND0103-2020-TX_Palestine_6_WNW.txt<`
url = f"{base_url}/{year}/"
r = requests.get(url)
r = requests.get(url, timeout=10)
r.raise_for_status()
fns = re.findall(r">(CRN[a-zA-Z0-9\-_]*\.txt)<", r.text)
if not fns: # pragma: no cover
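
To make the file-discovery step in get_data concrete, here is a small sketch of the two regexes from the diff applied to fabricated directory-listing fragments (the HTML snippets below are made up for illustration; only the patterns come from the code):

import re

# Fabricated fragments shaped like the anchors the regexes look for
main_list_page = '<a href="2019/">2019/</a> <a href="2020/">2020/</a>'
year_page = '<a href="CRND0103-2020-TX_Palestine_6_WNW.txt">CRND0103-2020-TX_Palestine_6_WNW.txt</a>'

years = [int(s) for s in re.findall(r">([0-9]{4})/?<", main_list_page)]
fns = re.findall(r">(CRN[a-zA-Z0-9\-_]*\.txt)<", year_page)

print(years)  # [2019, 2020]
print(fns)    # ['CRND0103-2020-TX_Palestine_6_WNW.txt']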