
Merge pull request #25 from zmoon/retry
Retry
zmoon authored Jan 29, 2024
2 parents edd5466 + c8ced8e commit db37121
Showing 6 changed files with 130 additions and 8 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -33,6 +33,7 @@ jobs:
      - name: Test with pytest
        run: pytest --cov uscrn --cov-report xml --cov-report term
        timeout-minutes: 10

      - name: Upload coverage to Codecov
        if: ${{ matrix.python-version == '3.11' }}
37 changes: 37 additions & 0 deletions tests/test_attrs.py
@@ -1,12 +1,16 @@
import inspect
from contextlib import contextmanager
from pathlib import Path
from typing import get_args

import pytest

import uscrn
from uscrn.attrs import (
    _ALL_WHICHS,
    DEFAULT_WHICH,
    WHICHS,
    _get_docs,
    _map_dtype,
    expand_str,
    expand_strs,
@@ -94,6 +98,39 @@ def test_load_attrs():
    }


@contextmanager
def change_cache_dir(p: Path):
    default = uscrn.attrs._CACHE_DIR
    uscrn.attrs._CACHE_DIR = p
    try:
        yield
    finally:
        uscrn.attrs._CACHE_DIR = default


def test_get_docs_dl(tmp_path, caplog):
    which = "daily"
    d = tmp_path

    p_headers = d / f"{which}_headers.txt"
    p_readme = d / f"{which}_readme.txt"
    assert not p_headers.exists()
    assert not p_readme.exists()

    with change_cache_dir(d), caplog.at_level("INFO"):
        _get_docs(which)
    assert "downloading" in caplog.text

    assert p_headers.is_file()
    assert p_readme.is_file()

    # Now should load from disk instead of web
    caplog.clear()
    with change_cache_dir(d), caplog.at_level("INFO"):
        _get_docs(which)
    assert "downloading" not in caplog.text


def test_load_col_info():
    get_col_info("subhourly")
    get_col_info("daily")
23 changes: 23 additions & 0 deletions tests/test_util.py
@@ -0,0 +1,23 @@
from uscrn._util import retry


def test_retry(caplog):
    from requests.exceptions import ConnectionError

    n = 0

    @retry
    def func():
        nonlocal n
        if n < 2:
            n += 1
            raise ConnectionError
        return "result"

    with caplog.at_level("INFO"):
        res = func()

    for r in caplog.records:
        assert r.getMessage() == "Retrying func in 1 s after connection error"

    assert res == "result"
40 changes: 40 additions & 0 deletions uscrn/_util.py
@@ -0,0 +1,40 @@
import logging

logger = logging.getLogger("uscrn")


def retry(func):
    """Decorator to retry a function on web connection error.
    Up to 60 s, with Fibonacci backoff (1, 1, 2, 3, ...).
    """
    import urllib
    from functools import wraps

    import requests

    max_time = 60_000_000_000  # 60 s (in ns)

    @wraps(func)
    def wrapper(*args, **kwargs):
        from time import perf_counter_ns, sleep

        t0 = perf_counter_ns()
        a, b = 1, 1
        while True:
            try:
                return func(*args, **kwargs)
            except (
                urllib.error.URLError,
                requests.exceptions.ConnectionError,
                requests.exceptions.ReadTimeout,
            ):
                if perf_counter_ns() - t0 > max_time:  # pragma: no cover
                    raise
                logger.info(
                    f"Retrying {func.__name__} in {a} s after connection error",
                    stacklevel=2,
                )
                sleep(a)
                a, b = b, a + b  # Fibonacci backoff

    return wrapper
11 changes: 8 additions & 3 deletions uscrn/attrs.py
@@ -8,6 +8,7 @@
import numpy as np

HERE = Path(__file__).parent
_CACHE_DIR = HERE / "cache"


def expand_str(s: str) -> list[str]:
@@ -289,15 +290,18 @@ def _get_docs(
    import pandas as pd
    import requests

    from ._util import logger, retry

    validate_which(which)

    stored_attrs = load_attrs()
    base_url = stored_attrs[which]["base_url"]

    @retry
    def get(url: str, fp: Path) -> str:
        needs_update: bool
        if fp.is_file():
            r = requests.head(url)
            r = requests.head(url, timeout=10)
            r.raise_for_status()
            last_modified_url = pd.Timestamp(r.headers["Last-Modified"])
            last_modified_local = pd.Timestamp(fp.stat().st_mtime, unit="s", tz="UTC")
@@ -306,7 +310,8 @@ def get(url: str, fp: Path) -> str:
            needs_update = True

        if needs_update:
            r = requests.get(url)
            logger.info(f"downloading {url} to {fp}")
            r = requests.get(url, timeout=10)
            r.raise_for_status()
            with open(fp, "w") as f:
                f.write(r.text)
@@ -316,7 +321,7 @@ def get(url: str, fp: Path) -> str:

        return text

    cache_dir = HERE / "cache"
    cache_dir = _CACHE_DIR
    cache_dir.mkdir(exist_ok=True)

    headers_txt = get(f"{base_url}/headers.txt", cache_dir / f"{which}_headers.txt")
26 changes: 21 additions & 5 deletions uscrn/data.py
@@ -12,10 +12,13 @@
import pandas as pd
import xarray as xr

from ._util import retry

_GET_CAP: int | None = None
"""Restrict how many files to load, for testing purposes."""


@retry
def load_meta(*, cat: bool = False) -> pd.DataFrame:
    """Load the station metadata table.
@@ -98,6 +101,7 @@ def parse_url(url: str) -> _ParseRes:
    return parse_fp(urlsplit(url).path)


@retry
def read_subhourly(fp, *, cat: bool = False) -> pd.DataFrame:
    """Read a subhourly USCRN file.
@@ -148,6 +152,7 @@ def read_subhourly(fp, *, cat: bool = False) -> pd.DataFrame:
    return df


@retry
def read_hourly(fp, *, cat: bool = False) -> pd.DataFrame:
    """Read an hourly USCRN file.
@@ -194,6 +199,7 @@ def read_hourly(fp, *, cat: bool = False) -> pd.DataFrame:
    return df


@retry
def read_daily(fp, *, cat: bool = False) -> pd.DataFrame:
    """Read a daily USCRN file.
@@ -236,6 +242,7 @@ def read_daily(fp, *, cat: bool = False) -> pd.DataFrame:
    return df


@retry
def read_monthly(fp, *, cat: bool = False) -> pd.DataFrame:
    """Read a monthly USCRN file.
Expand Down Expand Up @@ -367,19 +374,27 @@ def get_data(

# Get available years from the main page
# e.g. `>2000/<`
# TODO: could cache available years and files like the docs pages
print("Discovering files...")
r = requests.get(f"{base_url}/")
r.raise_for_status()

@retry
def get_main_list_page():
r = requests.get(f"{base_url}/", timeout=10)
r.raise_for_status()
return r.text

main_list_page = get_main_list_page()

urls: list[str]
if which == "monthly":
# No year subdirectories
fns = re.findall(r">(CRN[a-zA-Z0-9\-_]*\.txt)<", r.text)
fns = re.findall(r">(CRN[a-zA-Z0-9\-_]*\.txt)<", main_list_page)
urls = [f"{base_url}/{fn}" for fn in fns]
else:
# Year subdirectories
from multiprocessing.pool import ThreadPool

available_years: list[int] = [int(s) for s in re.findall(r">([0-9]{4})/?<", r.text)]
available_years: list[int] = [int(s) for s in re.findall(r">([0-9]{4})/?<", main_list_page)]

years_: list[int]
if isinstance(years, int):
@@ -391,6 +406,7 @@ def get_data(
        if len(years_) == 0:
            raise ValueError("years should not be empty")

        @retry
        def get_year_urls(year):
            if year not in available_years:
                raise ValueError(
@@ -400,7 +416,7 @@ def get_year_urls(year):
            # Get filenames from the year page
            # e.g. `>CRND0103-2020-TX_Palestine_6_WNW.txt<`
            url = f"{base_url}/{year}/"
            r = requests.get(url)
            r = requests.get(url, timeout=10)
            r.raise_for_status()
            fns = re.findall(r">(CRN[a-zA-Z0-9\-_]*\.txt)<", r.text)
            if not fns:  # pragma: no cover
