forked from aws/aws-cdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
template-deps-to-dot
executable file
·164 lines (115 loc) · 4.02 KB
/
template-deps-to-dot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/env python3
"""A script to analyze the dependency relations in a CloudFormation template.
Use like so:
cdk -a integ.something.js synth | ./template-deps-to-dot | dot -Tpng > deps.png
"""
import collections
import fileinput
import sys
try:
    import yaml
except ImportError:
    # This script's stdout is normally piped straight into `dot` (see the
    # usage line in the module docstring), so the hint has to go to stderr
    # to actually reach the user instead of being fed to `dot` as input.
    print("Please run 'pip3 install pyyaml'", file=sys.stderr)
    sys.exit(1)
def main():
    """Read CloudFormation templates and write their dependency graph as dot.

    Templates are read from the files named on the command line or, when no
    arguments are given, from standard input. Each input is parsed with
    ``yaml.safe_load``. Inputs that are empty or do not parse to a mapping are
    reported on stderr and skipped. The combined graph is written to stdout.
    """
    filenames = sys.argv[1:]

    templates = []
    if filenames:
        for filename in filenames:
            # Close every file as soon as it has been parsed instead of
            # leaving the handles open for the lifetime of the process.
            with open(filename) as f:
                templates.append(yaml.safe_load(f))
    else:
        templates.append(yaml.safe_load(sys.stdin))

    graph = DepGraph()
    for template in templates:
        # A YAML document can also be a bare scalar or a list (for example
        # when an error message is piped in instead of a template); only a
        # non-empty mapping can be handed to parse_template.
        if not template or not isinstance(template, dict):
            sys.stderr.write('Input does not look like a CloudFormation template.\n')
            continue

        parse_template(template, graph)

    graph.render_dot(sys.stdout)
def parse_template(template, graph):
    """Parse template and add all encountered dependencies to the graph.

    Every entry under ``Resources`` is annotated on the graph with a label
    made of its CDK path (from ``Metadata['aws:cdk:path']``, falling back to
    the logical id) and its ``Type``. Edges are added for every reference
    found in ``Properties`` whose target does not start with ``AWS::``, and
    for every ``DependsOn`` entry.

    Args:
        template: parsed template, a mapping.
        graph: object providing ``annotate(node, annotation)`` and
            ``add(source, dep)``.
    """
    # `or {}` / `or []` also covers keys that are present but explicitly null.
    resources = template.get('Resources') or {}
    for logical_id, resource_spec in resources.items():
        resource_type = resource_spec.get('Type', 'AWS::???::???')

        path = (resource_spec.get('Metadata') or {}).get('aws:cdk:path', None)
        if path:
            path = resource_name_from_path(path)

        # resource_name_from_path may return an empty string, in which case
        # the logical id is used for the label instead.
        source = '%s\n(%s)' % (path or logical_id, resource_type)
        graph.annotate(logical_id, source)

        for dep in find_property_references(resource_spec.get('Properties') or {}):
            # Targets starting with 'AWS::' are not drawn as edges.
            if not dep.target.startswith('AWS::'):
                graph.add(logical_id, dep)

        depends_on = resource_spec.get('DependsOn') or []
        if isinstance(depends_on, str):
            # DependsOn may be a single logical id rather than a list;
            # iterating the bare string would add one edge per character.
            depends_on = [depends_on]
        for target in depends_on:
            graph.add(logical_id, Dep(target, 'DependsOn'))
def resource_name_from_path(path):
    """Shorten a '/'-separated construct path into a display name.

    All segments equal to 'Resource' are removed, then the first remaining
    segment is dropped, and what is left is joined with '/' again.
    """
    kept = []
    for segment in path.split('/'):
        if segment == 'Resource':
            continue
        kept.append(segment)

    # Drop the leading segment of the filtered path.
    return '/'.join(kept[1:])
def find_property_references(properties):
    """Collect the references made anywhere inside a resource's Properties.

    Each top-level property value is walked depth-first through nested lists
    and dicts. A dict that parse_reference() recognises yields one Dep and is
    not descended into further; any other dict has its values walked.

    Returns:
      list of Dep objects, each labelled with the name of the top-level
      property under which the reference was found
    """
    def referenced_ids(node):
        # Yield the target of every reference reachable from `node`.
        if isinstance(node, list):
            for item in node:
                yield from referenced_ids(item)
        elif isinstance(node, dict):
            reference = parse_reference(node)
            if reference:
                yield reference[0]
            else:
                for child in node.values():
                    yield from referenced_ids(child)

    found = []
    for name, value in properties.items():
        found.extend(Dep(target, name) for target in referenced_ids(value))
    return found
class DepGraph:
    """Labelled directed graph of dependencies that can be rendered as dot."""

    def __init__(self):
        # source node -> set of dependency objects (edges leaving that node)
        self.graph = collections.defaultdict(set)
        # node -> annotation text used as the node's label
        self.annotations = {}
        # every node that is the target of at least one edge
        self.has_incoming = set()

    def annotate(self, node, annotation):
        """Attach (or replace) the annotation shown for `node`."""
        self.annotations[node] = annotation

    def add(self, source, dep):
        """Record an edge from `source` to `dep.target`."""
        outgoing = self.graph[source]
        outgoing.add(dep)
        self.has_incoming.add(dep.target)

    def render_dot(self, f):
        """Write this graph in dot format to the stream `f`.

        Annotated nodes are only declared when they take part in at least
        one edge, either as source or as target.
        """
        emit = f.write

        emit('digraph G {\n')
        emit(' rankdir=LR;\n')
        emit(' node [shape=box];\n')

        for node, annotation in self.annotations.items():
            is_connected = node in self.graph or node in self.has_incoming
            if not is_connected:
                continue
            emit(' {} [label={}];\n'.format(dot_escape(node), fancy_label(annotation)))

        for source, deps in self.graph.items():
            for dep in deps:
                emit(' {} -> {} [label={}];\n'.format(
                    dot_escape(source), dot_escape(dep.target), dot_escape(dep.label)))

        emit('}\n')
def fancy_label(s):
    """Turn a multi-line annotation into a Graphviz HTML-like label.

    The first line is rendered at point size 14; every following line is
    placed on its own row at point size 10.

    Args:
        s: annotation text, lines separated by '\\n'.

    Returns:
        The label including its surrounding '<' ... '>' delimiters, ready to
        be used as the value of a dot `label=` attribute.
    """
    # Imported locally so this function carries its own dependency.
    import html

    # The text ends up inside HTML-like markup, so '&', '<' and '>' must be
    # replaced by entities or the label would be malformed.
    lines = [html.escape(line, quote=False) for line in s.split('\n')]

    return ('<<FONT POINT-SIZE="14">' + lines[0] + '</FONT>'
            + ''.join('<BR/><FONT POINT-SIZE="10">' + line + '</FONT>' for line in lines[1:])
            + ' >')
def dot_escape(s):
    """Return `s` as a double-quoted dot string.

    Backslashes and double quotes are backslash-escaped so they cannot end
    the quoted string early, and newlines are written as the two-character
    sequence '\\n'.
    """
    # Backslashes go first so the backslashes introduced by the two later
    # replacements are not doubled again.
    escaped = s.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n')
    return '"' + escaped + '"'
def parse_reference(obj):
    """If this object is an intrinsic reference, return info about it.

    A reference is a dict whose only key is 'Ref' or 'Fn::GetAtt'. For
    'Fn::GetAtt' both the list form ``[logicalId, attribute]`` and the
    string form ``'logicalId.attribute'`` are understood.

    Args:
        obj: a dict taken from a template.

    Returns: (logicalId, reference) if reference, None otherwise. For 'Ref'
    the second element is the string 'Ref'; for 'Fn::GetAtt' it is the
    attribute name. A 'Fn::GetAtt' whose value has neither shape yields None.
    """
    keys = list(obj.keys())

    if keys == ['Ref']:
        return (obj['Ref'], 'Ref')

    if keys == ['Fn::GetAtt']:
        value = obj['Fn::GetAtt']
        if isinstance(value, str):
            # String form: split at the first dot. Indexing the string
            # directly would return its first two characters.
            logical_id, dot, attribute = value.partition('.')
            if dot:
                return (logical_id, attribute)
            return None
        if isinstance(value, (list, tuple)) and len(value) >= 2:
            return (value[0], value[1])

    return None
class Dep:
    """A dependency on `target`, tagged with a `label`.

    Two Dep objects are equal, and hash alike, when both their target and
    their label are equal, so they can be stored in sets.
    """

    def __init__(self, target, label):
        self.target, self.label = target, label

    def __eq__(self, rhs):
        if not isinstance(rhs, Dep):
            return False
        same_target = self.target == rhs.target
        return same_target and self.label == rhs.label

    def __ne__(self, rhs):
        return not self.__eq__(rhs)

    def __hash__(self):
        key = (self.target, self.label)
        return hash(key)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()