Skip to content

Commit

Permalink
moving tests to seqio
Browse files Browse the repository at this point in the history
  • Loading branch information
CNuge committed Mar 10, 2020
1 parent 1ec2458 commit 8285a08
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 181 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified alfie/__pycache__/training.cpython-37.pyc
Binary file not shown.
40 changes: 13 additions & 27 deletions alfie/test_kmerseq.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,23 @@
import pytest
from alfie.kmerseq import KmerFeatures

import unittest

from kmerseq import KmerFeatures

class KmerTests(unittest.TestCase):
def test_KmerFeatures():
"""Unit tests for the KmerFeatures class."""
@classmethod
def setUpClass(self):
"""Initiate the test class instance."""
self.test_kmers = KmerFeatures("test1",
"aaaaaattttttatatatgcgcgccccccgccgcgccgggc")

def test_KmerFeatures(self):

self.assertEqual(self.test_kmers.name,
"test1")
test_kmers = KmerFeatures("test1",
"aaaaaattttttatatatgcgcgccccccgccgcgccgggc")

assert test_kmers.name == "test1"

self.assertEqual(self.test_kmers.labels.shape,
(256,))
assert test_kmers.labels.shape == (256,)

self.assertEqual(list(self.test_kmers.labels[:3]),
['AAAA', 'AAAC', 'AAAG'])
assert list(test_kmers.labels[:3]) == ['AAAA', 'AAAC', 'AAAG']

self.assertEqual(list(self.test_kmers.labels[-3:]),
['TTTC', 'TTTG', 'TTTT'])
assert list(test_kmers.labels[-3:]) == ['TTTC', 'TTTG', 'TTTT']

self.assertEqual(self.test_kmers.kmer_freqs.shape,
(256,))
assert test_kmers.kmer_freqs.shape == (256,)

with self.assertRaises(ValueError):
self.assertEqual(KmerFeatures("test1", "NOTDNA"))
with pytest.raises(ValueError):
KmerFeatures("test1", "NOTDNA")


if __name__ == '__main__':
unittest.main()

151 changes: 68 additions & 83 deletions alfie/test_seqio.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,84 @@

import os
import unittest
#import os

from seqio import file_type, outfile_dict, read_fasta, read_fastq
import pytest

from alfie.seqio import file_type, outfile_dict, read_fasta, read_fastq
from alfie import ex_fasta_file, ex_fastq_file

class SeqioTests(unittest.TestCase):
"""Unit tests for the seqio functions"""
@classmethod
def setUpClass(self):
"""Initiate the test class instance."""
self._expected_kingdom_dict = {0: 'alfie_out/animalia_test.fasta',
1: 'alfie_out/bacteria_test.fasta',
2: 'alfie_out/fungi_test.fasta',
3: 'alfie_out/plantae_test.fasta',
4: 'alfie_out/protista_test.fasta'}

self._fasta_infile = ex_fasta_file
self._fastq_infile = ex_fastq_file

@classmethod
"""
#TODO - unit tests for write - see if buffer or make and destroy files is best practice
#when you add the write tests, do this in pytest
def tearDown(self):
"""After unit tests, remove the temporary outputs."""
#After unit tests, remove the temporary outputs.
try:
os.rmdir("alfie_out")
except OSError:
pass
"""

def test_file_type():
    """Verify that file_type labels fasta/fastq filenames and rejects others."""
    # (filename, expected label) pairs, including names with extra dots.
    recognised = (
        ("file_1.fa", "fasta"),
        ("file_1.fasta", "fasta"),
        ("in.file_1.fa", "fasta"),
        ("file_2.fq", "fastq"),
        ("file_2.fastq", "fastq"),
        ("in.file_2.fq", "fastq"),
    )
    for filename, expected_label in recognised:
        assert file_type(filename) == expected_label

    # Filenames with any other extension must raise a ValueError.
    for unsupported in ("infile_2.txt", "in.file_2.csv"):
        with pytest.raises(ValueError):
            file_type(unsupported)


def test_outfile_builder():
"""Test that the output file set is generated properly."""
expected_kingdom_dict1 = {0: 'alfie_out/animalia_test.fasta',
1: 'alfie_out/bacteria_test.fasta',
2: 'alfie_out/fungi_test.fasta',
3: 'alfie_out/plantae_test.fasta',
4: 'alfie_out/protista_test.fasta'}

expected_kingdom_dict2 = {0: 'diff_place/animalia_test.fastq',
1: 'diff_place/bacteria_test.fastq',
2: 'diff_place/fungi_test.fastq',
3: 'diff_place/plantae_test.fastq',
4: 'diff_place/protista_test.fastq'}

def test_file_type(self):
"""Test that the file type is properly identified."""
self.assertEqual(file_type("file_1.fa"),
"fasta")
self.assertEqual(file_type("file_1.fasta"),
"fasta")
self.assertEqual(file_type("in.file_1.fa"),
"fasta")
self.assertEqual(file_type("file_2.fq"),
"fastq")
self.assertEqual(file_type("file_2.fastq"),
"fastq")
self.assertEqual(file_type("in.file_2.fq"),
"fastq")

with self.assertRaises(ValueError):
self.assertEqual(file_type("infile_2.txt"))
out1 = outfile_dict("test.fasta")
assert out1 == expected_kingdom_dict1

out2 = outfile_dict("in_data/test.fastq", folder_prefix = 'diff_place/')
assert out2 == expected_kingdom_dict2


def test_fasta_reader():
""" Test the fasta reader functions."""
fasta_read = read_fasta(ex_fasta_file)

with self.assertRaises(ValueError):
self.assertEqual(file_type("in.file_2.csv"))

def test_outfile_builder(self):
"""Test that the output file set is generated properly."""
self.assertEqual(outfile_dict("test.fasta"),
self._expected_kingdom_dict)

self.assertEqual(outfile_dict("in_data/test.fasta"),
self._expected_kingdom_dict)

def test_fasta_reader(self):
""" Test the fasta reader functions."""
self._fasta_read = read_fasta(self._fasta_infile)

self.assertEqual(len(self._fasta_read), 100)

self.assertEqual(self._fasta_read[0]['name'],
"seq1_plantae")
self.assertEqual(self._fasta_read[1]['name'],
"seq2_bacteria")
self.assertEqual(self._fasta_read[2]['name'],
"seq3_protista")

self.assertEqual(self._fasta_read[0]['sequence'][:25],
"TTCTAGGAGCATGTATATCTATGCT")
self.assertEqual(self._fasta_read[1]['sequence'][:25],
"ACGGGCTTATCATGGTATTTGGTGC")
self.assertEqual(self._fasta_read[2]['sequence'][:25],
"AGTATTAATTCGTATGGAATTAGCA")

def test_fastq_reader(self):
""" Test the fastq reader functions."""
self._fastq_read = read_fastq(self._fastq_infile)

self.assertEqual(len(self._fastq_read), 100)
assert len(fasta_read) == 100

for i in range(len(self._fastq_read)):
self.assertEqual(list(self._fastq_read[i].keys()),
['name', 'sequence', 'strand', 'quality'])
assert fasta_read[0]['name'] == "seq1_plantae"
assert fasta_read[1]['name'] == "seq2_bacteria"
assert fasta_read[2]['name'] == "seq3_protista"

assert fasta_read[0]['sequence'][:25] == "TTCTAGGAGCATGTATATCTATGCT"
assert fasta_read[1]['sequence'][:25] == "ACGGGCTTATCATGGTATTTGGTGC"
assert fasta_read[2]['sequence'][:25] == "AGTATTAATTCGTATGGAATTAGCA"


def test_fastq_reader():
""" Test the fastq reader functions."""
fastq_read = read_fastq(ex_fastq_file)

self.assertEqual(self._fastq_read[0]['sequence'][:25],
"ttctaggagcatgtatatctatgct")
self.assertEqual(self._fastq_read[1]['sequence'][:25],
"acgggcttatcatggtatttggtgc")
self.assertEqual(self._fastq_read[2]['sequence'][:25],
"agtattaattcgtatggaattagca")
assert len(fastq_read) == 100

for i in range(len(fastq_read)):
assert list(fastq_read[i].keys()) == ['name', 'sequence', 'strand', 'quality']

if __name__ == '__main__':
unittest.main()
assert fastq_read[0]['sequence'][:25] == "ttctaggagcatgtatatctatgct"
assert fastq_read[1]['sequence'][:25] == "acgggcttatcatggtatttggtgc"
assert fastq_read[2]['sequence'][:25] == "agtattaattcgtatggaattagca"

140 changes: 76 additions & 64 deletions alfie/test_training.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,99 @@
import unittest
"""Unit tests for the module: alfie.training """

import training
import pytest
import alfie.training as training
import numpy as np
import pandas as pd

class TrainingTests(unittest.TestCase):
#NOTE : I'm trying this in pytest as opposed to the unittest module, will see how it goes.

def test_split(self):
def test_split():
"""Tests for the stratified_taxon_split function."""
data = pd.DataFrame({"phylum" : ["Mollusca"]*10 + ["Arthropoda"] * 15,
"data_col" : [np.random.randint(100) for x in range(25)]})
#split on the column phylum, contains the classifications
train, test = training.stratified_taxon_split(data, class_col = "phylum",
test_size = .2, silent = True, seed = 1738)
# 80% of data in train
assert train.shape == (20, 2)
# index order is randomized
assert list(train.index) == [16, 13, 0, 17, 5, 3, 10,
9, 18, 24, 23, 14, 2, 1,
20, 12, 19, 6, 4, 22]

data = pd.DataFrame({"phylum" : ["Mollusca"]*10 + ["Arthropoda"] * 15,
"data_col" : [np.random.randint(100) for x in range(25)]})
#split on the column phylum, contains the classifications
train, test = stratified_taxon_split(data, class_col = "phylum",
test_size = .2, silent = True)
# 80% of data in train
train.shape
# index order is randomized
train.index
test.shape
assert test.shape == (5, 2)
assert list(test.index) == [15, 21, 7, 11, 8]


def test_sample_sequences(self):
in_seq = "AAAAAAAAAATTTTTTTTTTGGGGGGGGGGCCCCCCCCCCAAAAAAAAAATTTTTTTTTTGGGGGGGGGG"

sample_seq(in_seq, min_size = 25, max_size = 70, seed = 1738)
['GGGCCCCCCCCCCAAAAAAAAAATTTTTTT']
def test_sample_seq():

sample_seq(in_seq, min_size = 25, max_size = 70, n = 2, seed = 1738)
['ATTTTTTTTTTGGGGGGGGGGCCCCCCCCC',
'TTTTTTTTGGGGGGGGGGCCCCCCCCCCAAAAAAAAAATTTTTTTTTTGGGG']
in_seq = "AAAAAAAAAATTTTTTTTTTGGGGGGGGGGCCCCCCCCCCAAAAAAAAAATTTTTTTTTTGGGGGGGGGG"

out1 = training.sample_seq(in_seq, min_size = 25, max_size = 70, seed = 1738)
expected1 = ['GGGCCCCCCCCCCAAAAAAAAAATTTTTTT']

assert out1 == expected1

def test_process_sequences(self):
out2 = training.sample_seq(in_seq, min_size = 25, max_size = 70, n = 2, seed = 1738)
expected2 = ['ATTTTTTTTTTGGGGGGGGGGCCCCCCCCC',
'TTTTTTTTGGGGGGGGGGCCCCCCCCCCAAAAAAAAAATTTTTTTTTTGGGG']

ex_dat = pd.DataFrame({"processid" : ["ex1", "ex2", "ex3", "ex4", "ex5",],
"sequence" : ["AAAAAG" * 50 , "AAATAA" * 50, "AAGAAA" * 50, "TTTTAT" * 50, "TCTTCT" * 50],
"kingdom" : ["animalia", "bacteria", "fungi", "plantae", "protista"]})
assert out2 == expected2

#process the example data with defaults
out_dat = process_sequences(ex_dat)

#dict with 4 equal length lists
out_dat.keys()
dict_keys(['ids', 'labels', 'data', 'seq'])
len(out_dat['ids']) == len(ex_dat['processid'])
def test_process_sequences():

#different size k, turn off the subsampling, output a dataframe
out_dat2 = process_sequences(ex_dat, k = 2,
to_dataframe = True,
subsample = False)
ex_dat = pd.DataFrame({"processid" : ["ex1", "ex2", "ex3", "ex4", "ex5",],
"sequence" : ["AAAAAG" * 50 , "AAATAA" * 50, "AAGAAA" * 50, "TTTTAT" * 50, "TCTTCT" * 50],
"kingdom" : ["animalia", "bacteria", "fungi", "plantae", "protista"]})

out_dat2.columns
Index(['ids', 'labels', 'data', 'seq'], dtype='object')
#process the example data with defaults
out_dat = training.process_sequences(ex_dat)

def test_shuffle_unison(self):
#dict with 4 equal length lists
assert list(out_dat.keys()) == ['ids', 'labels', 'data', 'seq']
assert len(out_dat['ids']) == len(ex_dat['processid'])

x = np.array([[1,2],
[3,4],
[5,6],
[7,8]])
y = np.array([[1,2],
[3,4],
[5,6],
[7,8]])
#different size k, turn off the subsampling, output a dataframe
out_dat2 = training.process_sequences(ex_dat, k = 2,
to_dataframe = True,
subsample = False)

new_x, new_y = shuffle_unison(x, y, seed = 1738)
#query dataframe properties
assert list(out_dat2.columns) ==['ids', 'labels', 'data', 'seq']
assert np.all(out_dat2.ids == ex_dat.processid)
assert out_dat2['data'][0].shape == (16,)

#is x the same as before shuffle_unison?
np.all(new_x == x)
False
#have x and y been shuffled in unison?
np.all(new_x == new_y)


def test_nn_constriction(self):
def test_shuffle_unison():

dnn_1mer = training.alfie_dnn_default()
model1 = alfie_dnn_default(hidden_sizes = [10,4], in_shape = 4, n_classes = 2)

model1.input.shape
TensorShape([None, 4])

model1.output.shape
TensorShape([None, 2])

model1.trainable
x = np.array([[1,2],
[3,4],
[5,6],
[7,8]])
y = np.array([[1,2],
[3,4],
[5,6],
[7,8]])

new_x, new_y = training.shuffle_unison(x, y, seed = 1738)

#is x the same as before shuffle_unison?
assert np.all(new_x == x) == False
#have x and y been shuffled in unison?
assert np.all(new_x == new_y)

with pytest.raises(ValueError):
training.shuffle_unison(x, np.array([[1,1],[1,2]]), seed = 1738)

def test_alfie_dnn_default():
    """Build a small default network and check its input and output shapes."""
    network = training.alfie_dnn_default(
        hidden_sizes=[10, 4],
        in_shape=4,
        n_classes=2,
    )

    # Shapes are compared as plain lists: [None, in_shape] and [None, n_classes].
    input_dims = list(network.input.shape)
    assert input_dims == [None, 4]

    output_dims = list(network.output.shape)
    assert output_dims == [None, 2]

    assert network.trainable

Loading

0 comments on commit 8285a08

Please sign in to comment.