Optimise pipeline addition and creation #3730

Merged
merged 12 commits on Mar 26, 2024
2 changes: 2 additions & 0 deletions RELEASE.md
@@ -5,6 +5,8 @@
* Cookiecutter errors are shown in short format without the `--verbose` flag.
* Kedro commands now work from any subdirectory within a Kedro project.
* Kedro CLI now provides a better error message when project commands are run outside of a project, e.g. `kedro run`.
* Dropped the dependency on `toposort` in favour of the built-in `graphlib` module.
* Improved the performance of `Pipeline` object creation and summing.

## Bug fixes and other changes
* Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
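As the release note above says, `toposort` is replaced by the standard-library `graphlib` module in the `pipeline.py` diff below. For orientation, here is a minimal sketch of how `TopologicalSorter.prepare()` surfaces a cycle, which is what the new `__init__` relies on; plain strings stand in for Kedro `Node` objects, and the mapping is assumed to have the same `{item: {its dependencies}}` shape as `Pipeline.node_dependencies`.

```python
from graphlib import CycleError, TopologicalSorter

# {item: {items it depends on}} -- strings standing in for Node objects.
dependencies = {"a": {"c"}, "b": {"a"}, "c": {"b"}}  # every item depends on another: a cycle

sorter = TopologicalSorter(dependencies)
try:
    sorter.prepare()  # cycle detection happens here, before any ordering work
except CycleError as exc:
    # exc.args[1] holds the offending items, mirroring the new error message
    print(f"Circular dependencies exist among these items: {exc.args[1]}")
```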
89 changes: 39 additions & 50 deletions kedro/pipeline/pipeline.py
@@ -5,14 +5,12 @@
"""
from __future__ import annotations

import copy
import json
from collections import Counter, defaultdict
from itertools import chain
from typing import Any, Iterable

from toposort import CircularDependencyError as ToposortCircleError
from toposort import toposort
from graphlib import CycleError, TopologicalSorter

import kedro
from kedro.pipeline.node import Node, _to_list
@@ -145,7 +143,10 @@ def __init__(
_validate_transcoded_inputs_outputs(nodes_chain)
_tags = set(_to_list(tags))

tagged_nodes = [n.tag(_tags) for n in nodes_chain]
if _tags:
tagged_nodes = [n.tag(_tags) for n in nodes_chain]
else:
tagged_nodes = nodes_chain

self._nodes_by_name = {node.name: node for node in tagged_nodes}
_validate_unique_outputs(tagged_nodes)
@@ -164,7 +165,17 @@ def __init__(
self._nodes_by_output[_strip_transcoding(output)] = node

self._nodes = tagged_nodes
self._topo_sorted_nodes = _topologically_sorted(self.node_dependencies)
self._toposorter = TopologicalSorter(self.node_dependencies)

# test for circular dependencies without executing the toposort for efficiency
try:
self._toposorter.prepare()
except CycleError as exc:
message = f"Circular dependencies exist among these items: {exc.args[1]}"
raise CircularDependencyError(message) from exc

self._toposorted_nodes: list[Node] = []
self._toposorted_groups: list[list[Node]] = []

def __repr__(self) -> str: # pragma: no cover
"""Pipeline ([node1, ..., node10 ...], name='pipeline_name')"""
@@ -181,7 +192,7 @@ def __repr__(self) -> str: # pragma: no cover
def __add__(self, other: Any) -> Pipeline:
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes + other.nodes))
return Pipeline(set(self._nodes + other._nodes))

def __radd__(self, other: Any) -> Pipeline:
if isinstance(other, int) and other == 0:
@@ -191,17 +202,17 @@ def __radd__(self, other: Any) -> Pipeline:
def __sub__(self, other: Any) -> Pipeline:
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes) - set(other.nodes))
return Pipeline(set(self._nodes) - set(other._nodes))

def __and__(self, other: Any) -> Pipeline:
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes) & set(other.nodes))
return Pipeline(set(self._nodes) & set(other._nodes))

def __or__(self, other: Any) -> Pipeline:
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes + other.nodes))
return Pipeline(set(self._nodes + other._nodes))

def all_inputs(self) -> set[str]:
"""All inputs for all nodes in the pipeline.
Expand All @@ -210,7 +221,7 @@ def all_inputs(self) -> set[str]:
All node input names as a Set.

"""
return set.union(set(), *(node.inputs for node in self.nodes))
return set.union(set(), *(node.inputs for node in self._nodes))

def all_outputs(self) -> set[str]:
"""All outputs of all nodes in the pipeline.
@@ -219,7 +230,7 @@ def all_outputs(self) -> set[str]:
All node outputs.

"""
return set.union(set(), *(node.outputs for node in self.nodes))
return set.union(set(), *(node.outputs for node in self._nodes))

def _remove_intermediates(self, datasets: set[str]) -> set[str]:
intermediate = {_strip_transcoding(i) for i in self.all_inputs()} & {
@@ -349,7 +360,10 @@ def nodes(self) -> list[Node]:
The list of all pipeline nodes in topological order.

"""
return list(chain.from_iterable(self._topo_sorted_nodes))
if not self._toposorted_nodes:
self._toposorted_nodes = [n for group in self.grouped_nodes for n in group]

return list(self._toposorted_nodes)

@property
def grouped_nodes(self) -> list[list[Node]]:
@@ -361,7 +375,14 @@ def grouped_nodes(self) -> list[list[Node]]:
The pipeline nodes in topologically ordered groups.

"""
return copy.copy(self._topo_sorted_nodes)

if not self._toposorted_groups and self._toposorter:
while self._toposorter:
group = sorted(self._toposorter.get_ready())
self._toposorted_groups.append(group)
self._toposorter.done(*group)

return [list(group) for group in self._toposorted_groups]

def only_nodes(self, *node_names: str) -> Pipeline:
"""Create a new ``Pipeline`` which will contain only the specified
@@ -417,7 +438,7 @@ def only_nodes_with_namespace(self, node_namespace: str) -> Pipeline:
"""
nodes = [
n
for n in self.nodes
for n in self._nodes
if n.namespace and n.namespace.startswith(node_namespace)
]
if not nodes:
@@ -676,7 +697,7 @@ def only_nodes_with_tags(self, *tags: str) -> Pipeline:
of the tags provided are being copied.
"""
unique_tags = set(tags)
nodes = [node for node in self.nodes if unique_tags & node.tags]
nodes = [node for node in self._nodes if unique_tags & node.tags]
return Pipeline(nodes)

def filter( # noqa: PLR0913
Expand Down Expand Up @@ -760,7 +781,7 @@ def filter( # noqa: PLR0913
# would give different outcomes depending on the order of filter methods:
# only_nodes and then from_inputs would give node1, while from_inputs and then
# only_nodes would give node1 and node3.
filtered_pipeline = Pipeline(self.nodes)
filtered_pipeline = Pipeline(self._nodes)
for subset_pipeline in subset_pipelines:
filtered_pipeline &= subset_pipeline

@@ -779,7 +800,7 @@ def tag(self, tags: str | Iterable[str]) -> Pipeline:
Returns:
New ``Pipeline`` object with nodes tagged.
"""
nodes = [n.tag(tags) for n in self.nodes]
nodes = [n.tag(tags) for n in self._nodes]
return Pipeline(nodes)

def to_json(self) -> str:
Expand All @@ -791,7 +812,7 @@ def to_json(self) -> str:
"outputs": list(n.outputs),
"tags": list(n.tags),
}
for n in self.nodes
for n in self._nodes
]
pipeline_versioned = {
"kedro_version": kedro.__version__,
@@ -884,38 +905,6 @@ def _validate_transcoded_inputs_outputs(nodes: list[Node]) -> None:
)


def _topologically_sorted(node_dependencies: dict[Node, set[Node]]) -> list[list[Node]]:
"""Topologically group and sort (order) nodes such that no node depends on
a node that appears in the same or a later group.

Raises:
CircularDependencyError: When it is not possible to topologically order
provided nodes.

Returns:
The list of node sets in order of execution. First set is nodes that should
be executed first (no dependencies), second set are nodes that should be
executed on the second step, etc.
"""

def _circle_error_message(error_data: dict[Any, set]) -> str:
"""Error messages provided by the toposort library will
refer to indices that are used as an intermediate step.
This method can be used to replace that message with
one that refers to the nodes' string representations.
"""
circular = [str(node) for node in error_data.keys()]
return f"Circular dependencies exist among these items: {circular}"

try:
# Sort it so it has consistent order when run with SequentialRunner
result = [sorted(dependencies) for dependencies in toposort(node_dependencies)]
return result
except ToposortCircleError as exc:
message = _circle_error_message(exc.data)
raise CircularDependencyError(message) from exc


class CircularDependencyError(Exception):
"""Raised when it is not possible to provide a topological execution
order for nodes, due to a circular dependency existing in the node
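The lazy `grouped_nodes` property above drains the sorter with the `get_ready()`/`done()` protocol. Here is a minimal standalone sketch of that pattern, again with strings instead of `Node` objects; the `sorted()` call mirrors the deterministic within-group ordering the property preserves.

```python
from graphlib import TopologicalSorter

# {item: {items it depends on}}
dependencies = {"clean": {"raw"}, "train": {"clean"}, "evaluate": {"clean", "train"}}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

groups = []
while sorter.is_active():  # equivalent to `while sorter:` used in grouped_nodes
    ready = sorted(sorter.get_ready())  # all items whose dependencies are satisfied
    groups.append(ready)
    sorter.done(*ready)  # unblock the items that depend on them

print(groups)  # [['raw'], ['clean'], ['train'], ['evaluate']]
```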
5 changes: 2 additions & 3 deletions pyproject.toml
@@ -32,7 +32,7 @@ dependencies = [
"rich>=12.0,<14.0",
"rope>=0.21,<2.0", # subject to LGPLv3 license
"toml>=0.10.0",
"toposort>=1.5", # Needs to be at least 1.5 to be able to raise CircularDependencyError
"graphlib_backport>=1.0.0; python_version < '3.9'",
]
keywords = [
"pipelines",
@@ -81,8 +81,7 @@ test = [
"pandas-stubs",
"types-PyYAML",
"types-cachetools",
"types-toml",
"types-toposort"
"types-toml"
]
docs = [
"docutils<0.21",
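The dependency swap needs no version guard in the code: on Python 3.9+ `graphlib` comes from the standard library, and for older interpreters the `graphlib_backport` marker above is assumed to publish a module under the same `graphlib` name, so the unconditional import in `pipeline.py` resolves either way. A quick sanity check:

```python
# Resolves to the stdlib module on Python >= 3.9, or (assuming the backport keeps
# the stdlib module name) to graphlib_backport where the marker
# "graphlib_backport>=1.0.0; python_version < '3.9'" applies.
from graphlib import TopologicalSorter

print(list(TopologicalSorter({"b": {"a"}}).static_order()))  # ['a', 'b']
```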
2 changes: 1 addition & 1 deletion tests/runner/test_sequential_runner.py
@@ -255,7 +255,7 @@ def test_confirms(self, mocker, test_pipeline, is_async):
@pytest.mark.parametrize(
"failing_node_names,expected_pattern",
[
(["node1_A"], r"No nodes ran."),
(["node1_A", "node1_B"], r"No nodes ran."),
(["node2"], r"(node1_A,node1_B|node1_B,node1_A)"),
(["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"),
(["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"),