|
| 1 | +from nose.tools import * |
| 2 | +import os |
| 3 | +import subprocess |
| 4 | +import urllib |
| 5 | + |
| 6 | +import taxonomy |
| 7 | +from taxonomy import Taxonomy |
| 8 | + |
| 9 | + |
# Download taxdmp.zip once to local directory
# and uncompress if not already present
def taxonomy_setup():
    """Prepare the NCBI taxonomy fixtures under ``<cwd>/sample_data``.

    Every step is skipped when its output already exists, so repeated
    calls only redo the missing pieces:

    1. create the ``sample_data/`` directory;
    2. download ``taxdmp.zip`` from the NCBI FTP server;
    3. unzip it into ``sample_data/taxdmp``;
    4. write the first 1000 lines of ``names.dmp`` and ``nodes.dmp`` to
       ``names.1000.dmp`` and ``nodes.1000.dmp``;
    5. build a ``Taxonomy`` from the full dump files and save it to
       ``sample_data/taxonomy.json.gz``.

    Raises:
        Exception: if the ``unzip`` or either ``head`` command exits with
            a non-zero status.
    """
    sample_dir = os.path.join(os.getcwd(), 'sample_data/')
    if not os.path.exists(sample_dir):
        os.mkdir(sample_dir)

    full_tax_path = os.path.join(os.getcwd(), "sample_data/taxonomy.json.gz")
    taxdmp_path = os.path.join(os.getcwd(), "sample_data/taxdmp.zip")
    taxdmp_dir = os.path.join(os.getcwd(), "sample_data/taxdmp")
    names_head = os.path.join(os.getcwd(), "sample_data/taxdmp/names.1000.dmp")
    nodes_head = os.path.join(os.getcwd(), "sample_data/taxdmp/nodes.1000.dmp")
    ncbi_ftp = "ftp://ftp.ncbi.nih.gov/pub/taxonomy/taxdmp.zip"

    if not os.path.exists(taxdmp_path):
        # urllib.urlretrieve is the Python 2 spelling of this call.
        urllib.urlretrieve(ncbi_ftp, taxdmp_path)

    if not os.path.exists(taxdmp_dir):
        res0 = subprocess.call(["unzip", taxdmp_path,
                                "-d", taxdmp_dir])
        if res0 != 0:
            raise Exception("Failed to unzip necessary taxdmp files.")

    if not os.path.exists(names_head) or not os.path.exists(nodes_head):
        # Open the output files in ``with`` blocks so each handle is flushed
        # and closed as soon as ``head`` finishes, instead of being leaked.
        with open(names_head, mode='w') as names_out:
            res1 = subprocess.call(["head", "-n", "1000",
                                    os.path.join(taxdmp_dir, "names.dmp")],
                                   stdout=names_out)

        with open(nodes_head, mode='w') as nodes_out:
            res2 = subprocess.call(["head", "-n", "1000",
                                    os.path.join(taxdmp_dir, "nodes.dmp")],
                                   stdout=nodes_out)

        # Both commands are run before either status is checked.
        if res1 != 0 or res2 != 0:
            raise Exception("Failed to take the head of the names.dmp or "
                            "nodes.dmp files.")

    if not os.path.exists(full_tax_path):
        tax = Taxonomy.build_from_ncbi(os.path.join(taxdmp_dir, "names.dmp"),
                                       os.path.join(taxdmp_dir, "nodes.dmp"),
                                       ncbi_ftp, ncbi_ftp,
                                       "FTP Revision - See Date")
        tax.save(full_tax_path)
| 49 | + |
| 50 | + |
@with_setup(taxonomy_setup)
def test_small_tax_create():
    """Build a taxonomy from the 1000-line samples and round-trip it.

    Builds a ``Taxonomy`` from ``names.1000.dmp`` / ``nodes.1000.dmp``,
    saves it to ``sample_data/small_tax.json``, checks that only the
    ``.gz``-suffixed file appears on disk, reloads that file and verifies
    that the node count, edge count and root node attributes match the
    in-memory taxonomy. The saved file is removed afterwards, whether or
    not the assertions pass.
    """
    out_tax = os.path.join(os.getcwd(), "sample_data/small_tax.json")
    names_head = os.path.join(os.getcwd(), "sample_data/taxdmp/names.1000.dmp")
    nodes_head = os.path.join(os.getcwd(), "sample_data/taxdmp/nodes.1000.dmp")
    tax = Taxonomy.build_from_ncbi(names_head, nodes_head,
                                   "Test Names", "Test Nodes",
                                   "Sample NCBI Revision")

    assert tax.__class__ is taxonomy.taxonomy.Taxonomy
    # print(x) with one argument prints the same text on Python 2 and 3.
    print(out_tax)

    try:
        tax.save(out_tax)

        # Assertion on root
        print(tax.G.node[1])
        assert tax.G.node[1] == {'hidden': True, 'name': 'root',
                                 'rank': 'no rank'}

        # Check that files are properly output
        assert not os.path.exists(out_tax)
        assert os.path.exists(out_tax + ".gz")  # auto-append works

        # Reread file in
        tax_b = Taxonomy.load(out_tax + ".gz")
        assert len(tax_b.G.nodes()) == len(tax.G.nodes())
        # Compare the reloaded graph against the original one; comparing
        # tax_b with itself could never fail.
        assert len(tax_b.G.edges()) == len(tax.G.edges())

        # Assert on new file
        assert tax_b.G.node[1] == {'hidden': True, 'name': 'root',
                                   'rank': 'no rank'}
    finally:
        # Final cleanup. Runs on failure too, so a stale .gz file from a
        # failed run cannot satisfy the existence check on the next run.
        if os.path.exists(out_tax + ".gz"):
            os.remove(out_tax + ".gz")
0 commit comments