[WIP] Add state/mappers to new workflow syntax #2648


Closed
Changes from 1 commit
57 commits
248d39a
Merge branch 'enh/workflow_syntax' of https://github.com/effigies/nip…
djarecka Jul 10, 2018
f568d78
Adding auxiliary and state from nipype2_tmp
djarecka Jul 12, 2018
682248a
updating exceptions and __init__; updating logger to the current vers…
djarecka Jul 13, 2018
7b3bba6
starting updating NewNode using mapper and state from nipype2_tmp; ch…
djarecka Jul 13, 2018
c944685
adding simple tests
djarecka Jul 14, 2018
4da8737
adding run_interface method for NewNode (still have to add run method)
djarecka Jul 15, 2018
7ff7722
copy submitter and workers from nipype2_tmp; adding SubmitterNode and…
djarecka Jul 15, 2018
502f617
fixing run method for node (previously it didn work for dask and cf):…
djarecka Jul 16, 2018
784ec22
adding reading results; changing node names (node name shouldn have _)
djarecka Jul 16, 2018
18a6176
adding dask to requirements
djarecka Jul 16, 2018
601c15b
adding distributed package
djarecka Jul 16, 2018
d5ceb24
skipping tests for py2; sorting results (not sure if the node should …
djarecka Jul 16, 2018
692bc1b
removing printing the bokeh address (doesnt work with travis)
djarecka Jul 17, 2018
ccf8926
adding tests for node mappers
djarecka Jul 17, 2018
8300a98
changing requirments to py>3.4 (fails with py3.4, not sure if we want…
djarecka Jul 17, 2018
46684a4
moving many attributes from NewNodeBase/NewNodeCore to NewNode, so I …
djarecka Jul 22, 2018
26496ae
starting NewWorkflow class that inherits from NewNode (for now only a…
djarecka Jul 24, 2018
40562a8
adding add and method methods to NewWorkflow
djarecka Jul 24, 2018
9fb54c3
fixing one test
djarecka Jul 24, 2018
39e6b50
adding add method to NewWOrkflow
djarecka Jul 24, 2018
e795961
adding comments to the tests
djarecka Jul 24, 2018
e6f445e
Merge remote-tracking branch 'upstream/enh/workflow_syntax' into effi…
effigies Jul 25, 2018
15b4ba8
[wip] rearranging classes - showing the problem with cf/dask when pas…
djarecka Jul 26, 2018
d0bdf1d
Revert "[wip] rearranging classes - showing the problem with cf/dask …
djarecka Jul 27, 2018
522f64c
rearranging classes, NewNode and NewWorkflow inherit from NewBase; fi…
djarecka Jul 30, 2018
2e02f69
allowing for kwarg arg in add method instead of connect method
djarecka Jul 31, 2018
730f371
allowing for using mapper from previous nodes, e.g. [_NA, b], addding…
djarecka Jul 31, 2018
1732b88
removing globals mapper2rpn
djarecka Aug 15, 2018
f1d603e
adding index_generator to the state class
djarecka Aug 15, 2018
25c501c
changing string representation of directories names and inputs names
djarecka Aug 29, 2018
1e4067c
adding comments
djarecka Aug 29, 2018
b93ef44
[wip] adding inner workflow if workflow has a mapper (doesnt work pro…
djarecka Aug 29, 2018
c7737d0
[wip] adding some asserts to the test with inner workflows [skip ci]
djarecka Aug 29, 2018
2214404
Adding comments
djarecka Aug 29, 2018
d0e8c76
fixing inner_workflows (had to add deepcopy to node), changing syntax…
djarecka Sep 4, 2018
dfe7bd5
[skip ci] fixing _check_all_results (bug introduced in f1d603e, but w…
djarecka Sep 4, 2018
409aa84
[skip ci] adding example from #2539 (not running yet); small changes …
djarecka Sep 5, 2018
1bc5350
small update to tests; adding DotDict to auxiliary (but dont use it r…
djarecka Sep 19, 2018
a2d33b9
adding tests for inner workflows
djarecka Sep 19, 2018
b87782c
small edits
djarecka Sep 24, 2018
fa8710b
removing the concept of inner workflow from workflow with mapper
djarecka Sep 24, 2018
2cec408
adding outputs_nm list for NewWorkflow, adding _reading_result for wo…
djarecka Sep 24, 2018
14ebc78
alloing for mapper in workflows (copying workflows in teh submitter);…
djarecka Sep 30, 2018
a9407b4
updating tests for mappers
djarecka Sep 30, 2018
63fe2f4
adding inputs and imports to newnode_nero
djarecka Oct 1, 2018
1c48416
introducing workflows that can be treated as a node: adding results, …
djarecka Oct 3, 2018
ef2b4cf
moving run node/wf to tests and removing run from Node/Workflow (at l…
djarecka Oct 4, 2018
935c8ea
merging all submitter classes
djarecka Oct 4, 2018
8c8aa29
adding comments; cleaning connect methods
djarecka Oct 4, 2018
970eb16
changing orders of the methods
djarecka Oct 4, 2018
89395f2
changing directory format for node without mapper (one layer less, si…
djarecka Oct 4, 2018
a0cdf31
wf has map the same as a node and map_node to change a mapper for the…
djarecka Oct 4, 2018
2f11ad6
adding fixture to change directory in the tests; changing nodedir to …
djarecka Oct 4, 2018
bf54454
adding wrapper to the current interfaces (with simple tests for bet);…
djarecka Oct 5, 2018
0becd3a
updating wf.add method to include nipype interfaces
djarecka Oct 6, 2018
d61519c
test_newnode_neuro works: fixing CurrentInterface and adding _reading…
djarecka Oct 8, 2018
2e3ea2a
[skip ci] small naming change
djarecka Oct 8, 2018
adding outputs_nm list for NewWorkflow, adding _reading_result for workflow
djarecka committed Sep 24, 2018
commit 2cec4085d0c8361010eecf709cd0e2012adf9f10
71 changes: 69 additions & 2 deletions nipype/pipeline/engine/tests/test_newnode.py
@@ -845,12 +845,79 @@ def test_workflow_11(plugin):
        assert wf.nodes[2].result["out"][i][1] == res[1]


# checking workflow.result

@pytest.mark.parametrize("plugin", Plugins)
@python35_only
def test_workflow_12(plugin):
    """testing if wf.result works (the same workflow as in test_workflow_6)"""
    wf = NewWorkflow(name="wf12", workingdir="test_wf12_{}".format(plugin),
                     outputs_nm=[("NA", "out", "NA_out"), ("NB", "out")])
    interf_addtwo = Function_Interface(fun_addtwo, ["out"])
    na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")

    interf_addvar = Function_Interface(fun_addvar, ["out"])
    nb = NewNode(name="NB", interface=interf_addvar, base_dir="nb")
    # using the map methods after add (using mapper for the last added nodes as default)
    wf.add(na)
    wf.map(mapper="a", inputs={"a": [3, 5]})
    wf.add(nb)
    wf.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]})
    wf.connect("NA", "out", "NB", "a")
    wf.run(plugin=plugin)

    # checking if workflow.result is the same as results of the nodes
    assert wf.result["NA_out"] == wf.nodes[0].result["out"]
    assert wf.result["out"] == wf.nodes[1].result["out"]

    # checking values of workflow.result
    expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)]
    key_sort = list(expected[0][0].keys())
    expected.sort(key=lambda t: [t[0][key] for key in key_sort])
    wf.result["NA_out"].sort(key=lambda t: [t[0][key] for key in key_sort])
    for i, res in enumerate(expected):
        assert wf.result["NA_out"][i][0] == res[0]
        assert wf.result["NA_out"][i][1] == res[1]

    expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)]
    key_sort = list(expected_B[0][0].keys())
    expected_B.sort(key=lambda t: [t[0][key] for key in key_sort])
    wf.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])
    for i, res in enumerate(expected_B):
        assert wf.result["out"][i][0] == res[0]
        assert wf.result["out"][i][1] == res[1]


@pytest.mark.parametrize("plugin", Plugins)
@python35_only
def test_workflow_12a(plugin):
    """testing if wf.result raises an exception (the same workflow as in test_workflow_6)"""
    wf = NewWorkflow(name="wf12a", workingdir="test_wf12a_{}".format(plugin),
                     outputs_nm=[("NA", "out", "wf_out"), ("NB", "out", "wf_out")])
    interf_addtwo = Function_Interface(fun_addtwo, ["out"])
    na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")

    interf_addvar = Function_Interface(fun_addvar, ["out"])
    nb = NewNode(name="NB", interface=interf_addvar, base_dir="nb")
    # using the map methods after add (using mapper for the last added nodes as default)
    wf.add(na)
    wf.map(mapper="a", inputs={"a": [3, 5]})
    wf.add(nb)
    wf.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]})
    wf.connect("NA", "out", "NB", "a")
    wf.run(plugin=plugin)

    # wf_out can't be used twice in wf.result
    with pytest.raises(Exception) as exinfo:
        wf.result
    assert str(exinfo.value) == "the key wf_out is already used in workflow.result"

# tests for a workflow that has its own input and mapper
# WIP

@pytest.mark.xfail(reason="WIP")
@pytest.mark.parametrize("plugin", Plugins)
@python35_only
-def test_workflow_12(plugin):
+def test_workflow_13(plugin):
    """using inputs for workflow and connect_workflow"""
    wf = NewWorkflow(name="wf9", inputs={"wf_a": [3, 5]}, mapper="wf_a", workingdir="test_wf12_{}".format(plugin))
    interf_addtwo = Function_Interface(fun_addtwo, ["out"])
@@ -876,7 +943,7 @@ def test_workflow_12(plugin):
@pytest.mark.xfail(reason="WIP")
@pytest.mark.parametrize("plugin", Plugins)
@python35_only
-def test_workflow_12a(plugin):
+def test_workflow_13a(plugin):
    """using inputs for workflow and connect_workflow"""
    wf = NewWorkflow(name="wf9", inputs={"wf_a": [3, 5]}, mapper="wf_a", workingdir="test_wf12a_{}".format(plugin))
    interf_addvar = Function_Interface(fun_addvar, ["out"])
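Since mapped results can arrive in plugin-dependent order, the tests above sort both the expected and observed `(state_inputs, value)` pairs by their state-input values before comparing. A self-contained sketch of that sorting pattern (the values here are illustrative, not taken from a real run):

```python
# Results from a mapped node come back as (state_inputs, value) pairs,
# e.g. ({"NA.a": 3}, 5).  Their order is not guaranteed across plugins,
# so both lists are sorted by the state-input values before the
# element-wise comparison.
expected = [({"NA.a": 5}, 7), ({"NA.a": 3}, 5)]
observed = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)]

key_sort = list(expected[0][0].keys())

def sort_key(t):
    # order tuples by the values of their state inputs
    return [t[0][key] for key in key_sort]

expected.sort(key=sort_key)
observed.sort(key=sort_key)

for exp, obs in zip(expected, observed):
    assert obs[0] == exp[0]  # identical state inputs
    assert obs[1] == exp[1]  # identical computed value
```

Sorting both sides by the same key makes the assertions independent of the order in which the submitter finishes the state elements.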
38 changes: 25 additions & 13 deletions nipype/pipeline/engine/workflows.py
@@ -1122,6 +1122,13 @@ def inputs(self):
    def state_inputs(self):
        return self._state_inputs

    @property
    def result(self):
        if not self._result:
            self._reading_results()
        return self._result


    def prepare_state_input(self):
        self._state.prepare_state_input(state_inputs=self.state_inputs)
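The `result` property added here follows a lazy-caching pattern: compute on first access, then reuse. A generic standalone sketch of the pattern (hypothetical class and values, not the PR's code):

```python
class LazyResult:
    """Sketch of the lazy-caching pattern used by the new `result` property."""

    def __init__(self):
        self._result = {}  # empty until first access

    def _reading_results(self):
        # stand-in for collecting node outputs from disk
        self._result = {"out": 42}

    @property
    def result(self):
        if not self._result:  # falsy while nothing is cached yet
            self._reading_results()
        return self._result
```

Note the guard tests falsiness rather than `None`: a legitimately empty result would be re-read on every access, which matters if `_reading_results` is expensive.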

@@ -1214,10 +1221,6 @@ def map(self, mapper, inputs=None):
    def needed_outputs(self):
        return self._needed_outputs

-    @property
-    def result(self):
-        return self.result


    def run_interface_el(self, i, ind):
        """ running interface one element generated from node_state."""
@@ -1288,14 +1291,6 @@ def _check_all_results(self):
        return True


-    # reading results (without join for now)
-    @property
-    def result(self):
-        if not self._result:
-            self._reading_results()
-        return self._result


    def _reading_results(self):
        """
        reading results from file,
@@ -1326,7 +1321,7 @@ def run(self, plugin="serial"):


class NewWorkflow(NewBase):
-    def __init__(self, name, inputs=None, mapper=None, #join_by=None,
+    def __init__(self, name, inputs=None, outputs_nm=None, mapper=None, #join_by=None,
                 nodes=None, workingdir=None, mem_gb_node=None, *args, **kwargs):
        super(NewWorkflow, self).__init__(name=name, mapper=mapper, inputs=inputs,
                                          mem_gb_node=mem_gb_node, *args, **kwargs)
@@ -1347,6 +1342,9 @@ def __init__(self, name, inputs=None, mapper=None, #join_by=None,
        if self.mapper:
            pass  # TODO

        # list of (node name, output name in the node, output name in wf)
        # or (node name, output name in the node)
        self.outputs_nm = outputs_nm

        # dj: not sure what was the motivation, wf_klasses gives an empty list
        # mro = self.__class__.mro()
        # wf_klasses = mro[:mro.index(NewWorkflow)][::-1]
@@ -1503,6 +1501,20 @@ def join(self, field, node=None):
        pass


    def _reading_results(self):
        for out in self.outputs_nm:
            if len(out) == 2:
                node_nm, out_nd_nm, out_wf_nm = out[0], out[1], out[1]
            elif len(out) == 3:
                node_nm, out_nd_nm, out_wf_nm = out
            else:
                raise Exception("outputs_nm should have 2 or 3 elements")
            if out_wf_nm not in self._result.keys():
                self._result[out_wf_nm] = self._node_names[node_nm].result[out_nd_nm]
            else:
                raise Exception("the key {} is already used in workflow.result".format(out_wf_nm))


def is_function(obj):
    return hasattr(obj, '__call__')

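The two accepted shapes of an `outputs_nm` entry — `(node, out)`, which reuses the node's output name, and `(node, out, wf_out)`, which renames it in the workflow — can be sketched as a standalone function. This is a hypothetical `read_results` helper that assumes node results are plain dicts rather than node objects:

```python
def read_results(outputs_nm, node_results):
    """Combine node outputs into a single workflow-level result dict."""
    result = {}
    for out in outputs_nm:
        if len(out) == 2:
            node_nm, out_nd_nm = out
            out_wf_nm = out_nd_nm  # 2-tuple: reuse the node's output name
        elif len(out) == 3:
            node_nm, out_nd_nm, out_wf_nm = out  # 3-tuple: rename in the workflow
        else:
            raise Exception("outputs_nm should have 2 or 3 elements")
        if out_wf_nm in result:
            raise Exception(
                "the key {} is already used in workflow.result".format(out_wf_nm))
        result[out_wf_nm] = node_results[node_nm][out_nd_nm]
    return result


wf_result = read_results(
    outputs_nm=[("NA", "out", "NA_out"), ("NB", "out")],
    node_results={"NA": {"out": 5}, "NB": {"out": 7}})
# wf_result == {"NA_out": 5, "out": 7}
```

The duplicate-key guard is what `test_workflow_12a` exercises: two 2-tuples whose nodes both expose `out` would otherwise silently overwrite each other in the workflow result.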