Merge pull request #24 from HumanCellAtlas/ds_schema_test
Make it possible to handle different metadata versions using the same version of pipeline-tools code. Switch to pytest and add a metadata schema integration test. Add some support for v5 metadata.
David Shiga authored Mar 16, 2018
2 parents f4004dc + 3cfa0fa commit 3cbef2a
Showing 18 changed files with 788 additions and 55 deletions.
5 changes: 4 additions & 1 deletion .travis.yml
@@ -3,7 +3,10 @@ python:
- '2.7'
- '3.6'
install: pip install -r requirements.txt -r test-requirements.txt
script: python -m unittest discover -v
env:
- TEST_SUITE=unit
- TEST_SUITE=latest_schema
script: bash test.sh $TEST_SUITE
notifications:
slack:
on_success: change
23 changes: 23 additions & 0 deletions README.md
@@ -3,3 +3,26 @@
This repo contains Python code and pipelines for interacting with the Human Cell Atlas Data Coordination Platform. They are used by the Secondary Analysis Service.

The pipelines wrap analysis pipelines from the Skylab repo and provide the glue to interface with the DCP. Each adapter pipeline takes a bundle ID as input, queries the Data Storage Service to find the input files the analysis pipeline needs, runs the analysis pipeline, and submits the results to the Ingest Service. This keeps the analysis pipelines themselves free of dependencies on the DCP.

## Run tests

### Create a virtual environment

```
virtualenv pipeline-tools-test-env
source pipeline-tools-test-env/bin/activate
pip install -r test-requirements.txt
```

### Run unit tests

```
bash test.sh
```

### Run schema tests

```
export TEST_SUITE=latest_schema
bash test.sh
```
26 changes: 13 additions & 13 deletions adapter_pipelines/Optimus/adapter.wdl
@@ -12,6 +12,7 @@ task GetInputs {
command <<<
python <<CODE
from pipeline_tools import dcp_utils
from pipeline_tools import input_utils
# Get bundle manifest
uuid = '${bundle_uuid}'
@@ -20,18 +21,17 @@ task GetInputs {
retry_seconds = ${retry_seconds}
timeout_seconds = ${timeout_seconds}
print('Getting bundle manifest for id {0}, version {1}'.format(uuid, version))
manifest_files = dcp_utils.get_manifest_files(uuid, version, dss_url, timeout_seconds, retry_seconds)
manifest = dcp_utils.get_manifest(uuid, version, dss_url, timeout_seconds, retry_seconds)
manifest_files = dcp_utils.get_manifest_file_dicts(manifest)
print('Downloading assay.json')
assay_json_uuid = manifest_files['name_to_meta']['assay.json']['uuid']
assay_json = dcp_utils.get_file_by_uuid(assay_json_uuid, dss_url)
input_metadata_file_uuid = input_utils.get_input_metadata_file_uuid(manifest_files)
input_metadata_json = dcp_utils.get_file_by_uuid(input_metadata_file_uuid, dss_url)
# Parse inputs from assay_json and write to fastq_inputs
# Parse inputs from metadata and write to fastq_inputs
print('Writing fastq inputs to fastq_inputs.tsv')
lanes = assay_json['content']['seq']['lanes']
r1 = [manifest_files['name_to_meta'][lane['r1']]['url'] for lane in lanes]
r2 = [manifest_files['name_to_meta'][lane['r2']]['url'] for lane in lanes]
i1 = [manifest_files['name_to_meta'][lane['i1']]['url'] for lane in lanes]
sample_id = input_utils.get_sample_id(input_metadata_json)
lanes = input_utils.get_optimus_lanes(input_metadata_json)
r1, r2, i1 = input_utils.get_optimus_inputs(lanes, manifest_files)
fastq_inputs = [list(i) for i in zip(r1, r2, i1)]
print(fastq_inputs)
@@ -40,14 +40,14 @@
f.write('\t'.join(line) +'\n')
print('Writing sample ID to inputs.tsv')
sample_id = assay_json['has_input']
sample_id = input_utils.get_sample_id(input_metadata_json)
with open('inputs.tsv', 'w') as f:
f.write('{0}'.format(sample_id))
print('Wrote input map')
CODE
>>>
runtime {
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.1.9"
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.1.11"
}
output {
String sample_id = read_string("inputs.tsv")
@@ -88,7 +88,7 @@ task inputs_for_submit {
>>>
runtime {
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.1.9"
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.1.11"
}
output {
@@ -127,7 +127,7 @@ task outputs_for_submit {
>>>
runtime {
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.1.9"
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.1.11"
}
output {
20 changes: 10 additions & 10 deletions adapter_pipelines/ss2_single_sample/adapter.wdl
@@ -12,6 +12,7 @@ task GetInputs {
command <<<
python <<CODE
from pipeline_tools import dcp_utils
from pipeline_tools import input_utils
# Get bundle manifest
uuid = "${bundle_uuid}"
@@ -20,17 +21,16 @@ task GetInputs {
retry_seconds = ${retry_seconds}
timeout_seconds = ${timeout_seconds}
print("Getting bundle manifest for id {0}, version {1}".format(uuid, version))
manifest_files = dcp_utils.get_manifest_files(uuid, version, dss_url, timeout_seconds, retry_seconds)
manifest = dcp_utils.get_manifest(uuid, version, dss_url, timeout_seconds, retry_seconds)
manifest_files = dcp_utils.get_manifest_file_dicts(manifest)
print("Downloading assay.json")
assay_json_uuid = manifest_files["name_to_meta"]["assay.json"]["uuid"]
assay_json = dcp_utils.get_file_by_uuid(assay_json_uuid, dss_url)
inputs_metadata_file_uuid = input_utils.get_input_metadata_file_uuid(manifest_files)
inputs_metadata_json = dcp_utils.get_file_by_uuid(inputs_metadata_file_uuid, dss_url)
sample_id = assay_json["has_input"]
fastq_1_name = assay_json["content"]["seq"]["lanes"][0]["r1"]
fastq_2_name = assay_json["content"]["seq"]["lanes"][0]["r2"]
fastq_1_url = manifest_files["name_to_meta"][fastq_1_name]["url"]
fastq_2_url = manifest_files["name_to_meta"][fastq_2_name]["url"]
sample_id = input_utils.get_sample_id(inputs_metadata_json)
fastq_1_name, fastq_2_name = input_utils.get_smart_seq_2_fastq_names(inputs_metadata_json)
fastq_1_url = dcp_utils.get_file_url(manifest_files, fastq_1_name)
fastq_2_url = dcp_utils.get_file_url(manifest_files, fastq_2_name)
print("Creating input map")
with open("inputs.tsv", "w") as f:
@@ -40,7 +40,7 @@
CODE
>>>
runtime {
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.1.9"
docker: "quay.io/humancellatlas/secondary-analysis-pipeline-tools:v0.1.11"
}
output {
Object inputs = read_object("inputs.tsv")
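Both adapters now follow the same pattern: resolve the metadata file through input_utils, fetch it by UUID, then pull names and URLs through the version-aware helpers. A condensed Python sketch of the flow the ss2 GetInputs task runs; the dss_url is a placeholder, since the real value arrives as a WDL task input:

```
from pipeline_tools import dcp_utils, input_utils

dss_url = 'https://dss.example.org/v1'  # hypothetical DSS endpoint


def get_ss2_inputs(manifest_files):
    # manifest_files as built by get_manifest + get_manifest_file_dicts above.
    # With the default version='4' this resolves assay.json; version='5'
    # resolves files.json instead.
    metadata_uuid = input_utils.get_input_metadata_file_uuid(manifest_files)
    metadata_json = dcp_utils.get_file_by_uuid(metadata_uuid, dss_url)

    # Version-aware extraction of the sample id and fastq names.
    sample_id = input_utils.get_sample_id(metadata_json)
    fastq_1, fastq_2 = input_utils.get_smart_seq_2_fastq_names(metadata_json)

    # Map the file names back to URLs via the manifest lookup dicts.
    return (sample_id,
            dcp_utils.get_file_url(manifest_files, fastq_1),
            dcp_utils.get_file_url(manifest_files, fastq_2))
```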
6 changes: 5 additions & 1 deletion pipeline_tools/README.rst
@@ -41,8 +41,12 @@ To run unit tests, first create a virtual environment with the requirements::

Then, run unit tests from the root of the pipeline-tools repo like this::

python -m unittest discover -v
bash test.sh

To run schema integration tests, do::

export TEST_SUITE="latest_schema"
bash test.sh

create_analysis_json.py
=======================
15 changes: 13 additions & 2 deletions pipeline_tools/dcp_utils.py
@@ -19,7 +19,7 @@ def get_file_by_uuid(file_id, dss_url):
return response.json()


def get_manifest_files(bundle_uuid, bundle_version, dss_url, timeout_seconds, retry_seconds):
def get_manifest(bundle_uuid, bundle_version, dss_url, timeout_seconds, retry_seconds):
"""
Retrieve manifest.json file for a given bundle uuid and version.
:param str bundle_uuid: Bundle unique id
@@ -48,19 +48,30 @@ def get_manifest_files(bundle_uuid, bundle_version, dss_url, timeout_seconds, retry_seconds):
current = time.time()
manifest = response.json()

return manifest


def get_manifest_file_dicts(manifest):
bundle = manifest['bundle']
name_to_meta = {}
url_to_name = {}
for f in bundle['files']:
name_to_meta[f['name']] = f
url_to_name[f['url']] = f['name']

return {
'name_to_meta': name_to_meta,
'url_to_name': url_to_name
}


def get_file_uuid(manifest_file_dicts, file_name):
return manifest_file_dicts['name_to_meta'][file_name]['uuid']


def get_file_url(manifest_file_dicts, file_name):
return manifest_file_dicts['name_to_meta'][file_name]['url']


def get_auth_token(url="https://danielvaughan.eu.auth0.com/oauth/token",
client_id="Zdsog4nDAnhQ99yiKwMQWAPc2qUDlR99",
client_secret="t-OAE-GQk_nZZtWn-QQezJxDsLXmU7VSzlAh9cKW5vb87i90qlXGTvVNAjfT9weF",
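Splitting the old get_manifest_files into get_manifest plus get_manifest_file_dicts separates the retry/HTTP logic from the lookup-table construction, and the new get_file_uuid / get_file_url accessors replace direct name_to_meta indexing in the adapters. A minimal sketch of how the pieces compose; the URL and bundle coordinates below are placeholders, not values from this diff:

```
from pipeline_tools import dcp_utils

# Placeholder inputs; the adapter WDLs pass these in as task parameters.
dss_url = 'https://dss.example.org/v1'  # hypothetical DSS endpoint
bundle_uuid = '00000000-0000-0000-0000-000000000000'
bundle_version = '2018-03-16T000000.000Z'

# Fetch the raw manifest once (with retries), then index it.
manifest = dcp_utils.get_manifest(
    bundle_uuid, bundle_version, dss_url,
    timeout_seconds=600, retry_seconds=10)
manifest_files = dcp_utils.get_manifest_file_dicts(manifest)

# Named accessors instead of manifest_files['name_to_meta'][...] indexing.
assay_uuid = dcp_utils.get_file_uuid(manifest_files, 'assay.json')
assay_url = dcp_utils.get_file_url(manifest_files, 'assay.json')
```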
87 changes: 87 additions & 0 deletions pipeline_tools/input_utils.py
@@ -0,0 +1,87 @@
from pipeline_tools import dcp_utils


def get_sample_id(metadata, version='4'):
"""Return the sample id from the given metadata"""
if version == '4':
return _get_sample_id_v4(metadata)
else:
raise NotImplementedError('Only implemented for v4 metadata')


def _get_sample_id_v4(assay_json):
"""Return sample id from assay json"""
return assay_json["has_input"]


def get_input_metadata_file_uuid(manifest_files, version='4'):
"""Get the uuid of the file containing metadata about pipeline input files,
e.g. assay.json in v4"""
if version == '5':
return _get_input_metadata_file_uuid_v5(manifest_files)
elif version == '4':
return _get_input_metadata_file_uuid_v4(manifest_files)
else:
raise NotImplementedError('Only implemented for v4 and v5 metadata')


def _get_input_metadata_file_uuid_v5(manifest_files):
"""Get the uuid of the files.json file"""
return dcp_utils.get_file_uuid(manifest_files, 'files.json')


def _get_input_metadata_file_uuid_v4(manifest_files):
"""Get the uuid of the assay.json file"""
return dcp_utils.get_file_uuid(manifest_files, 'assay.json')


def get_smart_seq_2_fastq_names(metadata, version='4'):
"""Get the fastq file names from the given metadata"""
if version == '5':
return _get_smart_seq_2_fastq_names_v5(metadata)
elif version == '4':
return _get_smart_seq_2_fastq_names_v4(metadata)
else:
raise NotImplementedError('Only implemented for v4 and v5 metadata')


def _get_smart_seq_2_fastq_names_v5(files_json):
"""Return fastq file names from files json"""
index_to_name = {}
for f in files_json['files']:
index = f['content']['read_index']
file_name = f['content']['file_core']['file_name']
index_to_name[index] = file_name
return index_to_name['read1'], index_to_name['read2']


def _get_smart_seq_2_fastq_names_v4(assay_json):
"""Return fastq file names from assay json"""
fastq_1_name = assay_json["content"]["seq"]["lanes"][0]["r1"]
fastq_2_name = assay_json["content"]["seq"]["lanes"][0]["r2"]
return fastq_1_name, fastq_2_name


def get_optimus_lanes(metadata_json, version='4'):
"""Get the lane metadata"""
if version == '4':
return _get_optimus_lanes_v4(metadata_json)
else:
raise NotImplementedError('Only implemented for v4 metadata')


def _get_optimus_lanes_v4(assay_json):
"""Return the lane metadata from the assay json"""
lanes = assay_json['content']['seq']['lanes']
return lanes


def get_optimus_inputs(lanes, manifest_files):
"""Return three lists of urls, representing fastqs for r1, r2, and i1, respectively.
In each list, the first item is for the first lane, the second item is for the second lane, etc.
"""
r1 = [manifest_files['name_to_meta'][lane['r1']]['url'] for lane in lanes]
r2 = [manifest_files['name_to_meta'][lane['r2']]['url'] for lane in lanes]
i1 = [manifest_files['name_to_meta'][lane['i1']]['url'] for lane in lanes]

return r1, r2, i1
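Each public helper takes a version argument defaulting to '4', so existing adapters keep working unchanged while v5 support lands piecemeal; anything not yet ported fails fast with NotImplementedError. A short sketch of the dispatch, using trimmed stand-ins for real assay.json (v4) and files.json (v5) content:

```
from pipeline_tools import input_utils

# Trimmed v4 assay.json stand-in: only the fields the helpers read.
assay_v4 = {
    'has_input': 'sample-0001',
    'content': {'seq': {'lanes': [
        {'r1': 'a_R1.fastq.gz', 'r2': 'a_R2.fastq.gz'},
    ]}},
}
assert input_utils.get_sample_id(assay_v4) == 'sample-0001'
assert input_utils.get_smart_seq_2_fastq_names(assay_v4) == (
    'a_R1.fastq.gz', 'a_R2.fastq.gz')

# Trimmed v5 files.json stand-in: names are keyed by read_index, not lane.
files_v5 = {'files': [
    {'content': {'read_index': 'read1',
                 'file_core': {'file_name': 'b_R1.fastq.gz'}}},
    {'content': {'read_index': 'read2',
                 'file_core': {'file_name': 'b_R2.fastq.gz'}}},
]}
assert input_utils.get_smart_seq_2_fastq_names(files_v5, version='5') == (
    'b_R1.fastq.gz', 'b_R2.fastq.gz')

# Sample id extraction is still v4-only.
try:
    input_utils.get_sample_id(files_v5, version='5')
except NotImplementedError:
    pass  # expected until v5 support lands
```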
72 changes: 72 additions & 0 deletions pipeline_tools/tests/data/metadata/v4/optimus_assay.json
@@ -0,0 +1,72 @@
{
"content": {
"single_cell": {
"cell_handling": "10x_v2",
"cell_barcode": {
"read": "Read 1",
"size": 16,
"white_list_file": "pbmc8k_S1_L007_R1_001.fastq.gz,pbmc8k_S1_L008_R1_001.fastq.gz",
"offset": 0
}
},
"core": {
"type": "assay",
"schema_url": "https://raw.githubusercontent.com/HumanCellAtlas/metadata-schema/4.6.1/json_schema/assay.json",
"schema_version": "4.6.1"
},
"rna": {
"end_bias": "three_prime_end",
"primer": "poly-dT",
"strand": "both",
"library_construction": "10x_v2"
},
"assay_id": "c349cce6-6d63-4976-832e-3c27ca1399ac",
"seq": {
"paired_ends": true,
"lanes": [
{
"i1": "pbmc8k_S1_L007_I1_001.fastq.gz",
"number": 7,
"r2": "pbmc8k_S1_L007_R2_001.fastq.gz",
"r1": "pbmc8k_S1_L007_R1_001.fastq.gz"
},
{
"i1": "pbmc8k_S1_L008_I1_001.fastq.gz",
"number": 8,
"r2": "pbmc8k_S1_L008_R2_001.fastq.gz",
"r1": "pbmc8k_S1_L008_R1_001.fastq.gz"
}
],
"instrument_platform": "Illumina",
"molecule": "polyA RNA",
"instrument_model": "HiSeq 4000",
"umi_barcode": {
"read": "Read 1",
"offset": 16,
"size": 10
}
}
},
"core": {
"type": "assay_bundle",
"schema_url": "https://raw.githubusercontent.com/HumanCellAtlas/metadata-schema/4.6.1/json_schema/assay_bundle.json",
"schema_version": "4.6.1"
},
"has_output": [
"c34f9bda-1621-4596-b93f-797552368282",
"ed7d5ab4-8589-4e50-bb6c-5d4b459b183c",
"9a4a1656-faab-448e-9717-3fb16843a314",
"b7e2cfc0-8d3f-40b4-adf2-3c44112259dc",
"072461ba-e1da-40e2-aa5d-626eedad7fef",
"58ea2f4b-c4af-4b1b-8b6a-484d46d37de5"
],
"hca_ingest": {
"accession": "",
"submissionDate": "2018-01-16T16:23:53.023Z",
"lastModifiedUser": "anonymousUser",
"updateDate": "2018-01-16T16:24:04.590Z",
"document_id": "01425de2-dcd2-479c-899a-b84763767e74",
"user": "anonymousUser"
},
"has_input": "42a6269e-8bc7-47ac-806b-3a53f8ba2a6f"
}
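This fixture backs the new pytest suite. A sketch of the kind of test it enables; the test names and fixture loading here are illustrative, not necessarily the exact tests added in this PR:

```
import json
import os

import pytest

from pipeline_tools import input_utils

# Assumes the test module lives under pipeline_tools/tests/.
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data', 'metadata', 'v4')


@pytest.fixture
def optimus_assay_json():
    with open(os.path.join(DATA_DIR, 'optimus_assay.json')) as f:
        return json.load(f)


def test_get_optimus_lanes_v4(optimus_assay_json):
    lanes = input_utils.get_optimus_lanes(optimus_assay_json)
    assert [lane['number'] for lane in lanes] == [7, 8]
    assert lanes[0]['r1'] == 'pbmc8k_S1_L007_R1_001.fastq.gz'


def test_get_sample_id_v4(optimus_assay_json):
    sample_id = input_utils.get_sample_id(optimus_assay_json)
    assert sample_id == '42a6269e-8bc7-47ac-806b-3a53f8ba2a6f'
```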
