|
| 1 | +import os |
| 2 | +import re |
| 3 | +from collections.abc import Generator |
| 4 | +from concurrent.futures import ThreadPoolExecutor, as_completed |
| 5 | + |
| 6 | +import google.cloud.bigquery |
| 7 | +from google.cloud import bigquery |
| 8 | + |
| 9 | +from v03_pipeline.lib.misc.gcp import get_service_account_credentials |
| 10 | +from v03_pipeline.lib.misc.requests import requests_retry_session |
| 11 | + |
| 12 | +BIGQUERY_METRICS = [ |
| 13 | + 'collaborator_sample_id', |
| 14 | + 'predicted_sex', |
| 15 | +] |
| 16 | +BIGQUERY_RESOURCE = 'bigquery' |
| 17 | +TABLE_NAME_VALIDATION_REGEX = r'datarepo-\w+.datarepo_\w+' |
| 18 | +TDR_ROOT_URL = 'https://data.terra.bio/api/repository/v1/' |
| 19 | + |
| 20 | + |
| 21 | +def _tdr_request(resource: str) -> dict: |
| 22 | + service_account_token = get_service_account_credentials().token |
| 23 | + s = requests_retry_session() |
| 24 | + res = s.get( |
| 25 | + url=os.path.join(TDR_ROOT_URL, resource), |
| 26 | + headers={'Authorization': f'Bearer {service_account_token}'}, |
| 27 | + timeout=10, |
| 28 | + ) |
| 29 | + res.raise_for_status() |
| 30 | + return res.json() |
| 31 | + |
| 32 | + |
| 33 | +def _get_dataset_ids() -> list[str]: |
| 34 | + res_body = _tdr_request('datasets') |
| 35 | + items = res_body['items'] |
| 36 | + for item in items: |
| 37 | + if not any(x['cloudResource'] == BIGQUERY_RESOURCE for x in item['storage']): |
| 38 | + # Hard failure on purpose to prompt manual investigation. |
| 39 | + msg = 'Datasets without bigquery sources are unsupported' |
| 40 | + raise ValueError(msg) |
| 41 | + return [x['id'] for x in items] |
| 42 | + |
| 43 | + |
| 44 | +def gen_bq_table_names() -> Generator[str]: |
| 45 | + with ThreadPoolExecutor(max_workers=5) as executor: |
| 46 | + futures = [ |
| 47 | + executor.submit( |
| 48 | + _tdr_request, |
| 49 | + f'datasets/{dataset_id}?include=ACCESS_INFORMATION', |
| 50 | + ) |
| 51 | + for dataset_id in _get_dataset_ids() |
| 52 | + ] |
| 53 | + for future in as_completed(futures): |
| 54 | + result = future.result() |
| 55 | + yield f"{result['accessInformation']['bigQuery']['projectId']}.{result['accessInformation']['bigQuery']['datasetName']}" |
| 56 | + |
| 57 | + |
| 58 | +def bq_metrics_query(bq_table_name: str) -> google.cloud.bigquery.table.RowIterator: |
| 59 | + if not re.match(TABLE_NAME_VALIDATION_REGEX, bq_table_name): |
| 60 | + msg = f'{bq_table_name} does not match expected pattern' |
| 61 | + raise ValueError(msg) |
| 62 | + client = bigquery.Client() |
| 63 | + return client.query_and_wait( |
| 64 | + f""" |
| 65 | + SELECT {','.join(BIGQUERY_METRICS)} |
| 66 | + FROM `{bq_table_name}.sample` |
| 67 | + """, # noqa: S608 |
| 68 | + ) |
0 commit comments