Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise pipeline addition and creation #3730

Merged
merged 12 commits into from
Mar 26, 2024
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Kedro commands now work from any subdirectory within a Kedro project.
* Kedro CLI now provides a better error message when project commands are run outside of a project i.e. `kedro run`.
* Dropped the dependency on `toposort` in favour of the built-in `graphlib` module.
* Improve the performance of `Pipeline` object creation and summing.

## Bug fixes and other changes
* Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
Expand Down
91 changes: 36 additions & 55 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ def __init__(
_validate_transcoded_inputs_outputs(nodes_chain)
_tags = set(_to_list(tags))

tagged_nodes = [n.tag(_tags) for n in nodes_chain]
if _tags:
tagged_nodes = [n.tag(_tags) for n in nodes_chain]
else:
tagged_nodes = nodes_chain

self._nodes_by_name = {node.name: node for node in tagged_nodes}
_validate_unique_outputs(tagged_nodes)
Expand All @@ -162,7 +165,17 @@ def __init__(
self._nodes_by_output[_strip_transcoding(output)] = node

self._nodes = tagged_nodes
self._toposorted_nodes = _toposort(self.node_dependencies)
self._toposorter = TopologicalSorter(self.node_dependencies)

# test for circular dependencies without executing the toposort for efficiency
try:
self._toposorter.prepare()
except CycleError as exc:
message = f"Circular dependencies exist among these items: {exc.args[1]}"
raise CircularDependencyError(message) from exc

self._toposorted_nodes: list[Node] = []
self._toposorted_groups: list[list[Node]] = []

def __repr__(self) -> str: # pragma: no cover
"""Pipeline ([node1, ..., node10 ...], name='pipeline_name')"""
Expand All @@ -179,7 +192,7 @@ def __repr__(self) -> str: # pragma: no cover
def __add__(self, other: Any) -> Pipeline:
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes + other.nodes))
return Pipeline(set(self._nodes + other._nodes))

def __radd__(self, other: Any) -> Pipeline:
if isinstance(other, int) and other == 0:
Expand All @@ -189,17 +202,17 @@ def __radd__(self, other: Any) -> Pipeline:
def __sub__(self, other: Any) -> Pipeline:
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes) - set(other.nodes))
return Pipeline(set(self._nodes) - set(other._nodes))

def __and__(self, other: Any) -> Pipeline:
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes) & set(other.nodes))
return Pipeline(set(self._nodes) & set(other._nodes))

def __or__(self, other: Any) -> Pipeline:
if not isinstance(other, Pipeline):
return NotImplemented
return Pipeline(set(self.nodes + other.nodes))
return Pipeline(set(self._nodes + other._nodes))

def all_inputs(self) -> set[str]:
"""All inputs for all nodes in the pipeline.
Expand All @@ -208,7 +221,7 @@ def all_inputs(self) -> set[str]:
All node input names as a Set.

"""
return set.union(set(), *(node.inputs for node in self.nodes))
return set.union(set(), *(node.inputs for node in self._nodes))

def all_outputs(self) -> set[str]:
"""All outputs of all nodes in the pipeline.
Expand All @@ -217,7 +230,7 @@ def all_outputs(self) -> set[str]:
All node outputs.

"""
return set.union(set(), *(node.outputs for node in self.nodes))
return set.union(set(), *(node.outputs for node in self._nodes))

def _remove_intermediates(self, datasets: set[str]) -> set[str]:
intermediate = {_strip_transcoding(i) for i in self.all_inputs()} & {
Expand Down Expand Up @@ -347,6 +360,9 @@ def nodes(self) -> list[Node]:
The list of all pipeline nodes in topological order.

"""
if not self._toposorted_nodes:
self._toposorted_nodes = [n for group in self.grouped_nodes for n in group]

return list(self._toposorted_nodes)

@property
Expand All @@ -360,7 +376,13 @@ def grouped_nodes(self) -> list[list[Node]]:

"""

return _group_toposorted(self._toposorted_nodes, self.node_dependencies)
if not self._toposorted_groups:
while self._toposorter:
group = sorted(self._toposorter.get_ready())
self._toposorted_groups.append(group)
self._toposorter.done(*group)

return [list(group) for group in self._toposorted_groups]

def only_nodes(self, *node_names: str) -> Pipeline:
"""Create a new ``Pipeline`` which will contain only the specified
Expand Down Expand Up @@ -416,7 +438,7 @@ def only_nodes_with_namespace(self, node_namespace: str) -> Pipeline:
"""
nodes = [
n
for n in self.nodes
for n in self._nodes
if n.namespace and n.namespace.startswith(node_namespace)
]
if not nodes:
Expand Down Expand Up @@ -675,7 +697,7 @@ def only_nodes_with_tags(self, *tags: str) -> Pipeline:
of the tags provided are being copied.
"""
unique_tags = set(tags)
nodes = [node for node in self.nodes if unique_tags & node.tags]
nodes = [node for node in self._nodes if unique_tags & node.tags]
return Pipeline(nodes)

def filter( # noqa: PLR0913
Expand Down Expand Up @@ -759,7 +781,7 @@ def filter( # noqa: PLR0913
# would give different outcomes depending on the order of filter methods:
# only_nodes and then from_inputs would give node1, while only_nodes and then
# from_inputs would give node1 and node3.
filtered_pipeline = Pipeline(self.nodes)
filtered_pipeline = Pipeline(self._nodes)
for subset_pipeline in subset_pipelines:
filtered_pipeline &= subset_pipeline

Expand All @@ -778,7 +800,7 @@ def tag(self, tags: str | Iterable[str]) -> Pipeline:
Returns:
New ``Pipeline`` object with nodes tagged.
"""
nodes = [n.tag(tags) for n in self.nodes]
nodes = [n.tag(tags) for n in self._nodes]
return Pipeline(nodes)

def to_json(self) -> str:
Expand All @@ -790,7 +812,7 @@ def to_json(self) -> str:
"outputs": list(n.outputs),
"tags": list(n.tags),
}
for n in self.nodes
for n in self._nodes
]
pipeline_versioned = {
"kedro_version": kedro.__version__,
Expand Down Expand Up @@ -883,47 +905,6 @@ def _validate_transcoded_inputs_outputs(nodes: list[Node]) -> None:
)


def _group_toposorted(
toposorted: Iterable[Node], deps: dict[Node, set[Node]]
) -> list[list[Node]]:
"""Group already toposorted nodes into independent toposorted groups"""
processed: set[Node] = set()
groups = []
group = []
for x in toposorted:
if set(deps.get(x, set())) <= processed:
group.append(x)
elif group:
processed |= set(group)
groups.append(sorted(group))
group = [x]

if group:
groups.append(sorted(group))
return groups


def _toposort(node_dependencies: dict[Node, set[Node]]) -> list[Node]:
"""Topologically sort (order) nodes such that no node depends on
a node that appears earlier in the list.

Raises:
CircularDependencyError: When it is not possible to topologically order
provided nodes.

Returns:
The list of nodes in order of execution.
"""
try:
sorter = TopologicalSorter(node_dependencies)
# Ensure stable toposort by sorting the nodes in a group
groups = _group_toposorted(sorter.static_order(), node_dependencies)
return [n for group in groups for n in group]
except CycleError as exc:
message = f"Circular dependencies exist among these items: {exc.args[1]}"
raise CircularDependencyError(message) from exc


class CircularDependencyError(Exception):
"""Raised when it is not possible to provide a topological execution
order for nodes, due to a circular dependency existing in the node
Expand Down