Skip to content

Commit

Permalink
Almost at an asyncio implementation for stats
Browse files Browse the repository at this point in the history
  • Loading branch information
schae234 committed Jun 23, 2018
1 parent bfca289 commit e1249fe
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 49 deletions.
117 changes: 83 additions & 34 deletions SRA_Tinder/CLI/sra_tinder
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@
import argparse
import SRA_Tinder
import concurrent.futures
import multiprocessing

from SRA_Tinder import sra_tinder
from SRA_Tinder.sra_stream import SRA_Stream
from functools import partial

import SRA_Tinder.trimandcount as trimandcount
import subprocess
import sys
import os
import pandas
import asyncio


def generate_acc_statistics(fastqfile):
async def generate_acc_statistics(fastqfile, event=None):
    '''
    Generate statistics from a FASTQ file (typically a named pipe)
    produced from an SRA accession.

    Parameters
    ----------
    fastqfile : str
        Path to the FASTQ file / named pipe to analyze.
    event : asyncio.Event, optional
        If provided, wait until the event is set before reading, i.e.
        until the streaming side has started filling the pipe.

    Returns
    -------
    int
        Number of lines counted in the FASTQ file.
        NOTE(review): the full statistics (quality scores, organism
        abundance, trimming metrics via trimandcount) are still TODO --
        this version only counts lines while the asyncio plumbing is
        being worked out.
    '''
    # Wait for the pipe to start to be filled
    if event is not None:
        await event.wait()

    def do_stats(path):
        # Blocking reader; runs in an executor so the (blocking) pipe
        # read does not stall the event loop.
        print(f'Generating stats for {path}')
        count = 0
        with open(path) as IN:
            for _ in IN:
                count += 1
                # Progress report every 1000 lines.  The previous
                # condition (`if count % 1000:`) fired on every line
                # NOT divisible by 1000 -- inverted modulo test.
                if count % 1000 == 0:
                    print(f'counted {count} lines for {path}')
        print(count)
        return count

    # Run the blocking read in the default executor and propagate the
    # line count (the previous version discarded the executor result).
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, do_stats, fastqfile)


# Event Loops
def run_matching_event_loop(args):
    '''
    Event loop for the `match` subcommand: for each SRA accession in the
    input file, concurrently stream its FASTQ data into a named pipe and
    generate statistics from that pipe.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments; ``args.input`` is a file containing one
        SRR accession per line.

    Returns
    -------
    list
        Gathered results of all streaming and statistics tasks, in task
        submission order.
    '''
    streamer = SRA_Stream()
    loop = asyncio.get_event_loop()
    # Process pool for the blocking NGS streaming calls.
    pool = concurrent.futures.ProcessPoolExecutor(max_workers=100)
    tasks = []
    # Process Input File
    with open(args.input) as IN:
        for acc in IN:
            # Each line is an SRR accession; skip blank lines.
            acc = acc.strip()
            if not acc:
                continue
            # Event lets the stats task wait until the pipe starts
            # to be filled by the streaming task.
            event = asyncio.Event()
            # NOTE: `event` must be passed by keyword -- the signature
            # is stream(acc, pool=None, event=None), so passing it
            # positionally would bind the Event to `pool`.
            stream = streamer.stream(acc, pool=pool, event=event)
            pipe_path = streamer.get_pipe(acc)
            stats = generate_acc_statistics(pipe_path, event=event)
            # Append tasks
            tasks.append(stream)
            tasks.append(stats)
    # run_until_complete returns the gathered result list; the previous
    # version returned the un-awaited gather future instead.
    results = loop.run_until_complete(asyncio.gather(*tasks))
    return results

def run_stream_event_loop(args):
    '''
    Event loop for the `stream` subcommand: open a named pipe per SRA
    accession listed in the input file and stream its FASTQ data into it.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments; ``args.input`` is a file containing one
        SRR accession per line.

    Returns
    -------
    list
        Results of the streaming tasks (one entry per accession).
    '''
    loop = asyncio.get_event_loop()
    streamer = SRA_Stream()
    # Read accessions with a context manager (previous code leaked the
    # file handle from a bare open()); skip blank lines.
    with open(args.input) as IN:
        accs = [line.strip() for line in IN if line.strip()]
    # Process pool for the blocking NGS streaming calls.
    pool = concurrent.futures.ProcessPoolExecutor(max_workers=100)
    tasks = [streamer.stream(acc, pool=pool) for acc in accs]
    # run_until_complete returns the gathered results; the previous
    # version returned the un-awaited gather future instead.
    return loop.run_until_complete(asyncio.gather(*tasks))



# Main logic to parse args and run the correct event loop
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=(
"Find hot datasets in your area (of research)!"
),
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="\n".join([
f'version: {SRA_Tinder.__version__}'
f'version: {SRA_Tinder.__version__}',
f'install path: {__file__}'
])
)
parser.set_defaults(func=lambda x: parser.print_help())
Expand All @@ -94,7 +129,21 @@ if __name__ == '__main__':
description='Use --help with each command for more info'
)

# basic functions go here
# --------------------------------------------------------------
# Streaming function --
# Open a bunch of streamin pipes to SRA files
stream = subparsers.add_parser(
'stream',
help='Stream SRA FASTQ files into named pipes'
)
stream.add_argument(
'--input',
help='input file, one SRR per line'
)
stream.set_defaults(func=run_stream_event_loop)

# --------------------------------------------------------------
# Matching function
matches = subparsers.add_parser(
'match',
help='create SRA matches'
Expand Down
18 changes: 9 additions & 9 deletions SRA_Tinder/sra_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@
from ngs.ReadCollection import ReadCollection
from ngs.Read import Read
from ngs.ReadIterator import ReadIterator
from functools import partial

class SRA_Stream():

def __init__(self):
pass

def stream_reads(self, acc, splitNum=1, splitNo=1):
def stream_reads(self, acc, event, splitNum=1, splitNo=1):
'''
This is a blocking task, it needs to be run in an executor
'''

# open requested accession using SRA implementation of the API
print(f'Streaming {acc}', file=sys.stderr)
pipe_path = self.get_pipe(acc)
pipe = open(pipe_path, 'w')
event.clear()
with NGS.openReadCollection(acc) as run:
run_name = run.getName()
# compute window to iterate through
Expand All @@ -48,6 +50,7 @@ def stream_reads(self, acc, splitNum=1, splitNo=1):
read = f'@{ids}\n{bases}\n+\n{qualities}'
print(read,file=pipe)
os.unlink(pipe_path)
print(f'Done streaming for {acc}')
return None

def _create_pipe(self,acc):
Expand All @@ -73,7 +76,7 @@ def get_pipe(self,acc):
acc = acc + '.fastq'
return acc

def stream(self,acc, pool=None):
async def stream(self,acc,pool=None,event=None):
'''
Stream SRA files into named pipes
Expand All @@ -85,13 +88,10 @@ def stream(self,acc, pool=None):
------
A future containing s
'''
print(f'Streaming {acc}', file=sys.stderr)
if pool is None:
pool = concurrent.futures.ProcessPoolExecutor()
self._create_pipe(acc)
loop = asyncio.get_event_loop()
future = loop.run_in_executor(pool, self.stream_reads, acc)
return future
self._create_pipe(acc)
cb = partial(self.stream_reads,acc,event=event)
await loop.run_in_executor(pool, cb)


if __name__ == '__main__':
Expand Down
6 changes: 0 additions & 6 deletions SRA_Tinder/trimandcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,6 @@ def outputtrimedadapterfastqfile(fastqfile, outfastqfile, adpater):
return trimmed_reads

def basesleftaftertriming(fastqfile):
with open(fastqfile) as IN:
i = 0
for line in IN:
i+=1
print(i)
return
m = loopadapters(fastqfile)
print(m)
totalreads, withadapter, mean_readlen, std_readlen, readlen_trimmed, std_readlen_trimmed = m[0]
Expand Down
Empty file removed test/README.txt
Empty file.

0 comments on commit e1249fe

Please sign in to comment.