Skip to content

Commit

Permalink
Basic working parallel version with db backend
Browse files Browse the repository at this point in the history
  • Loading branch information
simonvh committed Nov 13, 2014
1 parent ab04c3b commit 9baed4e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 25 deletions.
6 changes: 6 additions & 0 deletions pita/db_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from sqlalchemy import create_engine, and_, event
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.inspection import inspect
from pita.exon import Exon

Base = declarative_base()

Expand Down Expand Up @@ -50,6 +51,11 @@ def to_sloc(self):
return "{}{}{}".format(
self.start, self.strand, self.end
)

def to_flat_exon(self):
e = Exon(self.chrom, self.start, self.end, self.strand)
e.seq = self.seq
return e

class Evidence(Base):
__tablename__ = "evidence"
Expand Down
3 changes: 2 additions & 1 deletion pita/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ def get_chrom_models(chrom, anno_files, data, weight, prune=None, index=None):
#

logger.info("Done calling transcripts for {0}".format(chrom))
return [v for m,v in models.items() if not m in discard]
result = [v for m,v in models.items() if not m in discard]
return [[name, [e.to_flat_exon() for e in exons]] for name, exons in result]

except:
logger.exception("Error on {0}".format(chrom))
Expand Down
47 changes: 24 additions & 23 deletions scripts/pita
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ p.add_argument("-d",

args = p.parse_args()
configfile = args.configfile

if not os.path.exists(configfile):
print "Missing config file {}".format(configfile)
print
p.print_help()
sys.exit()

threads = args.threads
index = args.index_dir
debug_level = args.debug_level.upper()
Expand Down Expand Up @@ -182,7 +189,7 @@ if config.has_key("chromosomes") and config["chromosomes"]:
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)

def print_output(alog, genename, exons, lock=None):
def print_output(genename, exons, lock=None):
print model_to_bed(exons, genename)
#alog.log_to_file(genename, exons)

Expand All @@ -196,62 +203,56 @@ def print_output(alog, genename, exons, lock=None):
if lock:
lock.release()

def listener(q, names, append=False, lock=None):
def listener(q, lock=None):
'''listens for messages on the q, writes to file. '''

alog = AnnotationLog(append)
for name in names:
alog.add(name)

while 1:
m = q.get()
if m == 'kill':
break

genename, exons = m
logger.debug("calling print_output for {0}".format(genename))
print_output(alog, genename, exons, lock)

def put_queue(q, data):
q.put(data)
print_output(genename, exons, lock)

def annotate_chrom(chrom, q, anno_files, data, weight, prune, index):
logger.info("Chromosome {0} started".format(chrom))
for genename, best_exons in get_chrom_models(chrom, anno_files, data, weight, prune, index):
pass
#results.append([genename, best_exons])
logger.debug("Putting {0} in print queue".format(genename))
put_queue(q, [genename, best_exons])
q.put([genename, best_exons])

logger.info("Chromosome {0} finished".format(chrom))

if threads > 1:
logger.info("Starting threaded work")
manager = mp.Manager()
lock = manager.Lock()
q = manager.Queue()

q = manager.Queue()
pool = mp.Pool(threads, init_worker)

try:
partialAnnotate = partial(annotate_chrom, q=q, anno_files=anno_files, data=data, weight=weight, prune=prune, index=index)
try:

#put listener to work first
watcher = pool.apply_async(listener, (q,[x[0] for x in anno_files], False, lock))
watcher = pool.apply_async(listener, args=(q, lock) )

# do the main work
partialAnnotate = partial(annotate_chrom, q=q, anno_files=anno_files, data=data, weight=weight, prune=prune, index=index)
pool.map(partialAnnotate, chroms)

# kill the queue!
q.put('kill')
pool.close()
pool.join()

except KeyboardInterrupt:
logger.exception("Caught KeyboardInterrupt, terminating workers")
pool.terminate()
pool.join()
else:
alog = AnnotationLog(append=False)
for name in [x[0] for x in anno_files]:
alog.add(name)

for chrom in chroms:
for genename,best_exons in get_chrom_models(chrom, anno_files, data, weight, prune, index):
print_output(alog, genename,best_exons)
print_output(genename,best_exons)

cdna_fh.close()
protein_fh.close()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools.command.test import test as TestCommand
import sys

VERSION = "1.7"
VERSION = "1.71"
DESCRIPTION = """
pita - pita improves transcript annotation
"""
Expand Down

0 comments on commit 9baed4e

Please sign in to comment.