Skip to content

simplifying tests, removing some try/except blocks #1773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 6, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 34 additions & 72 deletions nipype/pipeline/engine/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def _list_outputs(self):


def test_init():
with pytest.raises(Exception): pe.Workflow()
with pytest.raises(TypeError): pe.Workflow()
pipe = pe.Workflow(name='pipe')
assert type(pipe._graph) == nx.DiGraph

Expand Down Expand Up @@ -156,12 +156,8 @@ def test_expansion():
pipe5.add_nodes([pipe4])
pipe6 = pe.Workflow(name="pipe6")
pipe6.connect([(pipe5, pipe3, [('pipe4.mod5.output1', 'pipe2.mod3.input1')])])
error_raised = False
try:
pipe6._flatgraph = pipe6._create_flat_graph()
except:
error_raised = True
assert not error_raised

pipe6._flatgraph = pipe6._create_flat_graph()


def test_iterable_expansion():
Expand Down Expand Up @@ -330,11 +326,16 @@ def test_doubleconnect():
flow1 = pe.Workflow(name='test')
flow1.connect(a, 'a', b, 'a')
x = lambda: flow1.connect(a, 'b', b, 'a')
with pytest.raises(Exception): x()
with pytest.raises(Exception) as excinfo:
x()
assert "Trying to connect" in str(excinfo.value)

c = pe.Node(IdentityInterface(fields=['a', 'b']), name='c')
flow1 = pe.Workflow(name='test2')
x = lambda: flow1.connect([(a, c, [('b', 'b')]), (b, c, [('a', 'b')])])
with pytest.raises(Exception): x()
with pytest.raises(Exception) as excinfo:
x()
assert "Trying to connect" in str(excinfo.value)


'''
Expand Down Expand Up @@ -479,14 +480,10 @@ def func1(in1):
nested=False,
name='n1')
n2.inputs.in1 = [[1, [2]], 3, [4, 5]]
error_raised = False
try:

with pytest.raises(Exception) as excinfo:
n2.run()
except Exception as e:
from nipype.pipeline.engine.base import logger
logger.info('Exception: %s' % str(e))
error_raised = True
assert error_raised
assert "can only concatenate list" in str(excinfo.value)


def test_node_hash(tmpdir):
Expand Down Expand Up @@ -518,35 +515,31 @@ def func2(a):
w1.config['execution'] = {'stop_on_first_crash': 'true',
'local_hash_check': 'false',
'crashdump_dir': wd}
error_raised = False
# create dummy distributed plugin class
from nipype.pipeline.plugins.base import DistributedPluginBase

# create a custom exception
class EngineTestException(Exception):
pass

class RaiseError(DistributedPluginBase):
def _submit_job(self, node, updatehash=False):
raise Exception('Submit called')
try:
raise EngineTestException('Submit called')

# check if a proper exception is raised
with pytest.raises(EngineTestException) as excinfo:
w1.run(plugin=RaiseError())
except Exception as e:
from nipype.pipeline.engine.base import logger
logger.info('Exception: %s' % str(e))
error_raised = True
assert error_raised
assert 'Submit called' == str(excinfo.value)

# rerun to ensure we have outputs
w1.run(plugin='Linear')
# set local check
w1.config['execution'] = {'stop_on_first_crash': 'true',
'local_hash_check': 'true',
'crashdump_dir': wd}
error_raised = False
try:
w1.run(plugin=RaiseError())
except Exception as e:
from nipype.pipeline.engine.base import logger
logger.info('Exception: %s' % str(e))
error_raised = True
assert not error_raised

w1.run(plugin=RaiseError())


def test_old_config(tmpdir):
wd = str(tmpdir)
Expand Down Expand Up @@ -574,14 +567,8 @@ def func2(a):

w1.config['execution']['crashdump_dir'] = wd
# generate outputs
error_raised = False
try:
w1.run(plugin='Linear')
except Exception as e:
from nipype.pipeline.engine.base import logger
logger.info('Exception: %s' % str(e))
error_raised = True
assert not error_raised

w1.run(plugin='Linear')


def test_mapnode_json(tmpdir):
Expand Down Expand Up @@ -618,13 +605,9 @@ def func1(in1):
with open(os.path.join(node.output_dir(), 'test.json'), 'wt') as fp:
fp.write('dummy file')
w1.config['execution'].update(**{'stop_on_first_rerun': True})
error_raised = False
try:
w1.run()
except:
error_raised = True
assert not error_raised

w1.run()


def test_parameterize_dirs_false(tmpdir):
from ....interfaces.utility import IdentityInterface
Expand All @@ -643,14 +626,8 @@ def test_parameterize_dirs_false(tmpdir):
wf.config['execution']['parameterize_dirs'] = False
wf.connect([(n1, n2, [('output1', 'in1')])])

error_raised = False
try:
wf.run()
except TypeError as typerr:
from nipype.pipeline.engine.base import logger
logger.info('Exception: %s' % str(typerr))
error_raised = True
assert not error_raised

wf.run()


def test_serial_input(tmpdir):
Expand Down Expand Up @@ -680,30 +657,15 @@ def func1(in1):
assert n1.num_subnodes() == len(n1.inputs.in1)

# test running the workflow on default conditions
error_raised = False
try:
w1.run(plugin='MultiProc')
except Exception as e:
from nipype.pipeline.engine.base import logger
logger.info('Exception: %s' % str(e))
error_raised = True
assert not error_raised
w1.run(plugin='MultiProc')

# test output of num_subnodes method when serial is True
n1._serial = True
assert n1.num_subnodes() == 1

# test running the workflow on serial conditions
error_raised = False
try:
w1.run(plugin='MultiProc')
except Exception as e:
from nipype.pipeline.engine.base import logger
logger.info('Exception: %s' % str(e))
error_raised = True

assert not error_raised

w1.run(plugin='MultiProc')


def test_write_graph_runs(tmpdir):
os.chdir(str(tmpdir))
Expand Down