Precommit mypy increment #729

Closed · wants to merge 5 commits
4 changes: 1 addition & 3 deletions README.md
@@ -46,7 +46,6 @@ To run the Seqr Loader workflow on the dataset called `validation`, create a con
[workflow]
input_datasets = ['validation']
sequencing_type = 'genome'
create_es_index_for_datasets = ['validation']
skip_sgs = [
'CPGXXXX', # eg: low coverage (8x)
]
@@ -131,12 +130,11 @@ The genome GVCF QC report will be exposed as https://main-web.populationgenomics

### Stage selection example: upload Elasticsearch indices

If you want the workflow to create Elasticsearch indices at the end, make sure you include the `MtToEs` stage in the workflow (if every prerequisite stage is already finished, you can specify it under `first_stages`), and set the `workflow/create_es_index_for_datasets` option to the list of datasets for which you want indices to be created:
If you want the workflow to create Elasticsearch indices at the end, make sure you include the `MtToEs` stage in the workflow (if every prerequisite stage is already finished, you can specify it under `first_stages`):

```toml
[workflow]
first_stages = ['MtToEs']
create_es_index_for_datasets = ['validation']
```

The resulting index will be named using the current datestamp, or using the `workflow/output_version` option if it's specified. The Elasticsearch server is configured using the `elasticsearch` section in `configs/defaults/seqr_loader.toml`. The reason for not automatically creating indices for every project is that the Elasticsearch instance can easily run out of disk space, so an additional safeguard is handy.
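
For example, to pin the index name to a fixed label instead of the run datestamp, a config along these lines could be used (a minimal sketch; the `output_version` value below is a made-up label, and the Elasticsearch connection details are assumed to come from the default config rather than being set here):

```toml
[workflow]
first_stages = ['MtToEs']
# Hypothetical label: overrides the default datestamp-based index naming
output_version = 'v1-0'

# Elasticsearch host and credentials are taken from the `elasticsearch` section
# in configs/defaults/seqr_loader.toml, so they don't need to be repeated here.
```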
4 changes: 0 additions & 4 deletions configs/defaults/seqr_loader.toml
@@ -21,10 +21,6 @@ vep_version = '110'
# Calling intervals (defaults to whole genome intervals)
#intervals_path =

# Create Seqr ElasticSearch indices for these datasets. If not specified, will
# create indices for all input datasets.
#create_es_index_for_datasets = []

write_vcf = ["udn-aus"]

[resource_overrides]
14 changes: 4 additions & 10 deletions cpg_workflows/jobs/count.py
@@ -7,20 +7,14 @@

from cpg_utils import Path, to_path
from cpg_utils.config import get_config, image_path
from cpg_utils.hail_batch import command
from cpg_workflows.filetypes import (
BamPath,
CramPath,
)
from cpg_utils.hail_batch import Batch, command
from cpg_workflows.filetypes import BamPath, CramPath
from cpg_workflows.jobs.bam_to_cram import cram_to_bam
from cpg_workflows.resources import STANDARD
from cpg_workflows.utils import can_reuse
from cpg_workflows.workflow import (
SequencingGroup,
)


def count_res_group(b: hb.Batch) -> hb.ResourceGroup:
def count_res_group(b: Batch) -> hb.ResourceGroup:
"""
Define resource group for counting.
"""
@@ -114,7 +108,7 @@ def __repr__(self):


def count(
b: hb.Batch,
b: Batch,
input_cram_or_bam: BamPath | CramPath,
output_path: str | Path,
summary_path: str | Path,
4 changes: 2 additions & 2 deletions cpg_workflows/jobs/gcnv.py
@@ -573,8 +573,8 @@ def merge_calls(sg_vcfs: list[str], docker_image: str, job_attrs: dict[str, str]
third_job = get_batch().new_job('bgzip and tabix')
third_job.image(docker_image)
third_job.declare_resource_group(output={'vcf.bgz': '{root}.vcf.bgz', 'vcf.bgz.tbi': '{root}.vcf.bgz.tbi'})
third_job.command(f'bcftools view {pyjob.output} | bgzip -c > {third_job.output["vcf.bgz"]}')
third_job.command(f'tabix {third_job.output["vcf.bgz"]}')
third_job.command(f'bcftools view {pyjob.output} | bgzip -c > {third_job.output["vcf.bgz"]}') # type: ignore
third_job.command(f'tabix {third_job.output["vcf.bgz"]}') # type: ignore

# dependency setting between jobs should be implicit due to temp file passing

1 change: 1 addition & 0 deletions cpg_workflows/large_cohort/dataproc_utils.py
@@ -76,6 +76,7 @@ def dataproc_job(
pyfiles=pyfiles,
job_name=job_name,
region='australia-southeast1',
hail_version=dataproc.DEFAULT_HAIL_VERSION,
)

if num_workers is None:
7 changes: 5 additions & 2 deletions cpg_workflows/stages/gatk_sv/gatk_sv_common.py
@@ -2,13 +2,16 @@
Common methods for all GATK-SV workflows
"""

import pathlib
import re
from enum import Enum
from functools import lru_cache
from os.path import join
from random import randint
from typing import Any

from cloudpathlib import CloudPath

from hailtop.batch.job import Job

from cpg_utils import Path, to_path
@@ -179,7 +182,7 @@ def add_gatk_sv_jobs(
# pre-process input_dict
paths_as_strings: dict = {}
for key, value in input_dict.items():
if isinstance(value, Path):
if isinstance(value, (pathlib.Path, CloudPath)):
paths_as_strings[f'{wfl_name}.{key}'] = str(value)
elif isinstance(value, (list, set)):
paths_as_strings[f'{wfl_name}.{key}'] = [str(v) for v in value]
@@ -213,7 +216,7 @@ def add_gatk_sv_jobs(
for key, resource in output_dict.items():
out_path = expected_out_dict[key]
if isinstance(resource, list):
for source, dest in zip(resource, out_path):
for source, dest in zip(resource, out_path): # type: ignore
cmds.append(f'gsutil cp "$(cat {source})" "{dest}"')
else:
cmds.append(f'gsutil cp "$(cat {resource})" "{out_path}"')
4 changes: 2 additions & 2 deletions cpg_workflows/stages/gatk_sv/gatk_sv_single_sample.py
@@ -138,7 +138,7 @@ def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> S
dataset=sequencing_group.dataset,
wfl_name=self.name,
input_dict=input_dict,
expected_out_dict=expected_d,
expected_out_dict=expected_d, # type: ignore
sequencing_group_id=sequencing_group.id,
labels=billing_labels,
job_size=CromwellJobSizes.LARGE,
@@ -201,7 +201,7 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput:
dataset=cohort.analysis_dataset,
wfl_name=self.name,
input_dict=input_dict,
expected_out_dict=expected_d,
expected_out_dict=expected_d, # type: ignore
labels=billing_labels,
job_size=CromwellJobSizes.MEDIUM,
)
37 changes: 19 additions & 18 deletions cpg_workflows/stages/gcnv.py
@@ -4,6 +4,8 @@

import json

from google.api_core.exceptions import PermissionDenied

from cpg_utils import Path, dataproc, to_path
from cpg_utils.config import AR_GUID_NAME, get_config, image_path, reference_path, try_get_ar_guid
from cpg_utils.hail_batch import get_batch, query_command
@@ -15,7 +17,9 @@
get_references,
queue_annotate_sv_jobs,
)
from cpg_workflows.stages.seqr_loader import es_password
from cpg_workflows.targets import Cohort, SequencingGroup
from cpg_workflows.utils import get_logger
from cpg_workflows.workflow import (
CohortStage,
Dataset,
@@ -241,14 +245,14 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:

expected_out = self.expected_outputs(cohort)

ped_path = cohort.write_ped_file(expected_out['pedigree'])
ped_path = cohort.write_ped_file(expected_out['pedigree']) # type: ignore

jobs = gcnv.run_joint_segmentation(
segment_vcfs=all_vcfs,
pedigree=str(ped_path),
intervals=str(intervals),
tmp_prefix=expected_out['tmp_prefix'],
output_path=expected_out['clustered_vcf'],
tmp_prefix=expected_out['tmp_prefix'], # type: ignore
output_path=expected_out['clustered_vcf'], # type: ignore
job_attrs=self.get_job_attrs(cohort),
)
return self.make_outputs(cohort, data=expected_out, jobs=jobs)
@@ -412,6 +416,7 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
strv_job.memory('8Gi')

strvctvre_phylop = get_references(['strvctvre_phylop'])['strvctvre_phylop']
assert isinstance(strvctvre_phylop, str)
phylop_in_batch = get_batch().read_input(strvctvre_phylop)

input_dict = inputs.as_dict(cohort, AnnotateCNV)
@@ -432,8 +437,8 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:

# run strvctvre
strv_job.command(f'python StrVCTVRE.py -i {input_vcf} -o temp.vcf -f vcf -p {phylop_in_batch}')
strv_job.command(f'bgzip temp.vcf -c > {strv_job.output_vcf["vcf.bgz"]}')
strv_job.command(f'tabix {strv_job.output_vcf["vcf.bgz"]}')
strv_job.command(f'bgzip temp.vcf -c > {strv_job.output_vcf["vcf.bgz"]}') # type: ignore
strv_job.command(f'tabix {strv_job.output_vcf["vcf.bgz"]}') # type: ignore

get_batch().write_output(
strv_job.output_vcf,
@@ -570,31 +575,26 @@ def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None
"""
Uses analysis-runner's dataproc helper to run a hail query script
"""
if (
es_datasets := get_config()['workflow'].get('create_es_index_for_datasets')
) and dataset.name not in es_datasets:
# Skipping dataset that wasn't explicitly requested to upload to ES
try:
es_password_string = es_password()
except PermissionDenied:
get_logger().warning(f'No permission to access ES password, skipping for {dataset}')
return self.make_outputs(dataset)
except KeyError:
get_logger().warning(f'ES section not in config, skipping for {dataset}')
return self.make_outputs(dataset)

dataset_mt_path = inputs.as_path(target=dataset, stage=AnnotateDatasetCNV, key='mt')
index_name = self.expected_outputs(dataset)['index_name']
done_flag_path = self.expected_outputs(dataset)['done_flag']

if 'elasticsearch' not in get_config():
raise ValueError(
f'"elasticsearch" section is not defined in config, cannot create '
f'Elasticsearch index for dataset {dataset}',
)

from cpg_workflows.stages.seqr_loader import es_password

# transformation is the same, just use the same methods file?
script = (
f'cpg_workflows/dataproc_scripts/mt_to_es.py '
f'--mt-path {dataset_mt_path} '
f'--es-index {index_name} '
f'--done-flag-path {done_flag_path} '
f'--es-password {es_password()}'
f'--es-password {es_password_string}'
)
pyfiles = ['seqr-loading-pipelines/hail_scripts']
job_name = f'{dataset.name}: create ES index'
@@ -608,6 +608,7 @@ def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None
pyfiles=pyfiles,
job_name=job_name,
region='australia-southeast1',
hail_version=dataproc.DEFAULT_HAIL_VERSION,
)
else:
j = dataproc.hail_dataproc_job(
14 changes: 5 additions & 9 deletions cpg_workflows/stages/joint_genotyping_qc.py
@@ -7,8 +7,12 @@
from cpg_utils import Path, to_path
from cpg_utils.config import get_config
from cpg_utils.hail_batch import get_batch
from cpg_workflows import get_cohort
from cpg_workflows.jobs.happy import happy
from cpg_workflows.jobs.multiqc import multiqc
from cpg_workflows.jobs.picard import vcf_qc
from cpg_workflows.stages.joint_genotyping import JointGenotyping
from cpg_workflows.targets import SequencingGroup
from cpg_workflows.workflow import (
Cohort,
CohortStage,
@@ -20,11 +24,6 @@
stage,
)

from .. import get_cohort
from ..jobs.happy import happy
from ..targets import SequencingGroup
from .joint_genotyping import JointGenotyping


@stage(required_stages=JointGenotyping)
class JointVcfQC(CohortStage):
@@ -121,10 +120,7 @@ def _update_meta(output_path: str) -> dict[str, Any]:


@stage(
required_stages=[
JointVcfQC,
JointVcfHappy,
],
required_stages=[JointVcfQC, JointVcfHappy],
analysis_type='qc',
analysis_keys=['json'],
update_analysis_meta=_update_meta,
8 changes: 5 additions & 3 deletions cpg_workflows/stages/large_cohort.py
@@ -31,7 +31,9 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:

j = get_batch().new_job('Combiner', (self.get_job_attrs() or {}) | {'tool': 'hail query'})

init_batch_args = {'worker_memory': 'highmem'} if get_config()['workflow'].get('highmem_workers') else None
init_batch_args: dict[str, str | int] = (
{'worker_memory': 'highmem'} if get_config()['workflow'].get('highmem_workers') else {}
)

j.image(image_path('cpg_workflows'))
j.command(
@@ -296,7 +298,7 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:

@stage(required_stages=Vqsr)
class LoadVqsr(CohortStage):
def expected_outputs(self, cohort: Cohort) -> dict[str, Path]:
def expected_outputs(self, cohort: Cohort) -> Path:
return get_workflow().prefix / 'vqsr.ht'

def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
@@ -323,7 +325,7 @@ def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:

@stage(required_stages=[Combiner, SampleQC, Relatedness])
class Frequencies(CohortStage):
def expected_outputs(self, cohort: Cohort) -> dict[str, Path]:
def expected_outputs(self, cohort: Cohort) -> Path:
return get_workflow().prefix / 'frequencies.ht'

def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None:
25 changes: 13 additions & 12 deletions cpg_workflows/stages/seqr_loader.py
@@ -5,6 +5,8 @@
"""
from typing import Any

from google.api_core.exceptions import PermissionDenied

from cpg_utils import Path, dataproc, to_path
from cpg_utils.cloud import read_secret
from cpg_utils.config import get_config, image_path
@@ -14,6 +16,7 @@
cohort_to_vcf_job,
)
from cpg_workflows.query_modules import seqr_loader
from cpg_workflows.utils import get_logger
from cpg_workflows.workflow import (
Cohort,
CohortStage,
@@ -211,7 +214,7 @@ def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None
return self.make_outputs(dataset, data=self.expected_outputs(dataset), jobs=job)


def es_password() -> str:
def es_password() -> str | None:
"""
Get Elasticsearch password. Moved into a separate method to simplify
mocking in tests.
@@ -249,28 +252,25 @@ def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None
"""
Transforms the MT into a Seqr index, requires Dataproc
"""
if (
es_datasets := get_config()['workflow'].get('create_es_index_for_datasets')
) and dataset.name not in es_datasets:
# Skipping dataset that wasn't explicitly requested to upload to ES:
try:
es_password_string = es_password()
except PermissionDenied:
get_logger().warning(f'No permission to access ES password, skipping for {dataset}')
return self.make_outputs(dataset)
except KeyError:
get_logger().warning(f'ES section not in config, skipping for {dataset}')
return self.make_outputs(dataset)

dataset_mt_path = inputs.as_path(target=dataset, stage=AnnotateDataset, key='mt')
index_name = self.expected_outputs(dataset)['index_name']
done_flag_path = self.expected_outputs(dataset)['done_flag']

if 'elasticsearch' not in get_config():
raise ValueError(
f'"elasticsearch" section is not defined in config, cannot create '
f'Elasticsearch index for dataset {dataset}',
)

script = (
f'cpg_workflows/dataproc_scripts/mt_to_es.py '
f'--mt-path {dataset_mt_path} '
f'--es-index {index_name} '
f'--done-flag-path {done_flag_path} '
f'--es-password {es_password()}'
f'--es-password {es_password_string}'
)
pyfiles = ['seqr-loading-pipelines/hail_scripts']
job_name = f'{dataset.name}: create ES index'
@@ -284,6 +284,7 @@ def queue_jobs(self, dataset: Dataset, inputs: StageInput) -> StageOutput | None
pyfiles=pyfiles,
job_name=job_name,
region='australia-southeast1',
hail_version=dataproc.DEFAULT_HAIL_VERSION,
)
else:
j = dataproc.hail_dataproc_job(