diff --git a/RELEASE.md b/RELEASE.md
index d50e1b936d..d3b6e7f061 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -7,6 +7,7 @@
 * Kedro CLI now provides a better error message when project commands are run outside of a project i.e. `kedro run`.
 * Dropped the dependency on `toposort` in favour of the built-in `graphlib` module.
 * Improve the performance of `Pipeline` object creation and summing.
+* Improve suggestions to resume failed pipeline runs.
 
 ## Bug fixes and other changes
 * Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
@@ -18,6 +19,9 @@
 ## Documentation changes
 
 ## Community contributions
+Many thanks to the following Kedroids for contributing PRs to this release:
+
+* [ondrejzacha](https://github.com/ondrejzacha)
 
 # Release 0.19.3
diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index ae653f37ea..f9cdd08798 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -15,7 +15,7 @@
     as_completed,
     wait,
 )
-from typing import Any, Iterable, Iterator
+from typing import Any, Collection, Iterable, Iterator
 
 from more_itertools import interleave
 from pluggy import PluginManager
@@ -198,17 +198,13 @@ def _suggest_resume_scenario(
         postfix = ""
         if done_nodes:
-            node_names = (n.name for n in remaining_nodes)
-            resume_p = pipeline.only_nodes(*node_names)
-            start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs())
-
-            # find the nearest persistent ancestors of the nodes in start_p
-            start_p_persistent_ancestors = _find_persistent_ancestors(
-                pipeline, start_p.nodes, catalog
+            start_node_names = _find_nodes_to_resume_from(
+                pipeline=pipeline,
+                unfinished_nodes=remaining_nodes,
+                catalog=catalog,
             )
-
-            start_node_names = (n.name for n in start_p_persistent_ancestors)
-            postfix += f" --from-nodes \"{','.join(start_node_names)}\""
+            start_nodes_str = ",".join(sorted(start_node_names))
+            postfix += f' --from-nodes "{start_nodes_str}"'
 
         if not postfix:
             self._logger.warning(
@@ -216,80 +212,166 @@
             )
         else:
             self._logger.warning(
-                "There are %d nodes that have not run.\n"
+                f"There are {len(remaining_nodes)} nodes that have not run.\n"
                 "You can resume the pipeline run from the nearest nodes with "
                 "persisted inputs by adding the following "
-                "argument to your previous command:\n%s",
-                len(remaining_nodes),
-                postfix,
+                f"argument to your previous command:\n{postfix}"
             )
 
 
-def _find_persistent_ancestors(
-    pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog
+def _find_nodes_to_resume_from(
+    pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: DataCatalog
+) -> set[str]:
+    """Given a collection of unfinished nodes in a pipeline and the
+    catalog of the run, find the node names to pass to pipeline.from_nodes()
+    to cover all unfinished nodes, including any additional nodes
+    that should be re-run because their outputs are not persisted.
+
+    Args:
+        pipeline: the ``Pipeline`` to find starting nodes for.
+        unfinished_nodes: the collection of ``Node``s that have not finished yet.
+        catalog: the ``DataCatalog`` of the run.
+
+    Returns:
+        Set of node names to pass to pipeline.from_nodes() to continue
+        the run.
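+
+    Example:
+        A minimal sketch of the intended use (node and dataset names are
+        hypothetical)::
+
+            remaining = pipeline.only_nodes("clean", "train").nodes
+            _find_nodes_to_resume_from(
+                pipeline=pipeline, unfinished_nodes=remaining, catalog=catalog
+            )
+            # -> e.g. {"split"}, if "clean" consumes a non-persisted
+            # dataset produced by "split"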
+ + """ + nodes_to_be_run = _find_all_nodes_for_resumed_pipeline( + pipeline, unfinished_nodes, catalog + ) + + # Find which of the remaining nodes would need to run first (in topo sort) + persistent_ancestors = _find_initial_node_group(pipeline, nodes_to_be_run) + + return {n.name for n in persistent_ancestors} + + +def _find_all_nodes_for_resumed_pipeline( + pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog ) -> set[Node]: """Breadth-first search approach to finding the complete set of - persistent ancestors of an iterable of ``Node``s. Persistent - ancestors exclusively have persisted ``Dataset``s as inputs. + ``Node``s which need to run to cover all unfinished nodes, + including any additional nodes that should be re-run if their outputs + are not persisted. Args: - pipeline: the ``Pipeline`` to find ancestors in. - children: the iterable containing ``Node``s to find ancestors of. + pipeline: the ``Pipeline`` to analyze. + unfinished_nodes: the iterable of ``Node``s which have not finished yet. catalog: the ``DataCatalog`` of the run. Returns: - A set containing first persistent ancestors of the given - ``Node``s. + A set containing all input unfinished ``Node``s and all remaining + ``Node``s that need to run in case their outputs are not persisted. """ - ancestor_nodes_to_run = set() - queue, visited = deque(children), set(children) + nodes_to_run = set(unfinished_nodes) + initial_nodes = _nodes_with_external_inputs(unfinished_nodes) + + queue, visited = deque(initial_nodes), set(initial_nodes) while queue: current_node = queue.popleft() - if _has_persistent_inputs(current_node, catalog): - ancestor_nodes_to_run.add(current_node) - continue - for parent in _enumerate_parents(pipeline, current_node): - if parent in visited: + nodes_to_run.add(current_node) + # Look for parent nodes which produce non-persistent inputs (if those exist) + non_persistent_inputs = _enumerate_non_persistent_inputs(current_node, catalog) + for node in _enumerate_nodes_with_outputs(pipeline, non_persistent_inputs): + if node in visited: continue - visited.add(parent) - queue.append(parent) - return ancestor_nodes_to_run + visited.add(node) + queue.append(node) + # Make sure no downstream tasks are skipped + nodes_to_run = set(pipeline.from_nodes(*(n.name for n in nodes_to_run)).nodes) -def _enumerate_parents(pipeline: Pipeline, child: Node) -> list[Node]: - """For a given ``Node``, returns a list containing the direct parents - of that ``Node`` in the given ``Pipeline``. + return nodes_to_run + + +def _nodes_with_external_inputs(nodes_of_interest: Iterable[Node]) -> set[Node]: + """For given ``Node``s , find their subset which depends on + external inputs of the ``Pipeline`` they constitute. External inputs + are pipeline inputs not produced by other ``Node``s in the ``Pipeline``. Args: - pipeline: the ``Pipeline`` to search for direct parents in. - child: the ``Node`` to find parents of. + nodes_of_interest: the ``Node``s to analyze. Returns: - A list of all ``Node``s that are direct parents of ``child``. + A set of ``Node``s that depend on external inputs + of nodes of interest. 
""" - parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) - return parent_pipeline.nodes + p_nodes_of_interest = Pipeline(nodes_of_interest) + p_nodes_with_external_inputs = p_nodes_of_interest.only_nodes_with_inputs( + *p_nodes_of_interest.inputs() + ) + return set(p_nodes_with_external_inputs.nodes) -def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool: - """Check if a ``Node`` exclusively has persisted Datasets as inputs. - If at least one input is a ``MemoryDataset``, return False. +def _enumerate_non_persistent_inputs(node: Node, catalog: DataCatalog) -> set[str]: + """Enumerate non-persistent input datasets of a ``Node``. Args: node: the ``Node`` to check the inputs of. catalog: the ``DataCatalog`` of the run. Returns: - True if the ``Node`` being checked exclusively has inputs that - are not ``MemoryDataset``, else False. + Set of names of non-persistent inputs of given ``Node``. """ + # We use _datasets because they pertain parameter name format + catalog_datasets = catalog._datasets + non_persistent_inputs: set[str] = set() for node_input in node.inputs: - if isinstance(catalog._datasets[node_input], MemoryDataset): - return False - return True + if node_input.startswith("params:"): + continue + + if ( + node_input not in catalog_datasets + or catalog_datasets[node_input]._EPHEMERAL + ): + non_persistent_inputs.add(node_input) + + return non_persistent_inputs + + +def _enumerate_nodes_with_outputs( + pipeline: Pipeline, outputs: Collection[str] +) -> list[Node]: + """For given outputs, returns a list containing nodes that + generate them in the given ``Pipeline``. + + Args: + pipeline: the ``Pipeline`` to search for nodes in. + outputs: the dataset names to find source nodes for. + + Returns: + A list of all ``Node``s that are producing ``outputs``. + + """ + parent_pipeline = pipeline.only_nodes_with_outputs(*outputs) + return parent_pipeline.nodes + + +def _find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[Node]: + """Given a collection of ``Node``s in a ``Pipeline``, + find the initial group of ``Node``s to be run (in topological order). + + This can be used to define a sub-pipeline with the smallest possible + set of nodes to pass to --from-nodes. + + Args: + pipeline: the ``Pipeline`` to search for initial ``Node``s in. + nodes: the ``Node``s to find initial group for. + + Returns: + A list of initial ``Node``s to run given inputs (in topological order). 
+ + """ + node_names = set(n.name for n in nodes) + if len(node_names) == 0: + return [] + sub_pipeline = pipeline.only_nodes(*node_names) + initial_nodes = sub_pipeline.grouped_nodes[0] + return initial_nodes def run_node( diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 25ca233e97..629000686f 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -15,6 +15,10 @@ def identity(arg): return arg +def first_arg(*args): + return args[0] + + def sink(arg): pass @@ -36,7 +40,7 @@ def return_not_serialisable(arg): return lambda x: x -def multi_input_list_output(arg1, arg2): +def multi_input_list_output(arg1, arg2, arg3=None): return [arg1, arg2] @@ -80,6 +84,9 @@ def _save(arg): "ds0_B": persistent_dataset, "ds2_A": persistent_dataset, "ds2_B": persistent_dataset, + "dsX": persistent_dataset, + "dsY": persistent_dataset, + "params:p": MemoryDataset(1), } ) @@ -148,21 +155,31 @@ def unfinished_outputs_pipeline(): @pytest.fixture def two_branches_crossed_pipeline(): - """A ``Pipeline`` with an X-shape (two branches with one common node)""" + r"""A ``Pipeline`` with an X-shape (two branches with one common node): + + (node1_A) (node1_B) + \ / + (node2) + / \ + (node3_A) (node3_B) + / \ + (node4_A) (node4_B) + + """ return pipeline( [ - node(identity, "ds0_A", "ds1_A", name="node1_A"), - node(identity, "ds0_B", "ds1_B", name="node1_B"), + node(first_arg, "ds0_A", "ds1_A", name="node1_A"), + node(first_arg, "ds0_B", "ds1_B", name="node1_B"), node( multi_input_list_output, ["ds1_A", "ds1_B"], ["ds2_A", "ds2_B"], name="node2", ), - node(identity, "ds2_A", "ds3_A", name="node3_A"), - node(identity, "ds2_B", "ds3_B", name="node3_B"), - node(identity, "ds3_A", "ds4_A", name="node4_A"), - node(identity, "ds3_B", "ds4_B", name="node4_B"), + node(first_arg, "ds2_A", "ds3_A", name="node3_A"), + node(first_arg, "ds2_B", "ds3_B", name="node3_B"), + node(first_arg, "ds3_A", "ds4_A", name="node4_A"), + node(first_arg, "ds3_B", "ds4_B", name="node4_B"), ] ) @@ -175,3 +192,103 @@ def pipeline_with_memory_datasets(): node(func=identity, inputs="Input2", outputs="MemOutput2", name="node2"), ] ) + + +@pytest.fixture +def pipeline_asymmetric(): + r""" + + (node1) + \ + (node3) (node2) + \ / + (node4) + + """ + return pipeline( + [ + node(first_arg, ["ds0_A"], ["_ds1"], name="node1"), + node(first_arg, ["ds0_B"], ["_ds2"], name="node2"), + node(first_arg, ["_ds1"], ["_ds3"], name="node3"), + node(first_arg, ["_ds2", "_ds3"], ["_ds4"], name="node4"), + ] + ) + + +@pytest.fixture +def pipeline_triangular(): + r""" + + (node1) + | \ + | [node2] + | / + [node3*] + + """ + return pipeline( + [ + node(first_arg, ["ds0_A"], ["_ds1_A"], name="node1"), + node(first_arg, ["_ds1_A"], ["ds2_A"], name="node2"), + node(first_arg, ["ds2_A", "_ds1_A"], ["_ds3_A"], name="node3"), + ] + ) + + +@pytest.fixture +def pipeline_triangular2(): + r""" + + (node1) + | \ + | [node1b] + | | + | [node2] + | / + [node3 +] + + """ + return pipeline( + [ + node(first_arg, ["ds0_A"], ["_ds1_A"], name="node1"), + node(first_arg, ["_ds1_A"], ["ds2_A"], name="node2"), + node(first_arg, ["ds2_A", "_ds1_A"], ["_ds3_A"], name="node3"), + ] + ) + + +@pytest.fixture +def empty_pipeline(): + return pipeline([]) + + +@pytest.fixture( + params=[(), ("dsX",), ("params:p",)], + ids=[ + "no_extras", + "extra_persistent_ds", + "extra_param", + ], +) +def two_branches_crossed_pipeline_variable_inputs(request): + """A ``Pipeline`` with an X-shape (two branches with one common node). 
+    Non-persistent datasets (other than parameters) are prefixed with an underscore.
+    """
+    extra_inputs = list(request.param)
+
+    return pipeline(
+        [
+            node(first_arg, ["ds0_A"] + extra_inputs, "_ds1_A", name="node1_A"),
+            node(first_arg, ["ds0_B"] + extra_inputs, "_ds1_B", name="node1_B"),
+            node(
+                multi_input_list_output,
+                ["_ds1_A", "_ds1_B"] + extra_inputs,
+                ["ds2_A", "ds2_B"],
+                name="node2",
+            ),
+            node(first_arg, ["ds2_A"] + extra_inputs, "_ds3_A", name="node3_A"),
+            node(first_arg, ["ds2_B"] + extra_inputs, "_ds3_B", name="node3_B"),
+            node(first_arg, ["_ds3_A"] + extra_inputs, "_ds4_A", name="node4_A"),
+            node(first_arg, ["_ds3_B"] + extra_inputs, "_ds4_B", name="node4_B"),
+        ]
+    )
diff --git a/tests/runner/test_resume_logic.py b/tests/runner/test_resume_logic.py
new file mode 100644
index 0000000000..bd1f8e8acb
--- /dev/null
+++ b/tests/runner/test_resume_logic.py
@@ -0,0 +1,158 @@
+import pytest
+
+from kedro.io import (
+    DataCatalog,
+    LambdaDataset,
+)
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
+from kedro.runner.runner import (
+    _find_all_nodes_for_resumed_pipeline,
+    _find_nodes_to_resume_from,
+)
+
+
+@pytest.mark.parametrize(
+    "pipeline_name,remaining_node_names,expected_result",
+    [
+        ("pipeline_asymmetric", {"node2", "node3", "node4"}, {"node1", "node2"}),
+        ("pipeline_asymmetric", {"node3", "node4"}, {"node1", "node2"}),
+        ("pipeline_asymmetric", {"node4"}, {"node1", "node2"}),
+        ("pipeline_asymmetric", set(), set()),
+        ("empty_pipeline", set(), set()),
+        ("pipeline_triangular", {"node3"}, {"node1"}),
+        (
+            "two_branches_crossed_pipeline",
+            {
+                "node1_A",
+                "node1_B",
+                "node2",
+                "node3_A",
+                "node3_B",
+                "node4_A",
+                "node4_B",
+            },
+            {"node1_A", "node1_B"},
+        ),
+        (
+            "two_branches_crossed_pipeline",
+            {"node2", "node3_A", "node3_B", "node4_A", "node4_B"},
+            {"node1_A", "node1_B"},
+        ),
+        (
+            "two_branches_crossed_pipeline",
+            ["node3_A", "node3_B", "node4_A", "node4_B"],
+            {"node3_A", "node3_B"},
+        ),
+        (
+            "two_branches_crossed_pipeline",
+            ["node4_A", "node4_B"],
+            {"node3_A", "node3_B"},
+        ),
+        ("two_branches_crossed_pipeline", ["node4_A"], {"node3_A"}),
+        ("two_branches_crossed_pipeline", ["node3_A", "node4_A"], {"node3_A"}),
+    ],
+)
+class TestResumeLogicBehaviour:
+    def test_simple_pipeline(
+        self,
+        pipeline_name,
+        persistent_dataset_catalog,
+        remaining_node_names,
+        expected_result,
+        request,
+    ):
+        """
+        Test suggestion for simple pipelines with a mix of persistent
+        and memory datasets.
+        """
+        test_pipeline = request.getfixturevalue(pipeline_name)
+
+        remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes
+        result_node_names = _find_nodes_to_resume_from(
+            test_pipeline, remaining_nodes, persistent_dataset_catalog
+        )
+        assert expected_result == result_node_names
+
+    def test_all_datasets_persistent(
+        self,
+        pipeline_name,
+        persistent_dataset_catalog,
+        remaining_node_names,
+        expected_result,
+        request,
+    ):
+        """
+        Test suggestion for pipelines where all datasets are persisted:
+        In that case, exactly the set of remaining nodes should be re-run.
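+        (With every dataset persisted, nothing upstream has to re-run, so
+        the suggested start nodes should regenerate exactly the remaining
+        nodes.)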
+ """ + test_pipeline = request.getfixturevalue(pipeline_name) + + catalog = DataCatalog( + dict.fromkeys( + test_pipeline.datasets(), + LambdaDataset(load=lambda: 42, save=lambda data: None), + ) + ) + + remaining_nodes = set(test_pipeline.only_nodes(*remaining_node_names).nodes) + result_node_names = _find_nodes_to_resume_from( + test_pipeline, + remaining_nodes, + catalog, + ) + final_pipeline_nodes = set(test_pipeline.from_nodes(*result_node_names).nodes) + assert final_pipeline_nodes == remaining_nodes + + @pytest.mark.parametrize("extra_input", ["params:p", "dsY"]) + def test_added_shared_input( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + extra_input, + request, + ): + """ + Test suggestion for pipelines where a single persistent dataset or + parameter is shared across all nodes. These do not change and + therefore should not affect resume suggestion. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + # Add parameter shared across all nodes + test_pipeline = modular_pipeline( + [n._copy(inputs=[*n.inputs, extra_input]) for n in test_pipeline.nodes] + ) + + remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes + result_node_names = _find_nodes_to_resume_from( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + assert expected_result == result_node_names + + def test_suggestion_consistency( + self, + pipeline_name, + persistent_dataset_catalog, + remaining_node_names, + expected_result, + request, + ): + """ + Test that suggestions are internally consistent; pipeline generated + from resume nodes should exactly contain set of all required nodes. + """ + test_pipeline = request.getfixturevalue(pipeline_name) + + remaining_nodes = test_pipeline.only_nodes(*remaining_node_names).nodes + required_nodes = _find_all_nodes_for_resumed_pipeline( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + resume_node_names = _find_nodes_to_resume_from( + test_pipeline, remaining_nodes, persistent_dataset_catalog + ) + + assert set(n.name for n in required_nodes) == set( + n.name for n in test_pipeline.from_nodes(*resume_node_names).nodes + ) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 40b039ba88..dbc73a30f0 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -252,18 +252,18 @@ def test_confirms(self, mocker, test_pipeline, is_async): fake_dataset_instance.confirm.assert_called_once_with() -@pytest.mark.parametrize( - "failing_node_names,expected_pattern", - [ - (["node1_A", "node1_B"], r"No nodes ran."), - (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), - (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), - (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), - ], -) class TestSuggestResumeScenario: + @pytest.mark.parametrize( + "failing_node_names,expected_pattern", + [ + (["node1_A", "node1_B"], r"No nodes ran."), + (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), + (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), + ], + ) def test_suggest_resume_scenario( self, caplog, @@ -286,6 +286,44 @@ def test_suggest_resume_scenario( ) assert re.search(expected_pattern, 
+    @pytest.mark.parametrize(
+        "failing_node_names,expected_pattern",
+        [
+            (["node1_A", "node1_B"], r"No nodes ran."),
+            (["node2"], r'"node1_A,node1_B"'),
+            (["node3_A"], r'"node3_A,node3_B"'),
+            (["node4_A"], r'"node3_A,node3_B"'),
+            (["node3_A", "node4_A"], r'"node3_A,node3_B"'),
+            (["node2", "node4_A"], r'"node1_A,node1_B"'),
+        ],
+    )
+    def test_stricter_suggest_resume_scenario(
+        self,
+        caplog,
+        two_branches_crossed_pipeline_variable_inputs,
+        persistent_dataset_catalog,
+        failing_node_names,
+        expected_pattern,
+    ):
+        """
+        Stricter version of the previous test.
+        Covers pipelines where inputs are shared across nodes.
+        """
+        test_pipeline = two_branches_crossed_pipeline_variable_inputs
+
+        nodes = {n.name: n for n in test_pipeline.nodes}
+        for name in failing_node_names:
+            test_pipeline -= modular_pipeline([nodes[name]])
+            test_pipeline += modular_pipeline([nodes[name]._copy(func=exception_fn)])
+
+        with pytest.raises(Exception, match="test exception"):
+            SequentialRunner().run(
+                test_pipeline,
+                persistent_dataset_catalog,
+                hook_manager=_create_hook_manager(),
+            )
+        assert re.search(expected_pattern, caplog.text)
+
 
 class TestMemoryDatasetBehaviour:
     def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets):