Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve resume suggestions #3719

Merged
merged 13 commits into from
Apr 2, 2024
4 changes: 4 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Kedro CLI now provides a better error message when project commands are run outside of a project i.e. `kedro run`.
* Dropped the dependency on `toposort` in favour of the built-in `graphlib` module.
* Improve the performance of `Pipeline` object creation and summing.
* Improve suggestions to resume failed pipeline runs.

## Bug fixes and other changes
* Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
Expand All @@ -18,6 +19,9 @@
## Documentation changes

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:

Check warning on line 22 in RELEASE.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [Kedro.weaselwords] 'Many' is a weasel word! Raw Output: {"message": "[Kedro.weaselwords] 'Many' is a weasel word!", "location": {"path": "RELEASE.md", "range": {"start": {"line": 22, "column": 1}}}, "severity": "WARNING"}

Check warning on line 22 in RELEASE.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [Kedro.Spellings] Did you really mean 'Kedroids'? Raw Output: {"message": "[Kedro.Spellings] Did you really mean 'Kedroids'?", "location": {"path": "RELEASE.md", "range": {"start": {"line": 22, "column": 30}}}, "severity": "WARNING"}

* [ondrejzacha](https://github.com/ondrejzacha)

Check warning on line 24 in RELEASE.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [Kedro.Spellings] Did you really mean 'ondrejzacha'? Raw Output: {"message": "[Kedro.Spellings] Did you really mean 'ondrejzacha'?", "location": {"path": "RELEASE.md", "range": {"start": {"line": 24, "column": 4}}}, "severity": "WARNING"}

# Release 0.19.3

Expand Down
180 changes: 131 additions & 49 deletions kedro/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
as_completed,
wait,
)
from typing import Any, Iterable, Iterator
from typing import Any, Collection, Iterable, Iterator

from more_itertools import interleave
from pluggy import PluginManager
Expand Down Expand Up @@ -198,98 +198,180 @@ def _suggest_resume_scenario(

postfix = ""
if done_nodes:
node_names = (n.name for n in remaining_nodes)
resume_p = pipeline.only_nodes(*node_names)
start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs())

# find the nearest persistent ancestors of the nodes in start_p
start_p_persistent_ancestors = _find_persistent_ancestors(
pipeline, start_p.nodes, catalog
start_node_names = _find_nodes_to_resume_from(
pipeline=pipeline,
unfinished_nodes=remaining_nodes,
catalog=catalog,
)

start_node_names = (n.name for n in start_p_persistent_ancestors)
postfix += f" --from-nodes \"{','.join(start_node_names)}\""
start_nodes_str = ",".join(sorted(start_node_names))
postfix += f' --from-nodes "{start_nodes_str}"'

if not postfix:
self._logger.warning(
"No nodes ran. Repeat the previous command to attempt a new run."
)
else:
self._logger.warning(
"There are %d nodes that have not run.\n"
f"There are {len(remaining_nodes)} nodes that have not run.\n"
"You can resume the pipeline run from the nearest nodes with "
"persisted inputs by adding the following "
"argument to your previous command:\n%s",
len(remaining_nodes),
postfix,
f"argument to your previous command:\n{postfix}"
)


def _find_persistent_ancestors(
pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog
def _find_nodes_to_resume_from(
pipeline: Pipeline, unfinished_nodes: Collection[Node], catalog: DataCatalog
) -> set[str]:
"""Given a collection of unfinished nodes in a pipeline using
a certain catalog, find the node names to pass to pipeline.from_nodes()
to cover all unfinished nodes, including any additional nodes
that should be re-run if their outputs are not persisted.

Args:
pipeline: the ``Pipeline`` to find starting nodes for.
unfinished_nodes: collection of ``Node``s that have not finished yet
catalog: the ``DataCatalog`` of the run.

Returns:
Set of node names to pass to pipeline.from_nodes() to continue
the run.

"""
nodes_to_be_run = _find_all_nodes_for_resumed_pipeline(
pipeline, unfinished_nodes, catalog
)

# Find which of the remaining nodes would need to run first (in topo sort)
persistent_ancestors = _find_initial_node_group(pipeline, nodes_to_be_run)

return {n.name for n in persistent_ancestors}


def _find_all_nodes_for_resumed_pipeline(
pipeline: Pipeline, unfinished_nodes: Iterable[Node], catalog: DataCatalog
) -> set[Node]:
"""Breadth-first search approach to finding the complete set of
persistent ancestors of an iterable of ``Node``s. Persistent
ancestors exclusively have persisted ``Dataset``s as inputs.
``Node``s which need to run to cover all unfinished nodes,
including any additional nodes that should be re-run if their outputs
are not persisted.

Args:
pipeline: the ``Pipeline`` to find ancestors in.
children: the iterable containing ``Node``s to find ancestors of.
pipeline: the ``Pipeline`` to analyze.
unfinished_nodes: the iterable of ``Node``s which have not finished yet.
catalog: the ``DataCatalog`` of the run.

Returns:
A set containing first persistent ancestors of the given
``Node``s.
A set containing all input unfinished ``Node``s and all remaining
``Node``s that need to run in case their outputs are not persisted.

"""
ancestor_nodes_to_run = set()
queue, visited = deque(children), set(children)
nodes_to_run = set(unfinished_nodes)
initial_nodes = _nodes_with_external_inputs(unfinished_nodes)

queue, visited = deque(initial_nodes), set(initial_nodes)
DimedS marked this conversation as resolved.
Show resolved Hide resolved
while queue:
current_node = queue.popleft()
if _has_persistent_inputs(current_node, catalog):
ancestor_nodes_to_run.add(current_node)
continue
for parent in _enumerate_parents(pipeline, current_node):
if parent in visited:
nodes_to_run.add(current_node)
# Look for parent nodes which produce non-persistent inputs (if those exist)
non_persistent_inputs = _enumerate_non_persistent_inputs(current_node, catalog)
for node in _enumerate_nodes_with_outputs(pipeline, non_persistent_inputs):
if node in visited:
continue
visited.add(parent)
queue.append(parent)
return ancestor_nodes_to_run
visited.add(node)
queue.append(node)

# Make sure no downstream tasks are skipped
nodes_to_run = set(pipeline.from_nodes(*(n.name for n in nodes_to_run)).nodes)
DimedS marked this conversation as resolved.
Show resolved Hide resolved

def _enumerate_parents(pipeline: Pipeline, child: Node) -> list[Node]:
"""For a given ``Node``, returns a list containing the direct parents
of that ``Node`` in the given ``Pipeline``.
return nodes_to_run


def _nodes_with_external_inputs(nodes_of_interest: Iterable[Node]) -> set[Node]:
merelcht marked this conversation as resolved.
Show resolved Hide resolved
"""For given ``Node``s , find their subset which depends on
external inputs of the ``Pipeline`` they constitute. External inputs
are pipeline inputs not produced by other ``Node``s in the ``Pipeline``.

Args:
pipeline: the ``Pipeline`` to search for direct parents in.
child: the ``Node`` to find parents of.
nodes_of_interest: the ``Node``s to analyze.

Returns:
A list of all ``Node``s that are direct parents of ``child``.
A set of ``Node``s that depend on external inputs
of nodes of interest.

"""
parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs)
return parent_pipeline.nodes
p_nodes_of_interest = Pipeline(nodes_of_interest)
p_nodes_with_external_inputs = p_nodes_of_interest.only_nodes_with_inputs(
*p_nodes_of_interest.inputs()
)
return set(p_nodes_with_external_inputs.nodes)


def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool:
"""Check if a ``Node`` exclusively has persisted Datasets as inputs.
If at least one input is a ``MemoryDataset``, return False.
def _enumerate_non_persistent_inputs(node: Node, catalog: DataCatalog) -> set[str]:
"""Enumerate non-persistent input datasets of a ``Node``.

Args:
node: the ``Node`` to check the inputs of.
catalog: the ``DataCatalog`` of the run.

Returns:
True if the ``Node`` being checked exclusively has inputs that
are not ``MemoryDataset``, else False.
Set of names of non-persistent inputs of given ``Node``.

"""
# We use _datasets because they pertain parameter name format
catalog_datasets = catalog._datasets
non_persistent_inputs: set[str] = set()
for node_input in node.inputs:
if isinstance(catalog._datasets[node_input], MemoryDataset):
return False
return True
if node_input.startswith("params:"):
continue

if (
node_input not in catalog_datasets
or catalog_datasets[node_input]._EPHEMERAL
):
non_persistent_inputs.add(node_input)

return non_persistent_inputs


def _enumerate_nodes_with_outputs(
pipeline: Pipeline, outputs: Collection[str]
) -> list[Node]:
"""For given outputs, returns a list containing nodes that
generate them in the given ``Pipeline``.

Args:
pipeline: the ``Pipeline`` to search for nodes in.
outputs: the dataset names to find source nodes for.

Returns:
A list of all ``Node``s that are producing ``outputs``.

"""
parent_pipeline = pipeline.only_nodes_with_outputs(*outputs)
return parent_pipeline.nodes


def _find_initial_node_group(pipeline: Pipeline, nodes: Iterable[Node]) -> list[Node]:
"""Given a collection of ``Node``s in a ``Pipeline``,
find the initial group of ``Node``s to be run (in topological order).

This can be used to define a sub-pipeline with the smallest possible
set of nodes to pass to --from-nodes.

Args:
pipeline: the ``Pipeline`` to search for initial ``Node``s in.
nodes: the ``Node``s to find initial group for.

Returns:
A list of initial ``Node``s to run given inputs (in topological order).

"""
node_names = set(n.name for n in nodes)
if len(node_names) == 0:
return []
sub_pipeline = pipeline.only_nodes(*node_names)
initial_nodes = sub_pipeline.grouped_nodes[0]
return initial_nodes


def run_node(
Expand Down
Loading
Loading