Skip to content

Commit

Permalink
feat!: use wags-tails for data management (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsstevenson authored Nov 27, 2023
1 parent 019230e commit 98e3f01
Show file tree
Hide file tree
Showing 26 changed files with 227 additions and 628 deletions.
4 changes: 1 addition & 3 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ disease-normalizer = "~=0.4.0.dev0"
owlready2 = "*"
rdflib = "*"
wikibaseintegrator = ">=0.12.0"
chembl-downloader = "*"
bioversions = ">=0.4.3"
wags-tails = "*"
ipykernel = "*"
pytest = "*"
pytest-cov = "*"
Expand All @@ -28,7 +27,6 @@ ipython = ">=8.10.0"
jupyterlab = "*"
civicpy = "*"
mypy = "*"
types-requests = "*"
types-pyyaml = "*"
lxml = "*"
xmlformatter = "*"
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,20 @@ To change the port, simply add `-port value`.

### Setting Environment Variables
RxNorm requires a UMLS license, which you can register for [here](https://www.nlm.nih.gov/research/umls/index.html).
You must set the `RxNORM_API_KEY` environment variable to your API key. This can be found in the [UTS 'My Profile' area](https://uts.nlm.nih.gov/uts/profile) after signing in.
You must set the `UMLS_API_KEY` environment variable to your API key. This can be found in the [UTS 'My Profile' area](https://uts.nlm.nih.gov/uts/profile) after signing in.

```shell script
export RXNORM_API_KEY={rxnorm_api_key}
export UMLS_API_KEY=12345-6789-abcdefg-hijklmnop # make sure to replace with your key!
```

HemOnc.org data requires a Harvard Dataverse API key. After creating a user account on the Harvard Dataverse website, you can follow [these instructions](https://guides.dataverse.org/en/latest/user/account.html) to generate a key. Once you have a key, set the following environment variable:

```shell script
export DATAVERSE_API_KEY={your api key}
export DATAVERSE_API_KEY=12345-6789-abcdefgh-hijklmnop # make sure to replace with your key!
```

#### Update source(s)

The Therapy Normalizer currently aggregates therapy data from:
* [ChEMBL](https://www.ebi.ac.uk/chembl/)
* [ChemIDPlus](https://chem.nlm.nih.gov/chemidplus/)
Expand All @@ -108,10 +110,10 @@ You can update all sources at once with the `--update_all` flag:
python3 -m therapy.cli --update_all
```

The `data/` subdirectory within the package source should house all desired input data. Files for all sources should follow the naming convention demonstrated below (with version numbers/dates changed where applicable).
Thera-Py can retrieve all required data itself, using the [wags-tails](https://github.com/GenomicMedLab/wags-tails) library. By default, data will be housed under `~/.local/share/wags_tails/` in a format like the following:

```
therapy/data
~/.local/share/wags_tails
├── chembl
│ └── chembl_27.db
├── chemidplus
Expand All @@ -128,7 +130,7 @@ therapy/data
├── ncit
│ └── ncit_20.09d.owl
├── rxnorm
│ ├── drug_forms.yaml
│ ├── rxnorm_drug_forms_20210104.yaml
│ └── rxnorm_20210104.RRF
└── wikidata
└── wikidata_20210425.json
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ dependencies = [
dynamic = ["version"]

[project.optional-dependencies]
etl = ["disease-normalizer~=0.4.0.dev0", "owlready2", "rdflib", "wikibaseintegrator>=0.12.0", "chembl-downloader", "bioversions>=0.4.3"]
etl = ["disease-normalizer~=0.4.0.dev0", "owlready2", "rdflib", "wikibaseintegrator>=0.12.0", "wags-tails"]
test = ["pytest", "pytest-cov"]
dev = ["pre-commit", "ruff>=0.1.2", "lxml", "xmlformatter", "mypy", "types-requests", "types-pyyaml"]
dev = ["pre-commit", "ruff>=0.1.2", "lxml", "xmlformatter", "mypy", "types-pyyaml"]

[project.urls]
Homepage = "https://github.com/cancervariants/therapy-normalization"
Expand Down
2 changes: 1 addition & 1 deletion src/therapy/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def _update_normalizers(
logger.info(msg)

start_load = timer()
source = sources_class_map[n](database=db)
source = sources_class_map[n](database=db, silent=False)
try:
processed_ids += source.perform_etl(use_existing)
except FileNotFoundError as e:
Expand Down
246 changes: 76 additions & 170 deletions src/therapy/etl/base.py
Original file line number Diff line number Diff line change
@@ -1,216 +1,119 @@
"""A base class for extraction, transformation, and loading of data."""
import ftplib
import json
import logging
import os
import re
import tempfile
import zipfile
from abc import ABC, abstractmethod
from functools import lru_cache
from pathlib import Path
from typing import Callable, Dict, List, Optional
from typing import Dict, List, Optional, Union

import bioversions
import requests
import click
from disease.database.dynamodb import DynamoDbDatabase
from disease.query import QueryHandler as DiseaseNormalizer
from pydantic import ValidationError

from therapy import APP_ROOT, ITEM_TYPES, DownloadException
from wags_tails import (
ChemblData,
ChemIDplusData,
CustomData,
DataSource,
DrugBankData,
DrugsAtFdaData,
GToPLigandData,
HemOncData,
NcitData,
RxNormData,
)

from therapy import ITEM_TYPES
from therapy.database import Database
from therapy.etl.rules import Rules
from therapy.schemas import Drug, SourceName

logger = logging.getLogger("therapy")
logger.setLevel(logging.DEBUG)

DEFAULT_DATA_PATH: Path = APP_ROOT / "data"

DATA_DISPATCH = {
SourceName.CHEMBL: ChemblData,
SourceName.CHEMIDPLUS: ChemIDplusData,
SourceName.DRUGBANK: DrugBankData,
SourceName.DRUGSATFDA: DrugsAtFdaData,
SourceName.GUIDETOPHARMACOLOGY: GToPLigandData,
SourceName.HEMONC: HemOncData,
SourceName.NCIT: NcitData,
SourceName.RXNORM: RxNormData,
}


class Base(ABC):
"""The ETL base class.
Default methods are declared to provide basic functions for core source
data-gathering phases (extraction, transformation, loading), as well
as some common subtasks (getting most recent version, downloading data
from an FTP server). Classes should expand or reimplement these methods as
needed.
data-gathering phases (extraction, transformation, loading).
Classes should expand or reimplement these methods as needed.
"""

def __init__(self, database: Database, data_path: Path = DEFAULT_DATA_PATH) -> None:
def __init__(
self, database: Database, data_path: Optional[Path] = None, silent: bool = True
) -> None:
"""Extract from sources.
:param Database database: application database object
:param Path data_path: path to app data directory
:param database: application database object
:param data_path: path to app data directory
:param silent: if True, don't print ETL results to console
"""
self._name = self.__class__.__name__
self._silent = silent
self._src_name = SourceName(self.__class__.__name__)
self._data_source: Union[
ChemblData,
ChemIDplusData,
DrugBankData,
DrugsAtFdaData,
GToPLigandData,
HemOncData,
NcitData,
RxNormData,
CustomData,
] = self._get_data_handler(data_path) # type: ignore
self.database = database
self._src_dir: Path = Path(data_path / self._name.lower())
self._added_ids: List[str] = []
self._rules = Rules(SourceName(self._name))
self._rules = Rules(self._src_name)

def _get_data_handler(self, data_path: Optional[Path] = None) -> DataSource:
    """Build the wags-tails data handler for this source.

    Override in subclasses whose data acquisition doesn't fit the standard
    per-source handler classes.

    :param data_path: location of data storage
    :return: instance of wags_tails.DataSource to manage source file(s)
    """
    handler_class = DATA_DISPATCH[self._src_name]
    return handler_class(data_dir=data_path, silent=self._silent)

def perform_etl(self, use_existing: bool = False) -> List[str]:
    """Public-facing method to begin ETL procedures on given data.

    Returned concept IDs can be passed to Merge method for computing
    merged concepts.

    :param use_existing: if True, don't try to retrieve latest source data
    :return: list of concept IDs which were successfully processed and
        uploaded.
    """
    self._extract_data(use_existing)
    if not self._silent:
        click.echo("Transforming and loading data to DB...")
    self._load_meta()
    self._transform_data()
    return self._added_ids

def get_latest_version(self) -> str:
    """Get most recent version of source data. Should be overridden by
    sources not added to Bioversions yet, or other special-case sources.

    :return: most recent version, as a str
    """
    return bioversions.get_version(self.__class__.__name__)
def _extract_data(self, use_existing: bool) -> None:
"""Acquire source data.
@abstractmethod
def _download_data(self) -> None:
"""Acquire source data and deposit in a usable form with correct file
naming conventions (generally, `<source>_<version>.<filetype>`, or
`<source>_<subset>_<version>.<filetype>` if sources require multiple
files). Shouldn't set any instance attributes.
"""
raise NotImplementedError

def _zip_handler(self, dl_path: Path, outfile_path: Path) -> None:
"""Provide simple callback function to extract the largest file within a given
zipfile and save it within the appropriate data directory.
:param Path dl_path: path to temp data file
:param Path outfile_path: path to save file within
"""
with zipfile.ZipFile(dl_path, "r") as zip_ref:
if len(zip_ref.filelist) > 1:
files = sorted(
zip_ref.filelist, key=lambda z: z.file_size, reverse=True
)
target = files[0]
else:
target = zip_ref.filelist[0]
target.filename = outfile_path.name
zip_ref.extract(target, path=outfile_path.parent)
os.remove(dl_path)

@staticmethod
def _http_download(
    url: str,
    outfile_path: Path,
    headers: Optional[Dict] = None,
    handler: Optional[Callable[[Path, Path], None]] = None,
) -> None:
    """Perform HTTP download of remote data file.

    :param url: URL to retrieve file from
    :param outfile_path: path to where file should be saved. Must be an actual
        Path instance rather than merely a pathlike string.
    :param headers: any needed HTTP headers to include in request
    :param handler: provide if downloaded file requires additional action, e.g.
        it's a zip file.
    :raises DownloadException: if the HTTP response indicates failure
    """
    if handler:
        # download to a scratch path first; handler is responsible for
        # producing the final file at outfile_path
        dl_path = Path(tempfile.gettempdir()) / "therapy_dl_tmp"
    else:
        dl_path = outfile_path
    # use stream to avoid saving download completely to memory
    with requests.get(url, stream=True, headers=headers) as r:
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            # chain the HTTPError so status/context isn't lost to callers
            raise DownloadException(
                f"Failed to download {outfile_path.name} from {url}."
            ) from e
        with open(dl_path, "wb") as h:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:
                    h.write(chunk)
    if handler:
        handler(dl_path, outfile_path)

def _ftp_download(self, host: str, host_dir: str, host_fn: str) -> None:
    """Download data file from FTP site.

    :param host: source's FTP host name
    :param host_dir: data directory located on FTP site
    :param host_fn: filename on FTP site to be downloaded
    :raises Exception: if the FTP transfer fails for any reason
    """
    try:
        with ftplib.FTP(host) as ftp:
            ftp.login()
            logger.debug(f"FTP login to {host} was successful")
            ftp.cwd(host_dir)
            with open(self._src_dir / host_fn, "wb") as fp:
                ftp.retrbinary(f"RETR {host_fn}", fp.write)
    except ftplib.all_errors as e:
        logger.error(f"FTP download failed: {e}")
        # preserve the original FTP error as the cause for easier debugging
        raise Exception(e) from e

def _parse_version(
self, file_path: Path, pattern: Optional[re.Pattern] = None
) -> str:
"""Get version number from provided file path.
:param Path file_path: path to located source data file
:param Optional[re.Pattern] pattern: regex pattern to use
:return: source data version
:raises: FileNotFoundError if version parsing fails
"""
if pattern is None:
pattern = re.compile(type(self).__name__.lower() + r"_(.+)\..+")
matches = re.match(pattern, file_path.name)
if matches is None:
raise FileNotFoundError
else:
return matches.groups()[0]

def _get_existing_files(self) -> List[Path]:
"""Get existing source files from data directory.
:return: sorted list of file objects
"""
return list(sorted(self._src_dir.glob(f"{self._name.lower()}_*.*")))

def _extract_data(self, use_existing: bool = False) -> None:
"""Get source file from data directory.
This method should ensure the source data directory exists, acquire source data,
set the source version value, and assign the source file location to
`self._src_file`. Child classes needing additional functionality (like setting
up a DB cursor, or managing multiple source files) will need to reimplement
this method. If `use_existing` is True, the version number will be parsed from
the existing filename; otherwise, it will be retrieved from the data source,
and if the local file is out-of-date, the newest version will be acquired.
This method is responsible for initializing an instance of a data handler and,
in most cases, setting ``self._data_file`` and ``self._version``.
:param bool use_existing: if True, don't try to fetch latest source data
"""
self._src_dir.mkdir(exist_ok=True, parents=True)
src_name = type(self).__name__.lower()
if use_existing:
files = self._get_existing_files()
if len(files) < 1:
raise FileNotFoundError(f"No source data found for {src_name}")
self._src_file: Path = files[-1]
try:
self._version = self._parse_version(self._src_file)
except FileNotFoundError:
raise FileNotFoundError(
f"Unable to parse version value from {src_name} source data file "
f"located at {self._src_file.absolute().as_uri()} -- "
"check filename against schema defined in README: "
"https://github.com/cancervariants/therapy-normalization#update-sources"
)
else:
self._version = self.get_latest_version()
fglob = f"{src_name}_{self._version}.*"
latest = list(self._src_dir.glob(fglob))
if not latest:
self._download_data()
latest = list(self._src_dir.glob(fglob))
assert len(latest) != 0 # probably unnecessary, but just to be safe
self._src_file = latest[0]
self._data_file, self._version = self._data_source.get_latest(
from_local=use_existing
)

@abstractmethod
def _load_meta(self) -> None:
Expand Down Expand Up @@ -311,13 +214,16 @@ def _load_therapy(self, therapy: Dict) -> None:
class DiseaseIndicationBase(Base):
"""Base class for sources that require disease normalization capabilities."""

def __init__(self, database: Database, data_path: Path = DEFAULT_DATA_PATH) -> None:
"""Initialize source ETL instance.
def __init__(
self, database: Database, data_path: Optional[Path] = None, silent: bool = True
) -> None:
"""Extract from sources.
:param therapy.database.Database database: application database
:param Path data_path: path to normalizer data directory
:param database: application database object
:param data_path: path to app data directory
:param silent: if True, don't print ETL results to console
"""
super().__init__(database, data_path)
super().__init__(database, data_path, silent)
db = DynamoDbDatabase(self.database.endpoint_url)
self.disease_normalizer = DiseaseNormalizer(db)

Expand Down
Loading

0 comments on commit 98e3f01

Please sign in to comment.