
FIX MultiProc deadlock #1756


Merged: 2 commits, merged on Dec 17, 2016
circle.yml: 3 changes (1 addition, 2 deletions)

@@ -49,8 +49,7 @@ test:
- docker run -v /etc/localtime:/etc/localtime:ro -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_dartel Linear /root/examples/ l2pipeline :
timeout: 1600
- docker run -v /etc/localtime:/etc/localtime:ro -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_fsl_reuse Linear /root/examples/ level1_workflow
# Disabled until https://github.com/nipy/nipype/issues/1692 is resolved
# - docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py27 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
- docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py27 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
- docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
- docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ l2pipeline

nipype/interfaces/tests/test_runtime_profiler.py: 2 changes (0 additions, 2 deletions)

@@ -119,8 +119,6 @@ def _use_gb_ram(num_gb):


# Test case for the run function
@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
class TestRuntimeProfiler():
'''
This class is a test case for the runtime profiler
nipype/pipeline/engine/tests/test_engine.py: 2 changes (0 additions, 2 deletions)

@@ -626,8 +626,6 @@ def func1(in1):
assert not error_raised


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
def test_serial_input(tmpdir):
wd = str(tmpdir)
os.chdir(wd)
nipype/pipeline/plugins/base.py: 8 changes (7 additions, 1 deletion)

@@ -272,11 +272,17 @@ def run(self, graph, config, updatehash=False):
self._remove_node_dirs()
report_nodes_not_run(notrun)


# close any open resources
self._close()

def _wait(self):
sleep(float(self._config['execution']['poll_sleep_duration']))

def _close(self):
# Close any open resources. This could raise NotImplementedError,
# but the base class defaults to a harmless no-op so existing plugins keep working.
return True

def _get_result(self, taskid):
raise NotImplementedError

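The new `_close()` hook gives `run()` one well-defined place to release whatever resources a plugin holds, and the base-class default above is deliberately a no-op so other plugins keep working. As a rough illustration (the class and attribute names below are made up, not nipype's), a pool-backed plugin could override it like this:

```python
# Hypothetical plugin illustrating the _close() hook; names are not nipype's.
from multiprocessing import Pool


class PoolBackedPlugin(object):
    """Toy plugin that owns a worker pool and releases it in _close()."""

    def __init__(self, n_procs=2):
        self.pool = Pool(processes=n_procs)

    def run(self):
        # ... submit work and wait for it to finish ...
        # run() always ends by releasing resources, mirroring the base class.
        self._close()

    def _close(self):
        # Stop accepting new tasks and let outstanding workers exit.
        self.pool.close()
        self.pool.join()
        return True


if __name__ == '__main__':
    PoolBackedPlugin(n_procs=2).run()
```
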
nipype/pipeline/plugins/multiproc.py: 74 changes (51 additions, 23 deletions)

@@ -11,6 +11,7 @@

# Import packages
from multiprocessing import Process, Pool, cpu_count, pool
import threading
from traceback import format_exception
import sys

@@ -20,14 +21,13 @@
from ... import logging, config
from ...utils.misc import str2bool
from ..engine import MapNode
from ..plugins import semaphore_singleton
from .base import (DistributedPluginBase, report_crash)

# Init logger
logger = logging.getLogger('workflow')

# Run node
def run_node(node, updatehash):
def run_node(node, updatehash, taskid):
"""Function to execute node.run(), catch and log any errors and
return the result dictionary

@@ -45,7 +45,7 @@ def run_node(node, updatehash):
"""

# Init variables
result = dict(result=None, traceback=None)
result = dict(result=None, traceback=None, taskid=taskid)

# Try and execute the node via node.run()
try:
@@ -77,10 +77,6 @@ class NonDaemonPool(pool.Pool):
Process = NonDaemonProcess
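
`NonDaemonProcess`, referenced just above, is defined outside this hunk. The sketch below reconstructs the standard pattern it follows in the 2016-era codebase: worker processes whose `daemon` flag is pinned to `False`, so that a node running inside the pool may spawn subprocesses of its own. Treat it as illustrative rather than the exact nipype source; newer Python versions changed the `Pool` internals this relies on.

```python
# Reconstruction of the non-daemon pool pattern used by MultiProc; the real
# NonDaemonProcess lives outside this hunk, so this is illustrative only.
import multiprocessing
import multiprocessing.pool


class NonDaemonProcess(multiprocessing.Process):
    """Worker process whose daemon flag stays False no matter what the pool sets."""

    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        pass  # ignore the pool's attempt to mark workers as daemons

    daemon = property(_get_daemon, _set_daemon)


class NonDaemonPool(multiprocessing.pool.Pool):
    """Pool whose workers are non-daemonic and may therefore spawn children."""
    Process = NonDaemonProcess
```
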


def release_lock(args):
semaphore_singleton.semaphore.release()


# Get total system RAM
def get_system_total_memory_gb():
"""Function to get the total RAM of the running system in GB
Expand Down Expand Up @@ -136,12 +132,18 @@ def __init__(self, plugin_args=None):
# Init variables and instance attributes
super(MultiProcPlugin, self).__init__(plugin_args=plugin_args)
self._taskresult = {}
self._task_obj = {}
self._taskid = 0
non_daemon = True
self.plugin_args = plugin_args
self.processors = cpu_count()
self.memory_gb = get_system_total_memory_gb()*0.9 # 90% of system memory

self._timeout = 2.0
self._event = threading.Event()

# Check plugin args
if self.plugin_args:
if 'non_daemon' in self.plugin_args:
@@ -150,6 +152,9 @@ def __init__(self, plugin_args=None):
self.processors = self.plugin_args['n_procs']
if 'memory_gb' in self.plugin_args:
self.memory_gb = self.plugin_args['memory_gb']

logger.debug("MultiProcPlugin starting %d threads in pool"%(self.processors))

# Instantiate different thread pools for non-daemon processes
if non_daemon:
# run the execution using the non-daemon pool subclass
@@ -159,14 +164,23 @@ def __init__(self, plugin_args=None):

def _wait(self):
if len(self.pending_tasks) > 0:
semaphore_singleton.semaphore.acquire()
if self._config['execution']['poll_sleep_duration']:
self._timeout = float(self._config['execution']['poll_sleep_duration'])
sig_received=self._event.wait(self._timeout)
if not sig_received:
logger.debug('MultiProcPlugin timeout before signal received. Deadlock averted??')
self._event.clear()

def _async_callback(self, args):
self._taskresult[args['taskid']]=args
self._event.set()

def _get_result(self, taskid):
if taskid not in self._taskresult:
raise RuntimeError('Multiproc task %d not found' % taskid)
if not self._taskresult[taskid].ready():
return None
return self._taskresult[taskid].get()
result=None
else:
result=self._taskresult[taskid]
return result
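
The core of the deadlock fix is visible here: the module-level semaphore is gone, and `_wait()` now blocks on a `threading.Event` with a timeout, while `_async_callback()` stores each finished task under the `taskid` carried in its result dict and then wakes the scheduler. A missed wake-up therefore stalls the loop for at most one poll interval instead of forever. A standalone sketch of that wait/notify pattern (the names are illustrative, not the plugin's):

```python
# Standalone sketch of the Event-with-timeout wait/notify pattern; the names
# here are illustrative and not part of nipype.
import threading

event = threading.Event()
results = {}


def async_callback(taskid, value):
    """Callback side: record the finished task, then wake the waiting scheduler."""
    results[taskid] = value
    event.set()


def wait_for_progress(timeout=2.0):
    """Scheduler side: block until a callback fires or the poll timeout expires."""
    signaled = event.wait(timeout)
    if not signaled:
        print("timeout reached without a signal; re-checking pending tasks anyway")
    event.clear()  # re-arm for the next scheduling iteration


# Example: a 'worker' finishing 0.5 s from now wakes the waiter early.
threading.Timer(0.5, async_callback, args=(1, 'done')).start()
wait_for_progress()
print(results)  # {1: 'done'}
```
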

def _report_crash(self, node, result=None):
if result and result['traceback']:
@@ -178,36 +192,50 @@ def _report_crash(self, node, result=None):
return report_crash(node)

def _clear_task(self, taskid):
del self._taskresult[taskid]
del self._task_obj[taskid]

def _submit_job(self, node, updatehash=False):
self._taskid += 1
if hasattr(node.inputs, 'terminal_output'):
if node.inputs.terminal_output == 'stream':
node.inputs.terminal_output = 'allatonce'

self._taskresult[self._taskid] = \
self._task_obj[self._taskid] = \
self.pool.apply_async(run_node,
(node, updatehash),
callback=release_lock)
(node, updatehash, self._taskid),
callback=self._async_callback)
return self._taskid

def _close(self):
self.pool.close()
return True
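
`_submit_job()` now keeps the `AsyncResult` objects in `self._task_obj` and lets the callback, which receives the worker's return value including its `taskid`, fill `self._taskresult`; `_close()` then shuts the pool down when the run finishes. A small self-contained sketch of that `apply_async`-plus-callback flow, using made-up task names rather than nipype nodes:

```python
# Self-contained sketch of apply_async with a callback keyed by task id; the
# function and dict names are made up for illustration.
from multiprocessing import Pool


def run_task(x, taskid):
    """Worker function: return a dict that carries its own task id."""
    return {'result': x * x, 'traceback': None, 'taskid': taskid}


if __name__ == '__main__':
    task_objects = {}   # taskid -> AsyncResult, kept so tasks can be cleared
    task_results = {}   # taskid -> finished result dict, filled by the callback

    def on_done(res):
        # Runs in the parent once a worker returns; index the result by taskid.
        task_results[res['taskid']] = res

    pool = Pool(processes=2)
    for taskid, x in enumerate([2, 3, 4], start=1):
        task_objects[taskid] = pool.apply_async(run_task, (x, taskid),
                                                callback=on_done)

    pool.close()   # no more submissions, mirroring the plugin's _close()
    pool.join()    # wait for outstanding workers to finish
    print(sorted((tid, r['result']) for tid, r in task_results.items()))
```
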

def _send_procs_to_workers(self, updatehash=False, graph=None):
""" Sends jobs to workers when system resources are available.
Check memory (gb) and cores usage before running jobs.
"""
executing_now = []

# Check to see if a job is available
jobids = np.flatnonzero((self.proc_pending == True) & \
currently_running_jobids = np.flatnonzero((self.proc_pending == True) & \
(self.depidx.sum(axis=0) == 0).__array__())

# Check available system resources by summing all threads and memory used
busy_memory_gb = 0
busy_processors = 0
for jobid in jobids:
busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
busy_processors += self.procs[jobid]._interface.num_threads
for jobid in currently_running_jobids:
if self.procs[jobid]._interface.estimated_memory_gb <= self.memory_gb and \
self.procs[jobid]._interface.num_threads <= self.processors:

busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
busy_processors += self.procs[jobid]._interface.num_threads

else:
raise ValueError("Resources required by jobid %d (%f GB, %d threads)"
"exceed what is available on the system (%f GB, %d threads)"%(jobid,
self.procs[jobid].__interface.estimated_memory_gb,
self.procs[jobid].__interface.num_threads,
self.memory_gb,self.processors))

free_memory_gb = self.memory_gb - busy_memory_gb
free_processors = self.processors - busy_processors
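
Before anything new is submitted, the scheduler sums the estimated memory and thread counts of the jobs already pending, and a job whose own requirements exceed the machine's totals is rejected immediately rather than left in the queue where it could never be scheduled. A toy version of that bookkeeping (the `Job` tuple and the limits are invented for illustration):

```python
# Toy version of the resource bookkeeping above; Job and the system limits
# are invented for illustration.
from collections import namedtuple

Job = namedtuple('Job', ['name', 'estimated_memory_gb', 'num_threads'])

TOTAL_MEMORY_GB = 8.0
TOTAL_PROCESSORS = 4


def free_resources(running_jobs):
    """Return (free_gb, free_threads) after accounting for pending jobs."""
    busy_gb, busy_threads = 0.0, 0
    for job in running_jobs:
        if (job.estimated_memory_gb > TOTAL_MEMORY_GB or
                job.num_threads > TOTAL_PROCESSORS):
            # A job that can never fit should fail loudly, not wait forever.
            raise ValueError('%s needs more than the system has' % job.name)
        busy_gb += job.estimated_memory_gb
        busy_threads += job.num_threads
    return TOTAL_MEMORY_GB - busy_gb, TOTAL_PROCESSORS - busy_threads


running = [Job('smooth', 2.0, 1), Job('register', 3.0, 2)]
print(free_resources(running))  # (3.0, 1)
```
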
@@ -271,8 +299,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
hash_exists, _, _, _ = self.procs[
jobid].hash_exists()
logger.debug('Hash exists %s' % str(hash_exists))
if (hash_exists and (self.procs[jobid].overwrite == False or \
(self.procs[jobid].overwrite == None and \
if (hash_exists and (self.procs[jobid].overwrite == False or
(self.procs[jobid].overwrite == None and
not self.procs[jobid]._interface.always_run))):
self._task_finished_cb(jobid)
self._remove_node_dirs()
@@ -299,7 +327,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
self._remove_node_dirs()

else:
logger.debug('submitting %s' % str(jobid))
logger.debug('MultiProcPlugin submitting %s' % str(jobid))
tid = self._submit_job(deepcopy(self.procs[jobid]),
updatehash=updatehash)
if tid is None:
nipype/pipeline/plugins/tests/test_callback.py: 6 changes (0 additions, 6 deletions)

@@ -64,9 +64,6 @@ def test_callback_exception(tmpdir):
assert so.statuses[0][1] == 'start'
assert so.statuses[1][1] == 'exception'


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
def test_callback_multiproc_normal(tmpdir):
so = Status()
wf = pe.Workflow(name='test', base_dir=str(tmpdir))
@@ -83,9 +80,6 @@ def test_callback_multiproc_normal(tmpdir):
assert so.statuses[0][1] == 'start'
assert so.statuses[1][1] == 'end'


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
def test_callback_multiproc_exception(tmpdir):
so = Status()
wf = pe.Workflow(name='test', base_dir=str(tmpdir))
nipype/pipeline/plugins/tests/test_multiproc.py: 10 changes (0 additions, 10 deletions)

@@ -32,9 +32,6 @@ def _list_outputs(self):
outputs['output1'] = [1, self.inputs.input1]
return outputs


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
def test_run_multiproc(tmpdir):
os.chdir(str(tmpdir))

@@ -118,9 +115,6 @@ def find_metrics(nodes, last_node):

return total_memory, total_threads


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
def test_no_more_memory_than_specified():
LOG_FILENAME = 'callback.log'
my_logger = logging.getLogger('callback')
@@ -179,10 +173,6 @@ def test_no_more_memory_than_specified():

os.remove(LOG_FILENAME)


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
@pytest.mark.skipif(nib.runtime_profile == False, reason="runtime_profile=False")
def test_no_more_threads_than_specified():
LOG_FILENAME = 'callback.log'
my_logger = logging.getLogger('callback')
nipype/pipeline/plugins/tests/test_multiproc_nondaemon.py: 9 changes (0 additions, 9 deletions)

@@ -89,9 +89,6 @@ def dummyFunction(filename):

return total


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
def run_multiproc_nondaemon_with_flag(nondaemon_flag):
'''
Start a pipe with two nodes using the resource multiproc plugin and
@@ -132,9 +129,6 @@ def run_multiproc_nondaemon_with_flag(nondaemon_flag):
rmtree(temp_dir)
return result


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
def test_run_multiproc_nondaemon_false():
'''
This is the entry point for the test. Two times a pipe of several multiprocessing jobs gets
@@ -151,9 +145,6 @@ def test_run_multiproc_nondaemon_false():
shouldHaveFailed = True
assert shouldHaveFailed


@pytest.mark.skipif(sys.version_info < (3, 0),
reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
def test_run_multiproc_nondaemon_true():
# with nondaemon_flag = True, the execution should succeed
result = run_multiproc_nondaemon_with_flag(True)