[WIP] Add state/mappers to new workflow syntax #2648

Closed
Changes from 1 commit
57 commits
248d39a
Merge branch 'enh/workflow_syntax' of https://github.com/effigies/nip…
djarecka Jul 10, 2018
f568d78
Adding auxiliary and state from nipype2_tmp
djarecka Jul 12, 2018
682248a
updating exceptions and __init__; updating logger to the current vers…
djarecka Jul 13, 2018
7b3bba6
starting updating NewNode using mapper and state from nipype2_tmp; ch…
djarecka Jul 13, 2018
c944685
adding simple tests
djarecka Jul 14, 2018
4da8737
adding run_interface method for NewNode (still have to add run method)
djarecka Jul 15, 2018
7ff7722
copy submitter and workers from nipype2_tmp; adding SubmitterNode and…
djarecka Jul 15, 2018
502f617
fixing run method for node (previously it didn work for dask and cf):…
djarecka Jul 16, 2018
784ec22
adding reading results; changing node names (node name shouldn have _)
djarecka Jul 16, 2018
18a6176
adding dask to requirements
djarecka Jul 16, 2018
601c15b
adding distributed package
djarecka Jul 16, 2018
d5ceb24
skipping tests for py2; sorting results (not sure if the node should …
djarecka Jul 16, 2018
692bc1b
removing printing the bokeh address (doesnt work with travis)
djarecka Jul 17, 2018
ccf8926
adding tests for node mappers
djarecka Jul 17, 2018
8300a98
changing requirments to py>3.4 (fails with py3.4, not sure if we want…
djarecka Jul 17, 2018
46684a4
moving many attributes from NewNodeBase/NewNodeCore to NewNode, so I …
djarecka Jul 22, 2018
26496ae
starting NewWorkflow class that inherits from NewNode (for now only a…
djarecka Jul 24, 2018
40562a8
adding add and method methods to NewWorkflow
djarecka Jul 24, 2018
9fb54c3
fixing one test
djarecka Jul 24, 2018
39e6b50
adding add method to NewWOrkflow
djarecka Jul 24, 2018
e795961
adding comments to the tests
djarecka Jul 24, 2018
e6f445e
Merge remote-tracking branch 'upstream/enh/workflow_syntax' into effi…
effigies Jul 25, 2018
15b4ba8
[wip] rearranging classes - showing the problem with cf/dask when pas…
djarecka Jul 26, 2018
d0bdf1d
Revert "[wip] rearranging classes - showing the problem with cf/dask …
djarecka Jul 27, 2018
522f64c
rearranging classes, NewNode and NewWorkflow inherit from NewBase; fi…
djarecka Jul 30, 2018
2e02f69
allowing for kwarg arg in add method instead of connect method
djarecka Jul 31, 2018
730f371
allowing for using mapper from previous nodes, e.g. [_NA, b], addding…
djarecka Jul 31, 2018
1732b88
removing globals mapper2rpn
djarecka Aug 15, 2018
f1d603e
adding index_generator to the state class
djarecka Aug 15, 2018
25c501c
changing string representation of directories names and inputs names
djarecka Aug 29, 2018
1e4067c
adding comments
djarecka Aug 29, 2018
b93ef44
[wip] adding inner workflow if workflow has a mapper (doesnt work pro…
djarecka Aug 29, 2018
c7737d0
[wip] adding some asserts to the test with inner workflows [skip ci]
djarecka Aug 29, 2018
2214404
Adding comments
djarecka Aug 29, 2018
d0e8c76
fixing inner_workflows (had to add deepcopy to node), changing syntax…
djarecka Sep 4, 2018
dfe7bd5
[skip ci] fixing _check_all_results (bug introduced in f1d603e, but w…
djarecka Sep 4, 2018
409aa84
[skip ci] adding example from #2539 (not running yet); small changes …
djarecka Sep 5, 2018
1bc5350
small update to tests; adding DotDict to auxiliary (but dont use it r…
djarecka Sep 19, 2018
a2d33b9
adding tests for inner workflows
djarecka Sep 19, 2018
b87782c
small edits
djarecka Sep 24, 2018
fa8710b
removing the concept of inner workflow from workflow with mapper
djarecka Sep 24, 2018
2cec408
adding outputs_nm list for NewWorkflow, adding _reading_result for wo…
djarecka Sep 24, 2018
14ebc78
alloing for mapper in workflows (copying workflows in teh submitter);…
djarecka Sep 30, 2018
a9407b4
updating tests for mappers
djarecka Sep 30, 2018
63fe2f4
adding inputs and imports to newnode_nero
djarecka Oct 1, 2018
1c48416
introducing workflows that can be treated as a node: adding results, …
djarecka Oct 3, 2018
ef2b4cf
moving run node/wf to tests and removing run from Node/Workflow (at l…
djarecka Oct 4, 2018
935c8ea
merging all submitter classes
djarecka Oct 4, 2018
8c8aa29
adding comments; cleaning connect methods
djarecka Oct 4, 2018
970eb16
changing orders of the methods
djarecka Oct 4, 2018
89395f2
changing directory format for node without mapper (one layer less, si…
djarecka Oct 4, 2018
a0cdf31
wf has map the same as a node and map_node to change a mapper for the…
djarecka Oct 4, 2018
2f11ad6
adding fixture to change directory in the tests; changing nodedir to …
djarecka Oct 4, 2018
bf54454
adding wrapper to the current interfaces (with simple tests for bet);…
djarecka Oct 5, 2018
0becd3a
updating wf.add method to include nipype interfaces
djarecka Oct 6, 2018
d61519c
test_newnode_neuro works: fixing CurrentInterface and adding _reading…
djarecka Oct 8, 2018
2e3ea2a
[skip ci] small naming change
djarecka Oct 8, 2018
Adding comments
djarecka committed Aug 29, 2018
commit 2214404f7e677b4763d6ac2ddbec70a96965230e
50 changes: 39 additions & 11 deletions nipype/pipeline/engine/tests/test_newnode.py
@@ -34,6 +34,7 @@ def test_node_2():
interf_addtwo = Function_Interface(fun_addtwo, ["out"])
nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": 3})
assert nn.mapper is None
# adding NA to the name of the variable
assert nn.inputs == {"NA.a": 3}
assert nn.state._mapper is None
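The assertion above reflects the naming convention the PR introduces: a node's inputs are stored with the node name as a prefix. A minimal sketch of that convention (`prefix_inputs` is a hypothetical helper for illustration, not part of the PR):

```python
# Hypothetical helper illustrating the naming convention asserted above:
# a node called "NA" stores its input "a" under the key "NA.a".
def prefix_inputs(node_name, inputs):
    """Return the inputs dict keyed by '<node_name>.<field>'."""
    return {"{}.{}".format(node_name, field): value
            for field, value in inputs.items()}

print(prefix_inputs("NA", {"a": 3}))  # {'NA.a': 3}
```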

@@ -94,12 +95,13 @@ def test_node_6(plugin):
# testing if the node runs properly
nn.run(plugin=plugin)

# checking teh results
# checking the results
expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)]
# to be sure that there is the same order (not sure if node itself should keep the order)
key_sort = list(expected[0][0].keys())
expected.sort(key=lambda t: [t[0][key] for key in key_sort])
nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])

for i, res in enumerate(expected):
assert nn.result["out"][i][0] == res[0]
assert nn.result["out"][i][1] == res[1]
@@ -111,6 +113,7 @@ def test_node_7(plugin):
"""Node with interface, inputs and scalar mapper, running interface"""
interf_addvar = Function_Interface(fun_addvar, ["out"])
nn = NewNode(name="NA", interface=interf_addvar, base_dir="test_nd7_{}".format(plugin))
# scalar mapper
nn.map(mapper=("a", "b"), inputs={"a": [3, 5], "b": [2, 1]})

assert nn.mapper == ("NA.a", "NA.b")
@@ -120,12 +123,13 @@ def test_node_7(plugin):
# testing if the node runs properly
nn.run(plugin=plugin)

# checking teh results
# checking the results
expected = [({"NA.a": 3, "NA.b": 2}, 5), ({"NA.a": 5, "NA.b": 1}, 6)]
# to be sure that there is the same order (not sure if node itself should keep the order)
key_sort = list(expected[0][0].keys())
expected.sort(key=lambda t: [t[0][key] for key in key_sort])
nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])

for i, res in enumerate(expected):
assert nn.result["out"][i][0] == res[0]
assert nn.result["out"][i][1] == res[1]
@@ -137,6 +141,7 @@ def test_node_8(plugin):
"""Node with interface, inputs and vector mapper, running interface"""
interf_addvar = Function_Interface(fun_addvar, ["out"])
nn = NewNode(name="NA", interface=interf_addvar, base_dir="test_nd8_{}".format(plugin))
# [] for outer product
nn.map(mapper=["a", "b"], inputs={"a": [3, 5], "b": [2, 1]})

assert nn.mapper == ["NA.a", "NA.b"]
@@ -158,15 +163,17 @@ def test_node_8(plugin):
assert nn.result["out"][i][1] == res[1]
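The two node tests above exercise the two mapper flavours: a tuple mapper such as `("a", "b")` pairs inputs element-wise (scalar mapper), while a list mapper such as `["a", "b"]` takes every combination (outer product). A hypothetical sketch of that expansion (`expand_mapper` is illustrative, not the PR's `State` class):

```python
from itertools import product

def expand_mapper(mapper, inputs):
    """Sketch: a tuple mapper zips input lists element-wise,
    a list mapper takes their Cartesian (outer) product."""
    names = list(mapper)
    if isinstance(mapper, tuple):              # scalar mapper, e.g. ("a", "b")
        combos = zip(*(inputs[n] for n in names))
    else:                                      # outer mapper, e.g. ["a", "b"]
        combos = product(*(inputs[n] for n in names))
    return [dict(zip(names, c)) for c in combos]

# ("a", "b") pairs values positionally -> 2 states
print(expand_mapper(("a", "b"), {"a": [3, 5], "b": [2, 1]}))
# ["a", "b"] forms all combinations -> 4 states
print(expand_mapper(["a", "b"], {"a": [3, 5], "b": [2, 1]}))
```

This matches the expected results in the tests: the scalar mapper yields two runs (3+2, 5+1), the outer mapper four.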


# tests for workflows that set mapper to node that are later added to a workflow
# tests for workflows

@python35_only
def test_workflow_0(plugin="serial"):
"""workflow (without run) with one node with a mapper"""
wf = NewWorkflow(name="wf0", workingdir="test_wf0_{}".format(plugin))
interf_addtwo = Function_Interface(fun_addtwo, ["out"])
# defining a node with mapper and inputs first
na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")
na.map(mapper="a", inputs={"a": [3, 5]})
# one of the ways of adding nodes to the workflow
wf.add_nodes([na])
assert wf.nodes[0].mapper == "NA.a"
assert (wf.nodes[0].inputs['NA.a'] == np.array([3, 5])).all()
@@ -202,9 +209,11 @@ def test_workflow_2(plugin):
na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")
na.map(mapper="a", inputs={"a": [3, 5]})

# the second node does not have an explicit mapper (but keeps the mapper from the NA node)
interf_addvar = Function_Interface(fun_addvar, ["out"])
nb = NewNode(name="NB", interface=interf_addvar, inputs={"b": 10}, base_dir="nb")

# adding 2 nodes and creating a connection (as it is now)
wf.add_nodes([na, nb])
wf.connect(na, "out", nb, "a")

Expand All @@ -219,7 +228,8 @@ def test_workflow_2(plugin):
assert wf.nodes[0].result["out"][i][0] == res[0]
assert wf.nodes[0].result["out"][i][1] == res[1]


# results from NB keep the "state input" from the first node
# two elements as in NA
expected_B = [({"NA.a": 3, "NB.b": 10}, 15), ({"NA.a": 5, "NB.b": 10}, 17)]
key_sort = list(expected_B[0][0].keys())
expected_B.sort(key=lambda t: [t[0][key] for key in key_sort])
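The expected values above show how state propagates: NB has no mapper of its own, so each of NA's states produces one NB run, and NB's results keep NA's state keys. A small sketch of that propagation (plain Python, not the PR's machinery; `fun_addvar` adds its two inputs):

```python
# (state, out) pairs produced by NA with mapper "a" over [3, 5] (fun_addtwo adds 2)
upstream = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)]
nb_b = 10  # NB's constant input "b"

# NB receives NA's output as "a"; its state dict merges NA's state with its own input
downstream = [({**state, "NB.b": nb_b}, out + nb_b) for state, out in upstream]
print(downstream)
# [({'NA.a': 3, 'NB.b': 10}, 15), ({'NA.a': 5, 'NB.b': 10}, 17)]
```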
@@ -240,6 +250,7 @@ def test_workflow_2a(plugin):

interf_addvar = Function_Interface(fun_addvar, ["out"])
nb = NewNode(name="NB", interface=interf_addvar, base_dir="nb")
# explicit scalar mapper between "a" from NA and b
nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]})

wf.add_nodes([na, nb])
@@ -257,6 +268,7 @@ def test_workflow_2a(plugin):
assert wf.nodes[0].result["out"][i][0] == res[0]
assert wf.nodes[0].result["out"][i][1] == res[1]

# two elements (scalar mapper)
expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)]
key_sort = list(expected_B[0][0].keys())
expected_B.sort(key=lambda t: [t[0][key] for key in key_sort])
@@ -277,6 +289,7 @@ def test_workflow_2b(plugin):

interf_addvar = Function_Interface(fun_addvar, ["out"])
nb = NewNode(name="NB", interface=interf_addvar, base_dir="nb")
# outer mapper
nb.map(mapper=["NA.a", "b"], inputs={"b": [2, 1]})

wf.add_nodes([na, nb])
@@ -294,7 +307,7 @@ def test_workflow_2b(plugin):
assert wf.nodes[0].result["out"][i][0] == res[0]
assert wf.nodes[0].result["out"][i][1] == res[1]


# four elements (outer product)
expected_B = [({"NA.a": 3, "NB.b": 1}, 6), ({"NA.a": 3, "NB.b": 2}, 7),
({"NA.a": 5, "NB.b": 1}, 8), ({"NA.a": 5, "NB.b": 2}, 9)]
key_sort = list(expected_B[0][0].keys())
@@ -315,7 +328,7 @@ def test_workflow_3(plugin):
interf_addtwo = Function_Interface(fun_addtwo, ["out"])
na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")
na.map(mapper="a", inputs={"a": [3, 5]})

# using the add method (as in Satra's example) with a node
wf.add(na)

assert wf.nodes[0].mapper == "NA.a"
@@ -337,6 +350,7 @@ def test_workflow_3a(plugin):
wf = NewWorkflow(name="wf3a", workingdir="test_wf3a_{}".format(plugin))
interf_addtwo = Function_Interface(fun_addtwo, ["out"])

# using the add method with an interface
wf.add(interf_addtwo, base_dir="na", mapper="a", inputs={"a": [3, 5]}, name="NA")

assert wf.nodes[0].mapper == "NA.a"
@@ -356,7 +370,7 @@ def test_workflow_3a(plugin):
def test_workflow_3b(plugin):
"""using add (function) method"""
wf = NewWorkflow(name="wf3b", workingdir="test_wf3b_{}".format(plugin))

# using the add method with a function
wf.add(fun_addtwo, base_dir="na", mapper="a", inputs={"a": [3, 5]}, name="NA")

assert wf.nodes[0].mapper == "NA.a"
@@ -386,9 +400,11 @@ def test_workflow_4(plugin):

interf_addvar = Function_Interface(fun_addvar, ["out"])
nb = NewNode(name="NB", interface=interf_addvar, base_dir="nb")
# explicit mapper with a variable from the previous node
# providing inputs with b
nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]})
wf.add(nb)

# connect method as it is in the current version
wf.connect(na, "out", nb, "a")

wf.run(plugin=plugin)
@@ -422,7 +438,9 @@ def test_workflow_4a(plugin):

interf_addvar = Function_Interface(fun_addvar, ["out"])
nb = NewNode(name="NB", interface=interf_addvar, base_dir="nb")
# explicit mapper with a variable from the previous node
nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]})
# instead of "connect", using a kwarg argument in the add method as in the example
wf.add(nb, a="NA.out")

wf.run(plugin=plugin)
@@ -456,6 +474,7 @@ def test_workflow_5(plugin):
na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")

wf.add(na)
# using the map method after add (using mapper for the last added node as default)
wf.map(mapper="a", inputs={"a": [3, 5]})
wf.run(plugin=plugin)

@@ -498,7 +517,7 @@ def test_workflow_6(plugin):

interf_addvar = Function_Interface(fun_addvar, ["out"])
nb = NewNode(name="NB", interface=interf_addvar, base_dir="nb")

# using the map methods after add (using mapper for the last added nodes as default)
wf.add(na)
wf.map(mapper="a", inputs={"a": [3, 5]})
wf.add(nb)
@@ -533,7 +552,7 @@ def test_workflow_6a(plugin):

interf_addvar = Function_Interface(fun_addvar, ["out"])
nb = NewNode(name="NB", interface=interf_addvar, base_dir="nb")

# using the map method after add (specifying the node)
wf.add(na)
wf.add(nb)
wf.map(mapper="a", inputs={"a": [3, 5]}, node=na)
@@ -598,11 +617,13 @@ def test_workflow_6b(plugin):
@python35_only
def test_workflow_7(plugin):
"""using inputs for workflow and connect_workflow"""
# adding inputs to the workflow directly
wf = NewWorkflow(name="wf7", inputs={"wf_a": [3, 5]}, workingdir="test_wf7_{}".format(plugin))
interf_addtwo = Function_Interface(fun_addtwo, ["out"])
na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")

wf.add(na)
# connecting the node with inputs from the workflow
wf.connect_workflow(na, "wf_a","a")
wf.map(mapper="a")
wf.run(plugin=plugin)
@@ -620,11 +641,13 @@ def test_workflow_7(plugin):
@python35_only
def test_workflow_7a(plugin):
"""using inputs for workflow and connect(None...)"""
# adding inputs to the workflow directly
wf = NewWorkflow(name="wf7a", inputs={"wf_a": [3, 5]}, workingdir="test_wf7a_{}".format(plugin))
interf_addtwo = Function_Interface(fun_addtwo, ["out"])
na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")

wf.add(na)
# if connect has None as the first arg, it is the same as connect_workflow
wf.connect(None, "wf_a", na, "a")
wf.map(mapper="a")
wf.run(plugin=plugin)
@@ -645,7 +668,7 @@ def test_workflow_7b(plugin):
wf = NewWorkflow(name="wf7b", inputs={"wf_a": [3, 5]}, workingdir="test_wf7b_{}".format(plugin))
interf_addtwo = Function_Interface(fun_addtwo, ["out"])
na = NewNode(name="NA", interface=interf_addtwo, base_dir="na")

# using a kwarg argument in the add method (instead of connect or connect_workflow)
wf.add(na, a="wf_a")
wf.map(mapper="a")
wf.run(plugin=plugin)
@@ -705,6 +728,7 @@ def test_workflow_9(plugin):
interf_addtwo = Function_Interface(fun_addtwo, ["out"])
wf.add(name="NA", runnable=interf_addtwo, base_dir="na").map(mapper="a", inputs={"a": [3, 5]})
interf_addvar = Function_Interface(fun_addvar, ["out"])
# _NA means that I'm using the mapper from the NA node, it's the same as ("NA.a", "b")
wf.add(name="NB", runnable=interf_addvar, base_dir="nb", a="NA.out").map(mapper=("_NA", "b"), inputs={"b": [2, 1]})
wf.run(plugin=plugin)
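The `"_NA"` shorthand described in the comment above refers back to a previously mapped node's own mapper. A hypothetical sketch of how such references could expand (`expand_state_refs` is illustrative, not the PR's implementation):

```python
def expand_state_refs(mapper, node_mappers):
    """Sketch: replace each '_<node>' entry in a mapper with that
    node's own mapper, recursing into nested tuples/lists."""
    def expand(item):
        if isinstance(item, str) and item.startswith("_"):
            return node_mappers[item[1:]]      # look up the referenced node
        if isinstance(item, (list, tuple)):
            return type(item)(expand(i) for i in item)
        return item
    return expand(mapper)

# NA was mapped with "a", so "_NA" stands for "NA.a":
print(expand_state_refs(("_NA", "b"), {"NA": "NA.a"}))       # ('NA.a', 'b')
# if NA had a scalar mapper, "_NA" expands to the whole tuple:
print(expand_state_refs(("_NA", "b"), {"NA": ("NA.a", "NA.b")}))
```

This mirrors the comments in the tests that follow: with NA mapped over `"a"`, `("_NA", "b")` is the same as `("NA.a", "b")`.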

@@ -733,6 +757,7 @@ def test_workflow_10(plugin):
interf_addvar1 = Function_Interface(fun_addvar, ["out"])
wf.add(name="NA", runnable=interf_addvar1, base_dir="na").map(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]})
interf_addvar2 = Function_Interface(fun_addvar, ["out"])
# _NA means that I'm using the mapper from the NA node, it's the same as (("NA.a", "NA.b"), "b")
wf.add(name="NB", runnable=interf_addvar2, base_dir="nb", a="NA.out").map(mapper=("_NA", "b"), inputs={"b": [2, 1]})
wf.run(plugin=plugin)

@@ -761,6 +786,7 @@ def test_workflow_10a(plugin):
interf_addvar1 = Function_Interface(fun_addvar, ["out"])
wf.add(name="NA", runnable=interf_addvar1, base_dir="na").map(mapper=["a", "b"], inputs={"a": [3, 5], "b": [0, 10]})
interf_addvar2 = Function_Interface(fun_addvar, ["out"])
# _NA means that I'm using the mapper from the NA node, it's the same as (["NA.a", "NA.b"], "b")
wf.add(name="NB", runnable=interf_addvar2, base_dir="nb", a="NA.out").map(mapper=("_NA", "b"), inputs={"b": [[2, 1], [0, 0]]})
wf.run(plugin=plugin)

@@ -793,6 +819,7 @@ def test_workflow_11(plugin):
interf_addtwo = Function_Interface(fun_addtwo, ["out"])
wf.add(name="NB", runnable=interf_addtwo, base_dir="nb").map(mapper="a", inputs={"a": [2, 1]})
interf_addvar2 = Function_Interface(fun_addvar, ["out"])
# _NA, _NB means that I'm using the mappers from the NA/NB nodes, it's the same as [("NA.a", "NA.b"), "NB.a"]
wf.add(name="NC", runnable=interf_addvar2, base_dir="nc", a="NA.out", b="NB.out").map(mapper=["_NA", "_NB"])
wf.run(plugin=plugin)

@@ -816,6 +843,7 @@ def test_workflow_11(plugin):


# tests for a workflow that has its own input and mapper
#WIP

#@pytest.mark.parametrize("plugin", Plugins)
@python35_only