SLURMGraph plugin #1136

Merged
merged 13 commits into from
Aug 3, 2015
1 change: 1 addition & 0 deletions CHANGES
@@ -1,6 +1,7 @@
Next release
============

* ENH: Added SLURMGraph plugin for submitting jobs to SLURM with dependencies (https://github.com/nipy/nipype/pull/1136)
* FIX: Enable absolute path definitions in DCMStack (https://github.com/nipy/nipype/pull/1089,
replaced by https://github.com/nipy/nipype/pull/1093)
* ENH: New mesh.MeshWarpMaths to operate on surface-defined warpings
38 changes: 30 additions & 8 deletions doc/users/plugins.rst
@@ -139,9 +139,11 @@ particular node might use more resources than other nodes in a workflow.

SGEGraph
~~~~~~~~
SGEGraph_ is an execution plugin working with Sun Grid Engine that allows for
submitting an entire graph of dependent jobs at once. This way Nipype does not
need to run a monitoring process - SGE takes care of this. The use of SGEGraph_
is preferred over SGE_ since the latter adds unnecessary load on the submit
machine.

.. note::

@@ -175,6 +177,26 @@ Optional arguments::

template: custom template file to use
    sbatch_args: any other command line args to be passed to sbatch.


SLURMGraph
~~~~~~~~~~
SLURMGraph_ is an execution plugin working with SLURM that allows for
submitting an entire graph of dependent jobs at once. This way Nipype does not
need to run a monitoring process - SLURM takes care of this. The use of the
SLURMGraph_ plugin is preferred over the vanilla SLURM_ plugin since the latter
adds unnecessary load on the submit machine.


.. note::

When rerunning unfinished workflows using SLURMGraph you may decide not to
submit jobs for Nodes that previously finished running. This can speed up
execution, but new or modified inputs that would previously trigger a Node
to rerun will be ignored. The following option turns on this functionality::

    workflow.run(plugin='SLURMGraph', plugin_args={'dont_resubmit_completed_jobs': True})

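Under the hood, SLURMGraph chains jobs with SLURM's native ``afterok`` dependency mechanism: each submitted job's id is captured into a shell variable, and children are submitted with a ``--dependency`` flag listing their parents. A minimal sketch of how that flag is formed (the job ids here are hypothetical; in the generated script they are shell variables filled in by ``sbatch``):

```python
# Sketch: forming an afterok dependency flag from parent SLURM job ids.
# The ids are hypothetical placeholders; the real submit script captures
# them from each parent's sbatch output at submission time.
parent_job_ids = ['1001', '1002']
dep_flag = '--dependency=afterok:%s' % ':'.join(parent_job_ids)
print(dep_flag)
```

A child submitted with this flag only starts once every listed parent has exited successfully, which is exactly the ordering guarantee Nipype's monitoring process would otherwise enforce.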

HTCondor
--------
@@ -183,12 +205,12 @@ DAGMan
~~~~~~

With its DAGMan_ component HTCondor_ (previously Condor) allows for submitting
entire graphs of dependent jobs at once (similar to SGEGraph_ and SLURMGraph_).
With the ``CondorDAGMan`` plug-in Nipype can utilize this functionality to
submit complete workflows directly and in a single step. Consequently, and
in contrast to other plug-ins, workflow execution returns almost
instantaneously -- Nipype is only used to generate the workflow graph,
while job scheduling and dependency resolution are entirely managed by HTCondor_.

Please note that although DAGMan_ supports specification of data dependencies
as well as data provisioning on compute nodes this functionality is currently
1 change: 1 addition & 0 deletions nipype/pipeline/plugins/__init__.py
@@ -15,3 +15,4 @@
from .sgegraph import SGEGraphPlugin
from .lsf import LSFPlugin
from .slurm import SLURMPlugin
from .slurmgraph import SLURMGraphPlugin
10 changes: 1 addition & 9 deletions nipype/pipeline/plugins/sgegraph.py
@@ -80,14 +80,6 @@ def make_job_name(jobnumber, nodeslist):
for idx, pyscript in enumerate(pyfiles):
node = nodes[idx]
node_status_done = node_completed_status(node)
## If a node has no dependencies, and it is requested to run_without_submitting
## then run this node in place
if (not node_status_done) and (len(dependencies[idx]) == 0 ) and (node.run_without_submitting == True):
try:
node.run()
except Exception:
node._clean_queue(idx, nodes)
node_status_done = True # if successfully run locally, then claim true

#if the node itself claims done, then check to ensure all
#dependencies are also done
@@ -130,7 +122,7 @@ def make_job_name(jobnumber, nodeslist):
values = ' '
for jobid in dependencies[idx]:
## Avoid dependencies of done jobs
if not self._dont_resubmit_completed_jobs or cache_doneness_per_node[jobid] == False:
values += "${{{0}}},".format(make_job_name(jobid, nodes))
if values != ' ': # i.e. if some jobs were added to dependency list
values = values.rstrip(',')
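The dependency-list assembly changed above can be restated as a standalone sketch (the function and argument names here are hypothetical, introduced only for illustration): when ``dont_resubmit_completed_jobs`` is enabled, parents that already finished are left out of the hold list.

```python
# Standalone restatement of the dependency-value assembly above
# (function and argument names are hypothetical).
def build_dependency_values(dep_ids, job_names, done, skip_done):
    values = ' '
    for jobid in dep_ids:
        # Avoid dependencies on jobs that already finished
        if not skip_done or not done.get(jobid, False):
            values += '${%s},' % job_names[jobid]
    if values != ' ':  # i.e. some jobs were added to the dependency list
        values = values.rstrip(',')
    return values

# Parent 0 is done and skipped; only parent 1 remains a dependency.
print(build_dependency_values([0, 1], {0: 'j0_a', 1: 'j1_b'},
                              {0: True, 1: False}, True))
```

This mirrors why the changed condition also checks the plugin flag: without ``skip_done``, every parent is listed regardless of its doneness cache entry.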
150 changes: 150 additions & 0 deletions nipype/pipeline/plugins/slurmgraph.py
@@ -0,0 +1,150 @@
"""Parallel workflow execution via SLURM
"""

import os
import sys

from .base import (GraphPluginBase, logger)

from ...interfaces.base import CommandLine


def node_completed_status(checknode):
    """
    A function to determine if a node has previously completed its work
    :param checknode: The node to check the run status
    :return: boolean value True indicates that the node does not need to be run.
    """
    # TODO: place this in the base.py file and refactor
    node_state_does_not_require_overwrite = (checknode.overwrite is False or
                                             (checknode.overwrite is None and
                                              not checknode._interface.always_run))
    hash_exists = False
    try:
        hash_exists, _, _, _ = checknode.hash_exists()
    except Exception:
        hash_exists = False
    return (hash_exists and node_state_does_not_require_overwrite)
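The overwrite check above is tri-state: ``overwrite`` may be ``True``, ``False``, or ``None``, and only in the ``None`` case does the interface's ``always_run`` flag decide. A minimal standalone restatement (the helper name is hypothetical):

```python
# Hypothetical helper restating the overwrite check in node_completed_status:
# a cached result only counts as done when the node does not demand a rerun.
def does_not_require_overwrite(overwrite, always_run):
    # overwrite=False: user pinned "never rerun" -> done is acceptable.
    # overwrite=None:  defer to the interface; always_run forces a rerun.
    # overwrite=True:  user pinned "always rerun" -> never treat as done.
    return overwrite is False or (overwrite is None and not always_run)

print(does_not_require_overwrite(None, False))  # the common default case
```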


class SLURMGraphPlugin(GraphPluginBase):
    """Execute using SLURM

    The plugin_args input to run can be used to control the SLURM execution.
    Currently supported options are:

    - template : template to use for batch job submission
    - sbatch_args : arguments to be prepended to the job execution script in the
                    sbatch call

    """
    _template = "#!/bin/bash"

    def __init__(self, **kwargs):
        # Default before parsing plugin_args, so the attribute always exists
        self._dont_resubmit_completed_jobs = False
        if 'plugin_args' in kwargs and kwargs['plugin_args']:
            if 'retry_timeout' in kwargs['plugin_args']:
                self._retry_timeout = kwargs['plugin_args']['retry_timeout']
            if 'max_tries' in kwargs['plugin_args']:
                self._max_tries = kwargs['plugin_args']['max_tries']
            if 'template' in kwargs['plugin_args']:
                self._template = kwargs['plugin_args']['template']
                if os.path.isfile(self._template):
                    self._template = open(self._template).read()
            if 'sbatch_args' in kwargs['plugin_args']:
                self._sbatch_args = kwargs['plugin_args']['sbatch_args']
            if 'dont_resubmit_completed_jobs' in kwargs['plugin_args']:
                self._dont_resubmit_completed_jobs = kwargs['plugin_args']['dont_resubmit_completed_jobs']
        super(SLURMGraphPlugin, self).__init__(**kwargs)

    def _submit_graph(self, pyfiles, dependencies, nodes):
        def make_job_name(jobnumber, nodeslist):
            """
            - jobnumber: The index number of the job to create
            - nodeslist: The name of the node being processed
            - return: A string representing this job to be displayed by SLURM
            """
            job_name = 'j{0}_{1}'.format(jobnumber, nodeslist[jobnumber]._id)
            # Condition job_name to be a valid bash identifier (i.e. - is invalid)
            job_name = job_name.replace('-', '_').replace('.', '_').replace(':', '_')
            return job_name
        batch_dir, _ = os.path.split(pyfiles[0])
        submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')

        cache_doneness_per_node = dict()
        if self._dont_resubmit_completed_jobs:  # A future parameter for controlling this behavior could be added here
            for idx, pyscript in enumerate(pyfiles):
                node = nodes[idx]
                node_status_done = node_completed_status(node)

                # if the node itself claims done, then check to ensure all
                # dependencies are also done
                if node_status_done and idx in dependencies:
                    for child_idx in dependencies[idx]:
                        if child_idx in cache_doneness_per_node:
                            child_status_done = cache_doneness_per_node[child_idx]
                        else:
                            child_status_done = node_completed_status(nodes[child_idx])
                        node_status_done = node_status_done and child_status_done

                cache_doneness_per_node[idx] = node_status_done

        with open(submitjobsfile, 'wt') as fp:
            fp.writelines('#!/usr/bin/env bash\n')
            fp.writelines('# Condense format attempted\n')
            for idx, pyscript in enumerate(pyfiles):
                node = nodes[idx]
                if cache_doneness_per_node.get(idx, False):
                    continue
                else:
                    template, sbatch_args = self._get_args(
                        node, ["template", "sbatch_args"])

                    batch_dir, name = os.path.split(pyscript)
                    name = '.'.join(name.split('.')[:-1])
                    batchscript = '\n'.join((template,
                                             '%s %s' % (sys.executable, pyscript)))
                    batchscriptfile = os.path.join(batch_dir,
                                                   'batchscript_%s.sh' % name)

                    batchscriptoutfile = batchscriptfile + '.o'
                    batchscripterrfile = batchscriptfile + '.e'

                    with open(batchscriptfile, 'wt') as batchfp:
                        batchfp.writelines(batchscript)
                    deps = ''
                    if idx in dependencies:
                        values = ''
                        for jobid in dependencies[idx]:
                            # Avoid dependencies of done jobs
                            if not self._dont_resubmit_completed_jobs or cache_doneness_per_node[jobid] == False:
                                values += "${{{0}}}:".format(make_job_name(jobid, nodes))
                        if values != '':  # i.e. if some jobs were added to dependency list
                            values = values.rstrip(':')
                            deps = '--dependency=afterok:%s' % values
                    jobname = make_job_name(idx, nodes)
                    # Do not use default output locations if they are set in self._sbatch_args
                    stderrFile = ''
                    if self._sbatch_args.count('-e ') == 0:
                        stderrFile = '-e {errFile}'.format(
                            errFile=batchscripterrfile)
                    stdoutFile = ''
                    if self._sbatch_args.count('-o ') == 0:
                        stdoutFile = '-o {outFile}'.format(
                            outFile=batchscriptoutfile)
                    full_line = '{jobNm}=$(sbatch {outFileOption} {errFileOption} {extraSBatchArgs} {dependantIndex} -J {jobNm} {batchscript} | awk \'{{print $4}}\')\n'.format(
                        jobNm=jobname,
                        outFileOption=stdoutFile,
                        errFileOption=stderrFile,
                        extraSBatchArgs=sbatch_args,
                        dependantIndex=deps,
                        batchscript=batchscriptfile)
                    fp.writelines(full_line)
        cmd = CommandLine('bash', environ=os.environ.data,
                          terminal_output='allatonce')
        cmd.inputs.args = '%s' % submitjobsfile
        cmd.run()
        logger.info('submitted all jobs to queue')
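The generated ``submit_jobs.sh`` captures each sbatch job id into a bash variable named by ``make_job_name``, so the character sanitization above matters: ``-``, ``.``, and ``:`` are legal in Nipype node ids but invalid in bash identifiers. A standalone sketch of the sanitization (simplified to take the node id string directly, which is an assumption for illustration; the real helper indexes into the nodes list):

```python
# Simplified restatement of make_job_name: the SLURM job name doubles as
# a bash variable in submit_jobs.sh, so invalid identifier characters
# are replaced with underscores.
def make_job_name(jobnumber, node_id):
    job_name = 'j{0}_{1}'.format(jobnumber, node_id)
    for ch in '-.:':
        job_name = job_name.replace(ch, '_')
    return job_name

print(make_job_name(3, 'preproc.smooth-run:1'))
```

The resulting name appears twice in each generated line, once as the variable capturing the job id (``name=$(sbatch ...)``) and once as the ``-J`` job name shown by SLURM.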