From b0cd48ec34c9eb7700dae6ffe7c927e957401733 Mon Sep 17 00:00:00 2001 From: David Emms Date: Sat, 22 Apr 2023 11:44:33 +0100 Subject: [PATCH] Use multithreading for launch external processes --- scripts_of/__main__.py | 18 ++++------ scripts_of/accelerate.py | 2 +- scripts_of/orthologues.py | 3 +- scripts_of/parallel_task_manager.py | 56 ++++++++++++++--------------- scripts_of/program_caller.py | 36 ++++++++++++------- scripts_of/trees2ologs_dlcpar.py | 5 +-- scripts_of/trees_msa.py | 2 +- scripts_of/trim.py | 15 ++++++-- 8 files changed, 78 insertions(+), 59 deletions(-) diff --git a/scripts_of/__main__.py b/scripts_of/__main__.py index da1f1944..475f4ce6 100755 --- a/scripts_of/__main__.py +++ b/scripts_of/__main__.py @@ -29,7 +29,7 @@ from __future__ import absolute_import from . import parallel_task_manager -ptm_initialised = parallel_task_manager.ParallelTaskManager_singleton() +# ptm_initialised = parallel_task_manager.ParallelTaskManager_singleton() import os # Y os.environ["OPENBLAS_NUM_THREADS"] = "1" # fix issue with numpy/openblas. 
Will mean that single threaded options aren't automatically parallelised @@ -919,16 +919,10 @@ def RunSearch(options, speciessInfoObj, seqsInfo, prog_caller, q_new_species_una print(command) util.Success() print("Using %d thread(s)" % options.nBlast) - util.PrintTime("This may take some time....") - cmd_queue = mp.Queue() - for iCmd, cmd in enumerate(commands): - cmd_queue.put((iCmd+1, cmd)) - runningProcesses = [mp.Process(target=parallel_task_manager.Worker_RunCommand, args=(cmd_queue, options.nBlast, len(commands), True)) for i_ in range(options.nBlast)] - for proc in runningProcesses: - proc.start() - for proc in runningProcesses: - while proc.is_alive(): - proc.join() + util.PrintTime("This may take some time....") + program_caller.RunParallelCommands(options.nBlast, commands, qListOfList=False, + q_print_on_error=True, q_always_print_stderr=False) + # remove BLAST databases util.PrintTime("Done all-versus-all sequence search") if options.search_program == "blast": @@ -1258,7 +1252,7 @@ def main(args=None): if args is None: args = sys.argv[1:] # Create PTM right at start - ptm_initialised = parallel_task_manager.ParallelTaskManager_singleton() + # ptm_initialised = parallel_task_manager.ParallelTaskManager_singleton() print("") print(("OrthoFinder version %s Copyright (C) 2014 David Emms\n" % util.version)) prog_caller = GetProgramCaller() diff --git a/scripts_of/accelerate.py b/scripts_of/accelerate.py index d3edef6a..c80b3498 100644 --- a/scripts_of/accelerate.py +++ b/scripts_of/accelerate.py @@ -300,7 +300,7 @@ def create_profiles_database(din, wd_list, nSpAll, selection="kmeans", n_for_pro seq_convert[ss] = og_id_full + "_" + ss print("") fw.WriteSeqsToFasta_withNewAccessions(seq_write, fn_fasta, seq_convert) - parallel_task_manager.RunCommand_Simple(" ".join(["diamond", "makedb", "--in", fn_fasta, "-d", fn_diamond_db])) + parallel_task_manager.RunCommand(["diamond", "makedb", "--in", fn_fasta, "-d", fn_diamond_db], qPrintOnError=True) return 
fn_diamond_db diff --git a/scripts_of/orthologues.py b/scripts_of/orthologues.py index f1d30306..7b078a83 100644 --- a/scripts_of/orthologues.py +++ b/scripts_of/orthologues.py @@ -52,6 +52,7 @@ from . import stag from . import files from . import parallel_task_manager +from . import program_caller nThreads = util.nThreadsDefault @@ -490,7 +491,7 @@ def RunAnalysis(self, qSpeciesTree=True): cmds_trees = [[cmd_spTree]] + cmds_trees del ogMatrices util.PrintUnderline("Inferring gene and species trees" if qSpeciesTree else "Inferring gene trees") - parallel_task_manager.RunParallelOrderedCommandLists(self.nProcess_std, cmds_trees) + program_caller.RunParallelCommands(self.nProcess_std, cmds_trees, qListOfList=True) if qSTAG: # Trees must have been completed print("") diff --git a/scripts_of/parallel_task_manager.py b/scripts_of/parallel_task_manager.py index b3259d68..a03c8939 100644 --- a/scripts_of/parallel_task_manager.py +++ b/scripts_of/parallel_task_manager.py @@ -182,7 +182,7 @@ def Worker_RunCommand(cmd_queue, nProcesses, nToDo, qPrintOnError=False, qPrintS return q_print_first_traceback_0 = False -def Worker_RunCommands_And_Move(cmd_and_filename_queue, nProcesses, nToDo, qListOfLists): +def Worker_RunCommands_And_Move(cmd_and_filename_queue, nProcesses, nToDo, qListOfLists, q_print_on_error, q_always_print_stderr): """ Continuously takes commands that need to be run from the cmd_and_filename_queue until the queue is empty. If required, moves the output filename produced by the cmd to a specified filename. 
The elements of the queue can be single cmd_filename tuples @@ -209,11 +209,11 @@ def Worker_RunCommands_And_Move(cmd_and_filename_queue, nProcesses, nToDo, qList command_fns_list = [command_fns_list] for command, fns in command_fns_list: if isinstance(command, types.FunctionType): + # This will block the process, but it is ok for trimming, it takes minimal time fn = command fn(fns) else: - popen = subprocess.Popen(command, env=my_env, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - popen.communicate() + RunCommand(command, qPrintOnError=q_print_on_error, qPrintStderr=q_always_print_stderr) if fns != None: actual, target = fns if os.path.exists(actual): @@ -246,15 +246,7 @@ def Worker_RunOrderedCommandList(cmd_queue, nProcesses, nToDo): RunOrderedCommandList(commandSet) except queue.Empty: return - -def RunParallelOrderedCommandLists(nProcesses, commands): - """nProcesss - the number of processes to run in parallel - commands - list of lists of commands where the commands in the inner list are completed in order (the i_th won't run until - the i-1_th has finished). - """ - ptm = ParallelTaskManager_singleton() - ptm.RunParallel(commands, True, nProcesses) - + q_print_first_traceback_1 = False def Worker_RunMethod(Function, args_queue): while True: @@ -304,7 +296,7 @@ def _I_Spawn_Processes(message_to_spawner, message_to_PTM, cmds_queue): A process should be started as early as possible (while RAM usage is low) with this method as its target. This is now a separate process with low RAM usage. Each time some parallel work is required then the queue for that is placed in the message_queue by the PTM. - _I_Spawn_Processes - will spawn parallel processes when instructed by the message_queue in the message_queue and get them + _I_Spawn_Processes - will spawn parallel processes when instructed by the message_queue in the message_queue and get them working on the queue. When the queue is empty it will wait for the next one. 
It can receive a special signal to exit - the None object """ @@ -312,20 +304,20 @@ def _I_Spawn_Processes(message_to_spawner, message_to_PTM, cmds_queue): try: # peak in qoq - it is the only mehtod that tried to remove things from the queue message = message_to_spawner.get(timeout=.1) - if message == None: + if message == None: return # In which case, thread has been informed that there are tasks in the queue. nParallel, nTasks, qListOfLists = message if qListOfLists: - runningProcesses = [mp.Process(target=Worker_RunOrderedCommandList, args = (cmds_queue, nParallel, nTasks)) for i_ in range(nParallel)] + runningProcesses = [mp.Process(target=Worker_RunOrderedCommandList, args = (cmds_queue, nParallel, nTasks)) for i_ in range(nParallel)] else: - runningProcesses = [mp.Process(target=Worker_RunCommand, args = (cmds_queue, nParallel, nTasks)) for i_ in range(nParallel)] + runningProcesses = [mp.Process(target=Worker_RunCommand, args = (cmds_queue, nParallel, nTasks)) for i_ in range(nParallel)] for proc in runningProcesses: proc.start() for proc in runningProcesses: while proc.is_alive(): try: - proc.join() + proc.join() except RuntimeError: pass message_to_PTM.put("Done") @@ -336,30 +328,38 @@ def _I_Spawn_Processes(message_to_spawner, message_to_PTM, cmds_queue): class ParallelTaskManager_singleton: + """ + Creating a new process requires forking the parent process and can lead to very high RAM usage. One way to mitigate this is + to create the pool of processes as early in execution as possible so that the memory footprint is low. The + ParallelTaskManager takes care of that, and can be used by calling its `RunParallel` method. + Apr 2023 Update: + When running external programs there is no need to use multiprocessing, multithreading is sufficient since a new process + will be created anyway, so the GIL is no longer an issue.
+ """ class __Singleton(object): def __init__(self): """Implementation: Allocate a thread that will perform all the tasks - Communicate with it using a queue. + Communicate with it using a queue. When provided with a list of commands it should fire up some workers and get them to run the commands and then exit. An alternative would be they should always stay alive - but then they could die for some reason? And I'd have to check how many there are. """ - self.message_to_spawner = mp.Queue() - self.message_to_PTM = mp.Queue() + self.message_to_spawner = mp.Queue() + self.message_to_PTM = mp.Queue() # Orders/Messages: # None (PTM -> spawn_thread) - thread should return (i.e. exit) # 'Done' (spawn_thread -> PTM) - the cmds from the cmd queue have completed - # Anything else = (nParallel, nTasks) (PTM -> spawn_thread) - cmds (nTasks of them) have been placed in the cmd queue, + # Anything else = (nParallel, nTasks) (PTM -> spawn_thread) - cmds (nTasks of them) have been placed in the cmd queue, # they should be executed using nParallel threads self.cmds_queue = mp.Queue() self.manager_process = mp.Process(target=_I_Spawn_Processes, args=(self.message_to_spawner, self.message_to_PTM, self.cmds_queue)) self.manager_process.start() instance = None - + def __init__(self): if not ParallelTaskManager_singleton.instance: ParallelTaskManager_singleton.instance = ParallelTaskManager_singleton.__Singleton() - + def RunParallel(self, cmd_list, qListOfLists, nParallel): """ Args: @@ -367,7 +367,7 @@ def RunParallel(self, cmd_list, qListOfLists, nParallel): qListOfLists - is cmd_lists a list of lists nParallel - number of parallel threads to use qShell - should the tasks be run in a shell - """ + """ nTasks = len(cmd_list) for i, x in enumerate(cmd_list): self.instance.cmds_queue.put((i, x)) @@ -375,12 +375,12 @@ def RunParallel(self, cmd_list, qListOfLists, nParallel): while True: try: signal = self.instance.message_to_PTM.get() - if signal == "Done": - return + if signal == 
"Done": + return except queue.Empty: pass time.sleep(1) - + def Stop(self): """Warning, cannot be restarted""" self.instance.message_to_spawner.put(None) @@ -389,7 +389,7 @@ def Stop(self): def Success(): ptm = ParallelTaskManager_singleton() - ptm.Stop() + ptm.Stop() sys.exit() def Fail(): diff --git a/scripts_of/program_caller.py b/scripts_of/program_caller.py index 7b64be08..80d2d48b 100644 --- a/scripts_of/program_caller.py +++ b/scripts_of/program_caller.py @@ -24,7 +24,7 @@ # # For any enquiries send an email to David Emms # david_emms@hotmail.comhor: david - +import concurrent.futures import os import sys import json @@ -357,7 +357,17 @@ def PrintDependencyCheckFailure(cmd): # ======================================================================================================================== -def RunParallelCommandsAndMoveResultsFile(nProcesses, commands_and_filenames, qListOfList): +def RunParallelCommands(nProcesses, commands, qListOfList, q_print_on_error=False, q_always_print_stderr=False): + if qListOfList: + commands_and_no_filenames = [[(cmd, None) for cmd in cmd_list] for cmd_list in commands] + else: + commands_and_no_filenames = [(cmd, None) for cmd in commands] + RunParallelCommandsAndMoveResultsFile(nProcesses, commands_and_no_filenames, qListOfList, q_print_on_error, + q_always_print_stderr) + + +def RunParallelCommandsAndMoveResultsFile(nProcesses, commands_and_filenames, qListOfList, q_print_on_error=False, + q_always_print_stderr=False): """ Calls the commands in parallel and if required moves the results file to the required new filename Args: @@ -370,18 +380,20 @@ def RunParallelCommandsAndMoveResultsFile(nProcesses, commands_and_filenames, qL qListOfList - if False then commands_and_filenames is a list of (cmd, actual_target_fn) tuples if True then commands_and_filenames is a list of lists of (cmd, actual_target_fn) tuples where the elements of the inner list need to be run in the order they appear. 
+ q_print_on_error - If an error code is returned, print stdout & stderr """ - # Setup the workers and run cmd_queue = mp.Queue() i = -1 for i, cmd in enumerate(commands_and_filenames): cmd_queue.put((i, cmd)) - runningProcesses = [mp.Process(target=parallel_task_manager.Worker_RunCommands_And_Move, args=(cmd_queue, nProcesses, i+1, qListOfList)) for _ in range(nProcesses)] - for proc in runningProcesses: - proc.start() - - for proc in runningProcesses: - while proc.is_alive(): - proc.join(10.) - time.sleep(2) - + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(parallel_task_manager.Worker_RunCommands_And_Move, + cmd_queue, + nProcesses, + i+1, + qListOfList, + q_print_on_error, + q_always_print_stderr=q_always_print_stderr) + for _ in range(nProcesses)] + concurrent.futures.wait(futures) diff --git a/scripts_of/trees2ologs_dlcpar.py b/scripts_of/trees2ologs_dlcpar.py index ab635399..dedf3ae2 100644 --- a/scripts_of/trees2ologs_dlcpar.py +++ b/scripts_of/trees2ologs_dlcpar.py @@ -41,7 +41,7 @@ from . import tree from . import util from . import files -from . import parallel_task_manager +from . 
import program_caller def natural_sort_key(s, _nsre=re.compile('([0-9]+)')): return [int(text) if text.isdigit() else text.lower() for text in re.split(_nsre, s)] @@ -133,7 +133,8 @@ def RunDlcpar(ogSet, speciesTreeFN, workingDir, nParallel, qDeepSearch): dlcCommands = ['dlcpar_search -s %s -S %s -D 1 -C 0.125 %s -I .txt -i %d --nprescreen 100 --nconverge %d' % (speciesTreeFN, geneMapFN, fn, i, n) for (fn, i, n) in zip(filenames, nIter, nNoImprov)] else: dlcCommands = ['dlcpar_search -s %s -S %s -D 1 -C 0.125 %s -I .txt -x 1' % (speciesTreeFN, geneMapFN, fn) for fn in filenames] - parallel_task_manager.RunParallelOrderedCommandLists(nParallel, [[c] for c in dlcCommands]) + program_caller.RunParallelCommands(nParallel, [[c] for c in dlcCommands], qListOfList=True) + return dlcparResultsDir, "OG%07d_tree_id.dlcpar.locus.tree" diff --git a/scripts_of/trees_msa.py b/scripts_of/trees_msa.py index 2f8c080c..437d3eea 100644 --- a/scripts_of/trees_msa.py +++ b/scripts_of/trees_msa.py @@ -257,7 +257,7 @@ def CreateConcatenatedAlignment(ogsToUse_ids, ogs, alignment_filename_function, outfile.write("".join(seq[i:i+nChar]) + "\n") def trim_fn(fn): - trim.main(fn, fn, 0.1, 500, 0.75) + trim.main(fn, fn, 0.1, 500, 0.75, False) """ ----------------------------------------------------------------------------- diff --git a/scripts_of/trim.py b/scripts_of/trim.py index 4320cd0b..c2c2d3bd 100755 --- a/scripts_of/trim.py +++ b/scripts_of/trim.py @@ -5,6 +5,7 @@ import shutil import argparse import itertools +import subprocess import numpy as np import scipy.sparse @@ -61,7 +62,18 @@ def write_msa(self, i_cols, outfn, nChar = 80): for i in range(0, len(seq), nChar): outfile.write(seq[i:i+nChar] + "\n") -def main(infn, outfn, f=0.1, n_min=500, c=0.75): +def main(infn, outfn, f=0.1, n_min=500, c=0.75, exe=False): + if exe: + run_exe(infn, outfn, f, n_min, c) + else: + run_in_process(infn, outfn, f, n_min, c) + +def run_exe(infn, outfn, f, n_min, c): + """run trim in a separate process 
not forked from current process""" + subprocess.run(["python", __file__, "-c", str(c), infn, outfn, str(f), str(n_min)]) + + +def run_in_process(infn, outfn, f, n_min, c): """ Trim the alignment file. Auto reduce f parameter if required. OrthoFinder default parameters are used by default. @@ -124,7 +136,6 @@ def main(infn, outfn, f=0.1, n_min=500, c=0.75): # write_msa(M, names, outfn) # print("%0.3f: %d->%d, %0.1f%% characters retained. Trimmed %s" % (f, length, i_keep[0].size, 100.*aa_after/aa_before, infn)) - def get_satifactory_f(gap_counts, aa_counts, N, f_orig, n_min, c, tol = 0.001): """ The f used was too large, get an f that gives n_min columns