Skip to content

Commit

Permalink
Use multithreading for launch external processes
Browse files Browse the repository at this point in the history
  • Loading branch information
davidemms committed Apr 22, 2023
1 parent 5700867 commit b0cd48e
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 59 deletions.
18 changes: 6 additions & 12 deletions scripts_of/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from __future__ import absolute_import

from . import parallel_task_manager
ptm_initialised = parallel_task_manager.ParallelTaskManager_singleton()
# ptm_initialised = parallel_task_manager.ParallelTaskManager_singleton()

import os # Y
os.environ["OPENBLAS_NUM_THREADS"] = "1" # fix issue with numpy/openblas. Will mean that single threaded options aren't automatically parallelised
Expand Down Expand Up @@ -919,16 +919,10 @@ def RunSearch(options, speciessInfoObj, seqsInfo, prog_caller, q_new_species_una
print(command)
util.Success()
print("Using %d thread(s)" % options.nBlast)
util.PrintTime("This may take some time....")
cmd_queue = mp.Queue()
for iCmd, cmd in enumerate(commands):
cmd_queue.put((iCmd+1, cmd))
runningProcesses = [mp.Process(target=parallel_task_manager.Worker_RunCommand, args=(cmd_queue, options.nBlast, len(commands), True)) for i_ in range(options.nBlast)]
for proc in runningProcesses:
proc.start()
for proc in runningProcesses:
while proc.is_alive():
proc.join()
util.PrintTime("This may take some time....")
program_caller.RunParallelCommands(options.nBlast, commands, qListOfList=False,
q_print_on_error=True, q_always_print_stderr=False)

# remove BLAST databases
util.PrintTime("Done all-versus-all sequence search")
if options.search_program == "blast":
Expand Down Expand Up @@ -1258,7 +1252,7 @@ def main(args=None):
if args is None:
args = sys.argv[1:]
# Create PTM right at start
ptm_initialised = parallel_task_manager.ParallelTaskManager_singleton()
# ptm_initialised = parallel_task_manager.ParallelTaskManager_singleton()
print("")
print(("OrthoFinder version %s Copyright (C) 2014 David Emms\n" % util.version))
prog_caller = GetProgramCaller()
Expand Down
2 changes: 1 addition & 1 deletion scripts_of/accelerate.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def create_profiles_database(din, wd_list, nSpAll, selection="kmeans", n_for_pro
seq_convert[ss] = og_id_full + "_" + ss
print("")
fw.WriteSeqsToFasta_withNewAccessions(seq_write, fn_fasta, seq_convert)
parallel_task_manager.RunCommand_Simple(" ".join(["diamond", "makedb", "--in", fn_fasta, "-d", fn_diamond_db]))
parallel_task_manager.RunCommand(["diamond", "makedb", "--in", fn_fasta, "-d", fn_diamond_db], qPrintOnError=True)
return fn_diamond_db


Expand Down
3 changes: 2 additions & 1 deletion scripts_of/orthologues.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from . import stag
from . import files
from . import parallel_task_manager
from . import program_caller

nThreads = util.nThreadsDefault

Expand Down Expand Up @@ -490,7 +491,7 @@ def RunAnalysis(self, qSpeciesTree=True):
cmds_trees = [[cmd_spTree]] + cmds_trees
del ogMatrices
util.PrintUnderline("Inferring gene and species trees" if qSpeciesTree else "Inferring gene trees")
parallel_task_manager.RunParallelOrderedCommandLists(self.nProcess_std, cmds_trees)
program_caller.RunParallelCommands(self.nProcess_std, cmds_trees, qListOfList=True)
if qSTAG:
# Trees must have been completed
print("")
Expand Down
56 changes: 28 additions & 28 deletions scripts_of/parallel_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def Worker_RunCommand(cmd_queue, nProcesses, nToDo, qPrintOnError=False, qPrintS
return

q_print_first_traceback_0 = False
def Worker_RunCommands_And_Move(cmd_and_filename_queue, nProcesses, nToDo, qListOfLists):
def Worker_RunCommands_And_Move(cmd_and_filename_queue, nProcesses, nToDo, qListOfLists, q_print_on_error, q_always_print_stderr):
"""
Continuously takes commands that need to be run from the cmd_and_filename_queue until the queue is empty. If required, moves
the output filename produced by the cmd to a specified filename. The elements of the queue can be single cmd_filename tuples
Expand All @@ -209,11 +209,11 @@ def Worker_RunCommands_And_Move(cmd_and_filename_queue, nProcesses, nToDo, qList
command_fns_list = [command_fns_list]
for command, fns in command_fns_list:
if isinstance(command, types.FunctionType):
# This will block the process, but it is ok for trimming, it takes minimal time
fn = command
fn(fns)
else:
popen = subprocess.Popen(command, env=my_env, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
popen.communicate()
RunCommand(command, qPrintOnError=q_print_on_error, qPrintStderr=q_always_print_stderr)
if fns != None:
actual, target = fns
if os.path.exists(actual):
Expand Down Expand Up @@ -246,15 +246,7 @@ def Worker_RunOrderedCommandList(cmd_queue, nProcesses, nToDo):
RunOrderedCommandList(commandSet)
except queue.Empty:
return

def RunParallelOrderedCommandLists(nProcesses, commands):
"""nProcesss - the number of processes to run in parallel
commands - list of lists of commands where the commands in the inner list are completed in order (the i_th won't run until
the i-1_th has finished).
"""
ptm = ParallelTaskManager_singleton()
ptm.RunParallel(commands, True, nProcesses)


q_print_first_traceback_1 = False
def Worker_RunMethod(Function, args_queue):
while True:
Expand Down Expand Up @@ -304,28 +296,28 @@ def _I_Spawn_Processes(message_to_spawner, message_to_PTM, cmds_queue):
A process should be started as early as possible (while RAM usage is low) with this method as its target.
This is now a separate process with low RAM usage.
Each time some parallel work is required then the queue for that is placed in the message_queue by the PTM.
_I_Spawn_Processes - will spawn parallel processes when instructed by the message_queue in the message_queue and get them
_I_Spawn_Processes - will spawn parallel processes when instructed by the message_queue in the message_queue and get them
working on the queue. When the queue is empty it will wait for the next one. It can receive a special signal to exit - the None
object
"""
while True:
try:
# peak in qoq - it is the only mehtod that tried to remove things from the queue
message = message_to_spawner.get(timeout=.1)
if message == None:
if message == None:
return
# In which case, thread has been informed that there are tasks in the queue.
nParallel, nTasks, qListOfLists = message
if qListOfLists:
runningProcesses = [mp.Process(target=Worker_RunOrderedCommandList, args = (cmds_queue, nParallel, nTasks)) for i_ in range(nParallel)]
runningProcesses = [mp.Process(target=Worker_RunOrderedCommandList, args = (cmds_queue, nParallel, nTasks)) for i_ in range(nParallel)]
else:
runningProcesses = [mp.Process(target=Worker_RunCommand, args = (cmds_queue, nParallel, nTasks)) for i_ in range(nParallel)]
runningProcesses = [mp.Process(target=Worker_RunCommand, args = (cmds_queue, nParallel, nTasks)) for i_ in range(nParallel)]
for proc in runningProcesses:
proc.start()
for proc in runningProcesses:
while proc.is_alive():
try:
proc.join()
proc.join()
except RuntimeError:
pass
message_to_PTM.put("Done")
Expand All @@ -336,51 +328,59 @@ def _I_Spawn_Processes(message_to_spawner, message_to_PTM, cmds_queue):


class ParallelTaskManager_singleton:
"""
Creating new process requires forking parent process and can lea to very high RAM usage. One way to mitigate this is
to create the pool of processes as early in execution as possible so that the memeory footprint is low. The
ParallelTaskManager takes care of that, and can be used by calling `RunParallelOrderedCommandLists` above.
Apr 2023 Update:
When running external programs there is no need to use multiprocessing, multithreading is sufficient since new process
will be created anyway, so the SIL is no longer an issue.
"""
class __Singleton(object):
def __init__(self):
"""Implementation:
Allocate a thread that will perform all the tasks
Communicate with it using a queue.
Communicate with it using a queue.
When provided with a list of commands it should fire up some workers and get them to run the commands and then exit.
An alternative would be they should always stay alive - but then they could die for some reason? And I'd have to check how many there are.
"""
self.message_to_spawner = mp.Queue()
self.message_to_PTM = mp.Queue()
self.message_to_spawner = mp.Queue()
self.message_to_PTM = mp.Queue()
# Orders/Messages:
# None (PTM -> spawn_thread) - thread should return (i.e. exit)
# 'Done' (spawn_thread -> PTM) - the cmds from the cmd queue have completed
# Anything else = (nParallel, nTasks) (PTM -> spawn_thread) - cmds (nTasks of them) have been placed in the cmd queue,
# Anything else = (nParallel, nTasks) (PTM -> spawn_thread) - cmds (nTasks of them) have been placed in the cmd queue,
# they should be executed using nParallel threads
self.cmds_queue = mp.Queue()
self.manager_process = mp.Process(target=_I_Spawn_Processes, args=(self.message_to_spawner, self.message_to_PTM, self.cmds_queue))
self.manager_process.start()
instance = None

def __init__(self):
if not ParallelTaskManager_singleton.instance:
ParallelTaskManager_singleton.instance = ParallelTaskManager_singleton.__Singleton()

def RunParallel(self, cmd_list, qListOfLists, nParallel):
"""
Args:
cmd_list - list of commands or list of lists of commands (in which elements in inner list must be run in order)
qListOfLists - is cmd_lists a list of lists
nParallel - number of parallel threads to use
qShell - should the tasks be run in a shell
"""
"""
nTasks = len(cmd_list)
for i, x in enumerate(cmd_list):
self.instance.cmds_queue.put((i, x))
self.instance.message_to_spawner.put((nParallel, nTasks, qListOfLists))
while True:
try:
signal = self.instance.message_to_PTM.get()
if signal == "Done":
return
if signal == "Done":
return
except queue.Empty:
pass
time.sleep(1)

def Stop(self):
"""Warning, cannot be restarted"""
self.instance.message_to_spawner.put(None)
Expand All @@ -389,7 +389,7 @@ def Stop(self):

def Success():
ptm = ParallelTaskManager_singleton()
ptm.Stop()
ptm.Stop()
sys.exit()

def Fail():
Expand Down
36 changes: 24 additions & 12 deletions scripts_of/program_caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#
# For any enquiries send an email to David Emms
# david_emms@hotmail.comhor: david

import concurrent.futures
import os
import sys
import json
Expand Down Expand Up @@ -357,7 +357,17 @@ def PrintDependencyCheckFailure(cmd):

# ========================================================================================================================

def RunParallelCommandsAndMoveResultsFile(nProcesses, commands_and_filenames, qListOfList):
def RunParallelCommands(nProcesses, commands, qListOfList, q_print_on_error=False, q_always_print_stderr=False):
if qListOfList:
commands_and_no_filenames = [[(cmd, None) for cmd in cmd_list] for cmd_list in commands]
else:
commands_and_no_filenames = [(cmd, None) for cmd in commands]
RunParallelCommandsAndMoveResultsFile(nProcesses, commands_and_no_filenames, qListOfList, q_print_on_error,
q_always_print_stderr)


def RunParallelCommandsAndMoveResultsFile(nProcesses, commands_and_filenames, qListOfList, q_print_on_error=False,
q_always_print_stderr=False):
"""
Calls the commands in parallel and if required moves the results file to the required new filename
Args:
Expand All @@ -370,18 +380,20 @@ def RunParallelCommandsAndMoveResultsFile(nProcesses, commands_and_filenames, qL
qListOfList - if False then commands_and_filenames is a list of (cmd, actual_target_fn) tuples
if True then commands_and_filenames is a list of lists of (cmd, actual_target_fn) tuples where the elements
of the inner list need to be run in the order they appear.
q_print_on_error - If error code returend print stdout & stederr
"""
# Setup the workers and run
cmd_queue = mp.Queue()
i = -1
for i, cmd in enumerate(commands_and_filenames):
cmd_queue.put((i, cmd))
runningProcesses = [mp.Process(target=parallel_task_manager.Worker_RunCommands_And_Move, args=(cmd_queue, nProcesses, i+1, qListOfList)) for _ in range(nProcesses)]
for proc in runningProcesses:
proc.start()

for proc in runningProcesses:
while proc.is_alive():
proc.join(10.)
time.sleep(2)


with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(parallel_task_manager.Worker_RunCommands_And_Move,
cmd_queue,
nProcesses,
i+1,
qListOfList,
q_print_on_error,
q_always_print_stderr=q_always_print_stderr)
for _ in range(nProcesses)]
concurrent.futures.wait(futures)
5 changes: 3 additions & 2 deletions scripts_of/trees2ologs_dlcpar.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from . import tree
from . import util
from . import files
from . import parallel_task_manager
from . import program_caller

def natural_sort_key(s, _nsre=re.compile('([0-9]+)')):
return [int(text) if text.isdigit() else text.lower() for text in re.split(_nsre, s)]
Expand Down Expand Up @@ -133,7 +133,8 @@ def RunDlcpar(ogSet, speciesTreeFN, workingDir, nParallel, qDeepSearch):
dlcCommands = ['dlcpar_search -s %s -S %s -D 1 -C 0.125 %s -I .txt -i %d --nprescreen 100 --nconverge %d' % (speciesTreeFN, geneMapFN, fn, i, n) for (fn, i, n) in zip(filenames, nIter, nNoImprov)]
else:
dlcCommands = ['dlcpar_search -s %s -S %s -D 1 -C 0.125 %s -I .txt -x 1' % (speciesTreeFN, geneMapFN, fn) for fn in filenames]
parallel_task_manager.RunParallelOrderedCommandLists(nParallel, [[c] for c in dlcCommands])
program_caller.RunParallelCommands(nParallel, [[c] for c in dlcCommands], qListOfList=True)

return dlcparResultsDir, "OG%07d_tree_id.dlcpar.locus.tree"


Expand Down
2 changes: 1 addition & 1 deletion scripts_of/trees_msa.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def CreateConcatenatedAlignment(ogsToUse_ids, ogs, alignment_filename_function,
outfile.write("".join(seq[i:i+nChar]) + "\n")

def trim_fn(fn):
trim.main(fn, fn, 0.1, 500, 0.75)
trim.main(fn, fn, 0.1, 500, 0.75, False)

"""
-----------------------------------------------------------------------------
Expand Down
15 changes: 13 additions & 2 deletions scripts_of/trim.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import shutil
import argparse
import itertools
import subprocess
import numpy as np
import scipy.sparse

Expand Down Expand Up @@ -61,7 +62,18 @@ def write_msa(self, i_cols, outfn, nChar = 80):
for i in range(0, len(seq), nChar):
outfile.write(seq[i:i+nChar] + "\n")

def main(infn, outfn, f=0.1, n_min=500, c=0.75):
def main(infn, outfn, f=0.1, n_min=500, c=0.75, exe=False):
if exe:
run_exe(infn, outfn, f, n_min, c)
else:
run_in_process(infn, outfn, f, n_min, c)

def run_exe(infn, outfn, f, n_min, c):
"""run trim in a separate process not forked from current process"""
subprocess.run(["python", __file__, "-c", str(c), infn, outfn, str(f), str(n_min)])


def run_in_process(infn, outfn, f, n_min, c):
"""
Trim the alignment file. Auto reduce f parameter if required. OrthoFinder default
parameters are used by default.
Expand Down Expand Up @@ -124,7 +136,6 @@ def main(infn, outfn, f=0.1, n_min=500, c=0.75):
# write_msa(M, names, outfn)
# print("%0.3f: %d->%d, %0.1f%% characters retained. Trimmed %s" % (f, length, i_keep[0].size, 100.*aa_after/aa_before, infn))


def get_satifactory_f(gap_counts, aa_counts, N, f_orig, n_min, c, tol = 0.001):
"""
The f used was too large, get an f that gives n_min columns
Expand Down

0 comments on commit b0cd48e

Please sign in to comment.