-
Notifications
You must be signed in to change notification settings - Fork 112
/
layers.py
111 lines (92 loc) · 5.14 KB
/
layers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""`kedro_viz.services.layers` defines layers-related logic."""
import logging
from collections import defaultdict
from typing import Dict, List, Set
from toposort import CircularDependencyError, toposort_flatten
from kedro_viz.models.flowchart import GraphNode
logger = logging.getLogger(__name__)
def sort_layers(
nodes: Dict[str, GraphNode], dependencies: Dict[str, Set[str]]
) -> List[str]:
"""Given a DAG represented by a dictionary of nodes, some of which have a `layer` attribute,
along with their dependencies, return the list of all layers sorted according to
the nodes' topological order, i.e. a layer should appear before another layer in the list
if its node is a dependency of the other layer's node, directly or indirectly.
For example, given the following graph:
node1(layer=a) -> node2 -> node4 -> node6(layer=d)
| ^
v |
node3(layer=b) -> node5(layer=c)
The layers ordering should be: [a, b, c, d]
In theory, this is a problem of finding the
[transitive closure](https://en.wikipedia.org/wiki/Transitive_closure) in a graph of layers
and then toposort them. The algorithm below follows a repeated depth-first search approach:
* For every node, find all layers that depends on it in a depth-first search.
* While traversing, build up a dictionary of {node_id -> layers} for the node
that have already been visited.
* Turn the final {node_id -> layers} into a {layer -> layers} to represent the layers'
dependencies. Note: the key is a layer and the values are the parents of that layer,
just because that's the format toposort requires.
* Feed this layers dictionary to ``toposort`` and return the sorted values.
* Raise CircularDependencyError if the layers cannot be sorted topologically,
i.e. there are cycles among the layers.
Args:
nodes: A dictionary of {node_id -> node} represents the nodes in the graph.
dependencies: A dictionary of {node_id -> set(child_ids)}
represents the direct dependencies between nodes in the graph.
Returns:
The list of layers sorted based on topological order.
Raises:
CircularDependencyError: When the layers have cyclic dependencies.
"""
node_layers: Dict[str, Set[str]] = {} # map node_id to the layers that depend on it
def find_child_layers(node_id: str) -> Set[str]:
"""For the given node_id, find all layers that depend on it in a depth-first manner.
Build up the node_layers dependency dictionary while traversing so each node is visited
only once.
Note: Python's default recursive depth limit is 1000, which means this algorithm won't
work for pipeline with more than 1000 nodes. However, we can rewrite this using stack if
we run into this limit in practice.
"""
if node_id in node_layers:
return node_layers[node_id]
node_layers[node_id] = set()
# The layer of the current node can also be considered as depending on that node.
# This is to cater for the edge case where all nodes are completely disjoint from each other
# and no dependency graph for layers can be constructed,
# yet the layers still need to be displayed.
node_layer = getattr(nodes[node_id], "layer", None)
if node_layer is not None:
node_layers[node_id].add(node_layer)
# for each child node of the given node_id,
# mark its layer and all layers that depend on it as child layers of the given node_id.
for child_node_id in dependencies[node_id]:
child_node = nodes[child_node_id]
child_layer = getattr(child_node, "layer", None)
if child_layer is not None:
node_layers[node_id].add(child_layer)
node_layers[node_id].update(find_child_layers(child_node_id))
return node_layers[node_id]
# populate node_layers dependencies
for node_id in nodes:
find_child_layers(node_id)
# compute the layer dependencies dictionary based on the node_layers dependencies,
# represented as {layer -> set(parent_layers)}
layer_dependencies = defaultdict(set)
for node_id, child_layers in node_layers.items():
node_layer = getattr(nodes[node_id], "layer", None)
# add the node's layer as a parent layer for all child layers.
# Even if a child layer is the same as the node's layer, i.e. a layer is marked
# as its own parent, toposort still works so we don't need to check for that explicitly.
if node_layer is not None:
for layer in child_layers:
layer_dependencies[layer].add(node_layer)
# toposort the layer_dependencies to find the layer order.
# Note that for string, toposort_flatten will default to alphabetical order for tie-break.
try:
return toposort_flatten(layer_dependencies)
except CircularDependencyError:
logger.warning(
"Layers visualisation is disabled as circular dependency detected among layers."
)
return []