55import numpy as np
66from jobflow import job , Flow
77
8- from python_workflow_definition .shared import get_dict , get_list , get_kwargs , get_source_handles , convert_nodes_list_to_dict
8+ from python_workflow_definition .shared import (
9+ get_dict ,
10+ get_list ,
11+ get_kwargs ,
12+ get_source_handles ,
13+ convert_nodes_list_to_dict ,
14+ NODES_LABEL ,
15+ EDGES_LABEL ,
16+ SOURCE_LABEL ,
17+ SOURCE_PORT_LABEL ,
18+ TARGET_LABEL ,
19+ TARGET_PORT_LABEL ,
20+ )
921
1022
1123def _get_function_dict (flow ):
@@ -26,9 +38,19 @@ def _get_nodes_dict(function_dict):
2638
2739def _get_edge_from_dict (target , key , value_dict , nodes_mapping_dict ):
2840 if len (value_dict ['attributes' ]) == 1 :
29- return {"target" : target , "targetPort" : key , "source" : nodes_mapping_dict [value_dict ["uuid" ]], "sourcePort" : value_dict ["attributes" ][0 ][1 ]}
41+ return {
42+ TARGET_LABEL : target ,
43+ TARGET_PORT_LABEL : key ,
44+ SOURCE_LABEL : nodes_mapping_dict [value_dict ["uuid" ]],
45+ SOURCE_PORT_LABEL : value_dict ["attributes" ][0 ][1 ],
46+ }
3047 else :
31- return {"target" : target , "targetPort" : key , "source" : nodes_mapping_dict [value_dict ["uuid" ]], "sourcePort" : None }
48+ return {
49+ TARGET_LABEL : target ,
50+ TARGET_PORT_LABEL : key ,
51+ SOURCE_LABEL : nodes_mapping_dict [value_dict ["uuid" ]],
52+ SOURCE_PORT_LABEL : None ,
53+ }
3254
3355
3456def _get_edges_and_extend_nodes (flow_dict , nodes_mapping_dict , nodes_dict ):
@@ -59,8 +81,8 @@ def _get_edges_and_extend_nodes(flow_dict, nodes_mapping_dict, nodes_dict):
5981 nodes_dict [node_index ] = vt
6082 else :
6183 node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[str (vt )]
62- edges_lst .append ({"target" : node_dict_index , "targetPort" : kt , "source" : node_index , "sourcePort" : None })
63- edges_lst .append ({"target" : nodes_mapping_dict [job ["uuid" ]], "targetPort" : k , "source" : node_dict_index , "sourcePort" : None })
84+ edges_lst .append ({TARGET_LABEL : node_dict_index , TARGET_PORT_LABEL : kt , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
85+ edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_dict_index , SOURCE_PORT_LABEL : None })
6486 elif isinstance (v , list ) and any ([isinstance (el , dict ) and "@module" in el and "@class" in el and "@version" in el for el in v ]):
6587 node_list_index = len (nodes_dict )
6688 nodes_dict [node_list_index ] = get_list
@@ -78,15 +100,15 @@ def _get_edges_and_extend_nodes(flow_dict, nodes_mapping_dict, nodes_dict):
78100 nodes_dict [node_index ] = vt
79101 else :
80102 node_index = {str (tv ): tk for tk , tv in nodes_dict .items ()}[str (vt )]
81- edges_lst .append ({"target" : node_list_index , "targetPort" : kt , "source" : node_index , "sourcePort" : None })
82- edges_lst .append ({"target" : nodes_mapping_dict [job ["uuid" ]], "targetPort" : k , "source" : node_list_index , "sourcePort" : None })
103+ edges_lst .append ({TARGET_LABEL : node_list_index , TARGET_PORT_LABEL : kt , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
104+ edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_list_index , SOURCE_PORT_LABEL : None })
83105 else :
84106 if v not in nodes_dict .values ():
85107 node_index = len (nodes_dict )
86108 nodes_dict [node_index ] = v
87109 else :
88110 node_index = {tv : tk for tk , tv in nodes_dict .items ()}[v ]
89- edges_lst .append ({"target" : nodes_mapping_dict [job ["uuid" ]], "targetPort" : k , "source" : node_index , "sourcePort" : None })
111+ edges_lst .append ({TARGET_LABEL : nodes_mapping_dict [job ["uuid" ]], TARGET_PORT_LABEL : k , SOURCE_LABEL : node_index , SOURCE_PORT_LABEL : None })
90112 return edges_lst , nodes_dict
91113
92114
@@ -99,7 +121,7 @@ def _resort_total_lst(total_dict, nodes_dict):
99121 for ind in sorted (total_dict .keys ()):
100122 connect = total_dict [ind ]
101123 if ind not in ordered_lst :
102- source_lst = [sd ["source" ] for sd in connect .values ()]
124+ source_lst = [sd [SOURCE_LABEL ] for sd in connect .values ()]
103125 if all ([s in ordered_lst or s in nodes_without_dep_lst for s in source_lst ]):
104126 ordered_lst .append (ind )
105127 total_new_dict [ind ] = connect
@@ -109,11 +131,11 @@ def _resort_total_lst(total_dict, nodes_dict):
109131def _group_edges (edges_lst ):
110132 total_dict = {}
111133 for ed_major in edges_lst :
112- target_id = ed_major ["target" ]
134+ target_id = ed_major [TARGET_LABEL ]
113135 tmp_lst = []
114136 if target_id not in total_dict .keys ():
115137 for ed in edges_lst :
116- if target_id == ed ["target" ]:
138+ if target_id == ed [TARGET_LABEL ]:
117139 tmp_lst .append (ed )
118140 total_dict [target_id ] = get_kwargs (lst = tmp_lst )
119141 return total_dict
@@ -139,8 +161,8 @@ def get_attr_helper(obj, source_handle):
139161 else :
140162 fn = job (method = v )
141163 kwargs = {
142- kw : input_dict [vw ["source" ]] if vw ["source" ] in input_dict else get_attr_helper (
143- obj = memory_dict [vw ["source" ]], source_handle = vw ["sourcePort" ])
164+ kw : input_dict [vw [SOURCE_LABEL ]] if vw [SOURCE_LABEL ] in input_dict else get_attr_helper (
165+ obj = memory_dict [vw [SOURCE_LABEL ]], source_handle = vw [SOURCE_PORT_LABEL ])
144166 for kw , vw in total_dict [k ].items ()
145167 }
146168 memory_dict [k ] = fn (** kwargs )
@@ -159,21 +181,21 @@ def load_workflow_json(file_name):
159181 content = json .load (f )
160182
161183 edges_new_lst = []
162- for edge in content ["edges" ]:
163- if edge ["sourcePort" ] is None :
184+ for edge in content [EDGES_LABEL ]:
185+ if edge [SOURCE_PORT_LABEL ] is None :
164186 edges_new_lst .append (edge )
165187 else :
166188 edges_new_lst .append (
167189 {
168- "target" : edge ["target" ],
169- "targetPort" : edge ["targetPort" ],
170- "source" : edge ["source" ],
171- "sourcePort" : str (edge ["sourcePort" ]),
190+ TARGET_LABEL : edge [TARGET_LABEL ],
191+ TARGET_PORT_LABEL : edge [TARGET_PORT_LABEL ],
192+ SOURCE_LABEL : edge [SOURCE_LABEL ],
193+ SOURCE_PORT_LABEL : str (edge [SOURCE_PORT_LABEL ]),
172194 }
173195 )
174196
175197 nodes_new_dict = {}
176- for k , v in convert_nodes_list_to_dict (nodes_list = content ["nodes" ]).items ():
198+ for k , v in convert_nodes_list_to_dict (nodes_list = content [NODES_LABEL ]).items ():
177199 if isinstance (v , str ) and "." in v :
178200 p , m = v .rsplit ('.' , 1 )
179201 mod = import_module (p )
@@ -214,4 +236,4 @@ def write_workflow_json(flow, file_name="workflow.json"):
214236 nodes_store_lst .append ({"id" : k , "value" : v })
215237
216238 with open (file_name , "w" ) as f :
217- json .dump ({"nodes" : nodes_store_lst , "edges" : edges_lst }, f )
239+ json .dump ({NODES_LABEL : nodes_store_lst , EDGES_LABEL : edges_lst }, f )
0 commit comments