Skip to content

Commit 766aa17

Browse files
committed
RF: Restory test_legacymultiproc_nondaemon
1 parent 727ec0e commit 766aa17

File tree

1 file changed

+166
-0
lines changed

1 file changed

+166
-0
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# -*- coding: utf-8 -*-
2+
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
3+
# vi: set ft=python sts=4 ts=4 sw=4 et:
4+
"""Testing module for functions and classes from multiproc.py
5+
"""
6+
from __future__ import (print_function, division, unicode_literals,
7+
absolute_import)
8+
from builtins import range, open
9+
10+
# Import packages
11+
import os
12+
import sys
13+
from tempfile import mkdtemp
14+
from shutil import rmtree
15+
import pytest
16+
17+
import nipype.pipeline.engine as pe
18+
from nipype.interfaces.utility import Function
19+
20+
21+
def mytestFunction(insum=0):
22+
'''
23+
Run a multiprocessing job and spawn child processes.
24+
'''
25+
26+
# need to import here since this is executed as an external process
27+
import multiprocessing
28+
import os
29+
import tempfile
30+
import time
31+
32+
numberOfThreads = 2
33+
34+
# list of processes
35+
t = [None] * numberOfThreads
36+
37+
# list of alive flags
38+
a = [None] * numberOfThreads
39+
40+
# list of tempFiles
41+
f = [None] * numberOfThreads
42+
43+
def dummyFunction(filename):
44+
'''
45+
This function writes the value 45 to the given filename.
46+
'''
47+
j = 0
48+
for i in range(0, 10):
49+
j += i
50+
51+
# j is now 45 (0+1+2+3+4+5+6+7+8+9)
52+
53+
with open(filename, 'w') as f:
54+
f.write(str(j))
55+
56+
for n in range(numberOfThreads):
57+
58+
# mark thread as alive
59+
a[n] = True
60+
61+
# create a temp file to use as the data exchange container
62+
tmpFile = tempfile.mkstemp('.txt', 'test_engine_')[1]
63+
f[n] = tmpFile # keep track of the temp file
64+
t[n] = multiprocessing.Process(target=dummyFunction, args=(tmpFile, ))
65+
# fire up the job
66+
t[n].start()
67+
68+
# block until all processes are done
69+
allDone = False
70+
while not allDone:
71+
72+
time.sleep(1)
73+
74+
for n in range(numberOfThreads):
75+
76+
a[n] = t[n].is_alive()
77+
78+
if not any(a):
79+
# if no thread is alive
80+
allDone = True
81+
82+
# here, all processes are done
83+
84+
# read in all temp files and sum them up
85+
total = insum
86+
for ff in f:
87+
with open(ff) as fd:
88+
total += int(fd.read())
89+
os.remove(ff)
90+
91+
return total
92+
93+
94+
def run_multiproc_nondaemon_with_flag(nondaemon_flag):
95+
'''
96+
Start a pipe with two nodes using the resource multiproc plugin and
97+
passing the nondaemon_flag.
98+
'''
99+
100+
cur_dir = os.getcwd()
101+
temp_dir = mkdtemp(prefix='test_engine_')
102+
os.chdir(temp_dir)
103+
104+
pipe = pe.Workflow(name='pipe')
105+
106+
f1 = pe.Node(
107+
interface=Function(
108+
function=mytestFunction,
109+
input_names=['insum'],
110+
output_names=['sum_out']),
111+
name='f1')
112+
f2 = pe.Node(
113+
interface=Function(
114+
function=mytestFunction,
115+
input_names=['insum'],
116+
output_names=['sum_out']),
117+
name='f2')
118+
119+
pipe.connect([(f1, f2, [('sum_out', 'insum')])])
120+
pipe.base_dir = os.getcwd()
121+
f1.inputs.insum = 0
122+
123+
pipe.config['execution']['stop_on_first_crash'] = True
124+
125+
# execute the pipe using the MultiProc plugin with 2 processes and the
126+
# non_daemon flag to enable child processes which start other
127+
# multiprocessing jobs
128+
execgraph = pipe.run(
129+
plugin="MultiProc",
130+
plugin_args={
131+
'n_procs': 2,
132+
'non_daemon': nondaemon_flag
133+
})
134+
135+
names = [
136+
'.'.join((node._hierarchy, node.name)) for node in execgraph.nodes()
137+
]
138+
node = list(execgraph.nodes())[names.index('pipe.f2')]
139+
result = node.get_output('sum_out')
140+
os.chdir(cur_dir)
141+
rmtree(temp_dir)
142+
return result
143+
144+
145+
def test_run_multiproc_nondaemon_false():
146+
'''
147+
This is the entry point for the test. Two times a pipe of several
148+
multiprocessing jobs gets executed. First, without the nondaemon flag.
149+
Second, with the nondaemon flag.
150+
151+
Since the processes of the pipe start child processes, the execution only
152+
succeeds when the non_daemon flag is on.
153+
'''
154+
shouldHaveFailed = False
155+
try:
156+
# with nondaemon_flag = False, the execution should fail
157+
run_multiproc_nondaemon_with_flag(False)
158+
except:
159+
shouldHaveFailed = True
160+
assert shouldHaveFailed
161+
162+
163+
def test_run_multiproc_nondaemon_true():
164+
# with nondaemon_flag = True, the execution should succeed
165+
result = run_multiproc_nondaemon_with_flag(True)
166+
assert result == 180 # n_procs (2) * numberOfThreads (2) * 45 == 180

0 commit comments

Comments
 (0)