Add helper functions for querying Terra Data Repository #998
Adds the BigQuery client to the pinned requirements:

```diff
@@ -4,3 +4,4 @@ gnomad==0.6.4
 aiofiles==24.1.0
 pydantic==2.8.2
 google-cloud-dataproc==5.14.0
+google-cloud-bigquery==3.27.0
```
New file, `v03_pipeline/lib/misc/requests.py` (path inferred from the import in the module below):

```python
import requests
from requests.adapters import HTTPAdapter, Retry


def requests_retry_session():
    s = requests.Session()
    retries = Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[500, 502, 503, 504],
    )
    s.mount('http://', HTTPAdapter(max_retries=retries))
    s.mount('https://', HTTPAdapter(max_retries=retries))
    return s
```
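For context, a minimal usage sketch of the helper (the URL is a placeholder, not part of the PR): any request made through this session is automatically retried up to five times on connection errors or 500/502/503/504 responses, with exponential backoff between attempts.

```python
from v03_pipeline.lib.misc.requests import requests_retry_session

s = requests_retry_session()
# Placeholder endpoint; a transient 503 here is retried up to 5 times
# with exponential backoff before the call finally succeeds or raises.
res = s.get('https://example.com/health', timeout=10)
res.raise_for_status()
```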
New 68-line module (likely `v03_pipeline/lib/misc/terra_data_repository.py`; the filename is inferred from the PR title):

```python
import os
import re
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor, as_completed

import google.cloud.bigquery
from google.cloud import bigquery

from v03_pipeline.lib.misc.gcp import get_service_account_credentials
from v03_pipeline.lib.misc.requests import requests_retry_session

BIGQUERY_METRICS = [
    'collaborator_sample_id',
    'predicted_sex',
]
BIGQUERY_RESOURCE = 'bigquery'
TABLE_NAME_VALIDATION_REGEX = r'datarepo-\w+.datarepo_\w+'
TDR_ROOT_URL = 'https://data.terra.bio/api/repository/v1/'


def _tdr_request(resource: str) -> dict:
    service_account_token = get_service_account_credentials().token
    s = requests_retry_session()
    res = s.get(
        url=os.path.join(TDR_ROOT_URL, resource),
        headers={'Authorization': f'Bearer {service_account_token}'},
        timeout=10,
    )
    res.raise_for_status()
    return res.json()


def _get_dataset_ids() -> list[str]:
    res_body = _tdr_request('datasets')
    items = res_body['items']
    for item in items:
        if not any(x['cloudResource'] == BIGQUERY_RESOURCE for x in item['storage']):
            # Hard failure on purpose to prompt manual investigation.
            msg = 'Datasets without bigquery sources are unsupported'
            raise ValueError(msg)
    return [x['id'] for x in items]
```

**Contributor** (on `_get_dataset_ids`): Are we calling this on every pipeline run? It fetches all of the datasets in the TDR; is there a way to filter it first, perhaps by dataset name?

**Author:** Yes! This should be called every pipeline run, but it's only a single API request. The plan was to use the result of this request + a persisted list of dataset ids that we've seen before to create a "new dataset ids" list that would be passed into …
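For illustration, a rough sketch of the plan the author describes (the `seen_ids` persistence and the `new_dataset_ids` helper are hypothetical and not part of this PR):

```python
def new_dataset_ids(seen_ids: set[str]) -> list[str]:
    # Diff the single datasets request against the ids persisted from
    # prior runs, so only never-before-seen datasets flow downstream.
    return [ds_id for ds_id in _get_dataset_ids() if ds_id not in seen_ids]
```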
```python
def gen_bq_table_names() -> Generator[str]:
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(
                _tdr_request,
                f'datasets/{dataset_id}?include=ACCESS_INFORMATION',
            )
            for dataset_id in _get_dataset_ids()
        ]
        for future in as_completed(futures):
            result = future.result()
            yield f"{result['accessInformation']['bigQuery']['projectId']}.{result['accessInformation']['bigQuery']['datasetName']}"
```

**Contributor** (on `max_workers=5`): How did you choose 5?

**Author:** I believe AI was responsible for this choice 🍭.

**Author:** I'll make it a constant in the downstream PR.
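The promised constant might look something like the sketch below (the name `TDR_MAX_WORKERS` is hypothetical; the real one lands in the downstream PR):

```python
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor

TDR_MAX_WORKERS = 5  # hypothetical name; replaces the magic number


def gen_bq_table_names() -> Generator[str]:
    with ThreadPoolExecutor(max_workers=TDR_MAX_WORKERS) as executor:
        ...  # submit the _tdr_request calls exactly as above
```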
```python
def bq_metrics_query(bq_table_name: str) -> google.cloud.bigquery.table.RowIterator:
    if not re.match(TABLE_NAME_VALIDATION_REGEX, bq_table_name):
        msg = f'{bq_table_name} does not match expected pattern'
        raise ValueError(msg)
    client = bigquery.Client()
    return client.query_and_wait(
        f"""
        SELECT {','.join(BIGQUERY_METRICS)}
        FROM `{bq_table_name}.sample`
        """,  # noqa: S608
    )
```

👍
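Putting the helpers together, a hedged end-to-end sketch (the module path `v03_pipeline.lib.misc.terra_data_repository` is assumed, and ambient GCP credentials with access to the TDR BigQuery projects are required):

```python
from v03_pipeline.lib.misc.terra_data_repository import (
    bq_metrics_query,
    gen_bq_table_names,
)

# Stream each dataset's `projectId.datasetName` handle as the
# ACCESS_INFORMATION requests complete, then pull the selected
# metrics from that dataset's `sample` table.
for bq_table_name in gen_bq_table_names():
    for row in bq_metrics_query(bq_table_name):
        print(row['collaborator_sample_id'], row['predicted_sex'])
```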