Merged
1 change: 1 addition & 0 deletions requirements.in
@@ -4,3 +4,4 @@ gnomad==0.6.4
aiofiles==24.1.0
pydantic==2.8.2
google-cloud-dataproc==5.14.0
google-cloud-bigquery==3.27.0
22 changes: 18 additions & 4 deletions requirements.txt
@@ -97,17 +97,30 @@ frozenlist==1.5.0
gnomad==0.6.4
# via -r requirements.in
google-api-core[grpc]==2.22.0
# via google-cloud-dataproc
# via
# google-cloud-bigquery
# google-cloud-core
# google-cloud-dataproc
google-auth==2.35.0
# via
# google-api-core
# google-auth-oauthlib
# google-cloud-bigquery
# google-cloud-core
# google-cloud-dataproc
# hail
google-auth-oauthlib==0.8.0
# via hail
google-cloud-bigquery==3.27.0
# via -r requirements.in
google-cloud-core==2.4.1
# via google-cloud-bigquery
google-cloud-dataproc==5.14.0
# via -r requirements.in
google-crc32c==1.6.0
# via google-resumable-media
google-resumable-media==2.7.2
# via google-cloud-bigquery
googleapis-common-protos[grpc]==1.65.0
# via
# google-api-core
@@ -197,6 +210,7 @@ orjson==3.10.10
packaging==24.1
# via
# bokeh
# google-cloud-bigquery
# plotly
pandas==2.2.3
# via
@@ -256,16 +270,15 @@ pygments==2.18.0
# ipython
# rich
pyjwt[crypto]==2.9.0
# via
# msal
# pyjwt
# via msal
pyspark==3.5.3
# via hail
python-daemon==3.1.0
# via luigi
python-dateutil==2.9.0.post0
# via
# botocore
# google-cloud-bigquery
# luigi
# pandas
python-json-logger==2.0.7
@@ -282,6 +295,7 @@ requests==2.32.3
# via
# azure-core
# google-api-core
# google-cloud-bigquery
# hail
# msal
# msrest
10 changes: 2 additions & 8 deletions v03_pipeline/lib/misc/allele_registry.py
@@ -8,9 +8,9 @@
import hailtop.fs as hfs
import requests
from requests import HTTPError
from requests.adapters import HTTPAdapter, Retry

from v03_pipeline.lib.logger import get_logger
from v03_pipeline.lib.misc.requests import requests_retry_session
from v03_pipeline.lib.model import Env, ReferenceGenome

MAX_VARIANTS_PER_REQUEST = 1000000
@@ -96,13 +96,7 @@ def register_alleles(
logger.info('Calling the ClinGen Allele Registry')
with hfs.open(formatted_vcf_file_name, 'r') as vcf_in:
data = vcf_in.read()
s = requests.Session()
retries = Retry(
total=5,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
)
s.mount('https://', HTTPAdapter(max_retries=retries))
s = requests_retry_session()
Contributor
👍
res = s.put(
url=build_url(base_url, reference_genome),
data=data,
14 changes: 14 additions & 0 deletions v03_pipeline/lib/misc/requests.py
@@ -0,0 +1,14 @@
import requests
from requests.adapters import HTTPAdapter, Retry


def requests_retry_session():
s = requests.Session()
retries = Retry(
total=5,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
)
s.mount('http://', HTTPAdapter(max_retries=retries))
s.mount('https://', HTTPAdapter(max_retries=retries))
return s
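
For reference, a minimal usage sketch of the new helper, assuming a caller that only needs a GET with retries; the URL and timeout below are illustrative assumptions, not part of this PR:

# Hypothetical usage sketch (not part of this PR).
from v03_pipeline.lib.misc.requests import requests_retry_session

# Session mounts retrying adapters on both http:// and https://,
# so transient 5xx responses are retried before an error surfaces.
s = requests_retry_session()
res = s.get(
    'https://data.terra.bio/api/repository/v1/datasets',  # illustrative URL, assumed
    timeout=10,
)
res.raise_for_status()
print(res.json())

The listed status codes (500, 502, 503, 504) are retried up to 5 times with exponential backoff; other responses are returned as-is.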
68 changes: 68 additions & 0 deletions v03_pipeline/lib/misc/terra_data_repository.py
@@ -0,0 +1,68 @@
import os
import re
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor, as_completed

import google.cloud.bigquery
from google.cloud import bigquery

from v03_pipeline.lib.misc.gcp import get_service_account_credentials
from v03_pipeline.lib.misc.requests import requests_retry_session

BIGQUERY_METRICS = [
'collaborator_sample_id',
'predicted_sex',
]
BIGQUERY_RESOURCE = 'bigquery'
TABLE_NAME_VALIDATION_REGEX = r'datarepo-\w+.datarepo_\w+'
TDR_ROOT_URL = 'https://data.terra.bio/api/repository/v1/'


def _tdr_request(resource: str) -> dict:
service_account_token = get_service_account_credentials().token
s = requests_retry_session()
res = s.get(
url=os.path.join(TDR_ROOT_URL, resource),
headers={'Authorization': f'Bearer {service_account_token}'},
timeout=10,
)
res.raise_for_status()
return res.json()


def _get_dataset_ids() -> list[str]:
Contributor
Are we calling this on every pipeline run? It fetches all of the datasets in the TDR, is there a way to filter it first by dataset name perhaps?

Collaborator Author
Yes! This should be called on every pipeline run, but it's only a single API request. The plan was to use the result of this request plus a persisted list of dataset ids that we've seen before to create a "new dataset ids" list that would be passed into gen_bq_sample_metrics.

res_body = _tdr_request('datasets')
items = res_body['items']
for item in items:
if not any(x['cloudResource'] == BIGQUERY_RESOURCE for x in item['storage']):
# Hard failure on purpose to prompt manual investigation.
msg = 'Datasets without bigquery sources are unsupported'
raise ValueError(msg)
return [x['id'] for x in items]


def gen_bq_table_names() -> Generator[str]:
with ThreadPoolExecutor(max_workers=5) as executor:
Contributor
How did you choose 5 max_workers?

Collaborator Author
I believe AI was responsible for this choice 🍭.

Collaborator Author
I'll make it a constant on the downstream PR.

futures = [
executor.submit(
_tdr_request,
f'datasets/{dataset_id}?include=ACCESS_INFORMATION',
)
for dataset_id in _get_dataset_ids()
]
for future in as_completed(futures):
result = future.result()
yield f"{result['accessInformation']['bigQuery']['projectId']}.{result['accessInformation']['bigQuery']['datasetName']}"


def bq_metrics_query(bq_table_name: str) -> google.cloud.bigquery.table.RowIterator:
if not re.match(TABLE_NAME_VALIDATION_REGEX, bq_table_name):
msg = f'{bq_table_name} does not match expected pattern'
raise ValueError(msg)
client = bigquery.Client()
return client.query_and_wait(
f"""
SELECT {','.join(BIGQUERY_METRICS)}
FROM `{bq_table_name}.sample`
""", # noqa: S608
)
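
For orientation, a rough sketch of how the new module might be consumed downstream, following the review thread above; gen_bq_sample_metrics and the persisted set of already-seen table names are assumptions drawn from that conversation, not code in this PR:

# Hypothetical downstream consumer (assumption, not part of this PR).
from v03_pipeline.lib.misc.terra_data_repository import (
    bq_metrics_query,
    gen_bq_table_names,
)


def gen_bq_sample_metrics(seen_table_names: set[str]):
    # 'seen_table_names' stands in for the persisted list of previously seen
    # datasets mentioned in the review thread; how it is stored is unspecified here.
    for bq_table_name in gen_bq_table_names():
        if bq_table_name in seen_table_names:
            continue
        for row in bq_metrics_query(bq_table_name):
            # Each row exposes the BIGQUERY_METRICS columns by name.
            yield bq_table_name, row['collaborator_sample_id'], row['predicted_sex']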