Skip to content

Commit

Permalink
Process: properly cleanup when exception in state transition (#5697)
Browse files Browse the repository at this point in the history
If a process hit an exception during a state transition, its node
would not always be cleaned up properly. For example, when a process
registers invalid outputs, the process excepts in `update_outputs`,
which is called by `on_entered`, which in turn is called once the
process has entered a new state.

This exception would not be dealt with, skipping the code that is
responsible for updating the process state on the node. This would lead
the node of the excepted process to still show the previous, now
incorrect, state. Typically, users would see such processes as `Running`
in the output of `verdi process list`, because that value is taken from
the `process_state` attribute, which was never updated and so still
contained the stale state.

The solution requires a fix in `plumpy`, which was released with
`v0.21.1`, but it also requires an update to `update_node_state` of the
`Process` class to make sure the `set_process_state` call is made before
`update_outputs`, which is prone to excepting when invalid outputs have
been registered by the process implementation.
  • Loading branch information
sphuber authored Nov 28, 2022
1 parent 11d8942 commit a7be795
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 12 deletions.
6 changes: 1 addition & 5 deletions aiida/engine/processes/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,6 @@ def on_create(self) -> None:
self._pid = self._create_and_setup_db_record() # pylint: disable=attribute-defined-outside-init

@override
def on_entering(self, state: plumpy.process_states.State) -> None:
super().on_entering(state)
# Update the node attributes every time we enter a new state

def on_entered(self, from_state: Optional[plumpy.process_states.State]) -> None:
"""After entering a new state, save a checkpoint and update the latest process state change timestamp."""
# pylint: disable=cyclic-import
Expand Down Expand Up @@ -635,8 +631,8 @@ def decode_input_args(self, encoded: str) -> Dict[str, Any]: # pylint: disable=
return serialize.deserialize_unsafe(encoded)

def update_node_state(self, state: plumpy.process_states.State) -> None:
self.update_outputs()
self.node.set_process_state(state.LABEL)
self.update_outputs()

def update_outputs(self) -> None:
"""Attach new outputs to the node since the last call.
Expand Down
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dependencies:
- importlib-metadata~=4.3
- numpy~=1.19
- paramiko>=2.7.2,~=2.7
- plumpy~=0.21.0
- plumpy~=0.21.1
- pgsu~=0.2.1
- psutil~=5.6
- psycopg2-binary~=2.8
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ dependencies = [
"importlib-metadata~=4.3",
"numpy~=1.19",
"paramiko~=2.7,>=2.7.2",
"plumpy~=0.21.0",
"plumpy~=0.21.1",
"pgsu~=0.2.1",
"psutil~=5.6",
"psycopg2-binary~=2.8",
Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements-py-3.10.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pickleshare==0.7.5
Pillow==9.3.0
plotly==5.4.0
pluggy==1.0.0
plumpy==0.21.0
plumpy==0.21.1
prometheus-client==0.12.0
prompt-toolkit==3.0.23
psutil==5.8.0
Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements-py-3.8.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pickleshare==0.7.5
Pillow==9.3.0
plotly==5.4.0
pluggy==1.0.0
plumpy==0.21.0
plumpy==0.21.1
prometheus-client==0.12.0
prompt-toolkit==3.0.23
psutil==5.8.0
Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements-py-3.9.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pickleshare==0.7.5
Pillow==9.3.0
plotly==5.4.0
pluggy==1.0.0
plumpy==0.21.0
plumpy==0.21.1
prometheus-client==0.12.0
prompt-toolkit==3.0.23
psutil==5.8.0
Expand Down
18 changes: 18 additions & 0 deletions tests/engine/test_process_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
complain as the dummy node class is not recognized as a valid process node.
"""
import enum
import re

import pytest

Expand Down Expand Up @@ -567,3 +568,20 @@ def function_with_multiple_defaults(integer: int = 10, string: str = 'default',
def test_input_serialization_none_default():
"""Test that calling a function with explicit ``None`` for an argument that defines ``None`` as default works."""
assert function_with_none_default(int_a=1, int_b=2, int_c=None).value == 3


def test_invalid_outputs():
"""Test that returning an invalid output will properly lead the node to go in the excepted state."""

@calcfunction
def excepting():
node = orm.Int(2)
return {'a': node, 'b': node}

with pytest.raises(ValueError):
excepting()

# Since the calcfunction actually raises an exception, it cannot return the node, so we have to query for it
node = orm.QueryBuilder().append(orm.ProcessNode, tag='node').order_by({'node': {'id': 'desc'}}).first(flat=True)
assert node.is_excepted, node.process_state
assert re.match(r'ValueError: node<.*> already has an incoming LinkType.CREATE link', node.exception)
13 changes: 11 additions & 2 deletions tests/engine/test_work_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ def define(cls, spec):
def test_out_unstored(self):
"""Calling `self.out` on an unstored `Node` should raise.
It indicates that users created new data whose provenance will be lost.
It indicates that users created new data whose provenance will be lost. The node should be properly marked as
excepted.
"""

class IllegalWorkChain(WorkChain):
Expand All @@ -386,7 +387,15 @@ def illegal(self):
self.out('not_allowed', orm.Int(2))

with pytest.raises(ValueError):
launch.run(IllegalWorkChain)
_, node = launch.run_get_node(IllegalWorkChain)

node = orm.QueryBuilder().append(orm.ProcessNode, tag='node').order_by({
'node': {
'id': 'desc'
}
}).first(flat=True)
assert node.is_excepted
assert 'ValueError: Workflow<IllegalWorkChain> tried returning an unstored `Data` node.' in node.exception

def test_same_input_node(self):

Expand Down

0 comments on commit a7be795

Please sign in to comment.