Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from inspect import isfunction
from importlib import import_module
from typing import Optional
from typing import Any, Optional

import numpy as np
from pyiron_workflow import function_node, Workflow
from pyiron_workflow.api import Function

from python_workflow_definition.models import PythonWorkflowDefinitionWorkflow
from python_workflow_definition.shared import (
Expand Down Expand Up @@ -312,3 +313,62 @@ def load_workflow_json(file_name: str, workflow: Optional[Workflow] = None):
total_dict=total_dict,
node_conversion_dict=node_conversion_dict,
)


def import_from_string(library_path: str) -> Any:
# Copied from bagofholding
split_path = library_path.split(".", 1)
if len(split_path) == 1:
module_name, path = split_path[0], ""
else:
module_name, path = split_path
obj = import_module(module_name)
for k in path.split("."):
obj = getattr(obj, k)
return obj


def build_function_dag_workflow(file_name: str) -> Workflow:
content = remove_result(
PythonWorkflowDefinitionWorkflow.load_json_file(file_name="workflow.json")
)

input_values: dict[int, object] = (
{}
) # Type is actually more restrictive, must be jsonifyable object
nodes: dict[int, Function] = {}
wf = Workflow(file_name.split(".")[0])
for node_dict in content[NODES_LABEL]:
if node_dict["type"] == "function":
fnc = import_from_string(node_dict["value"])
n = function_node(
fnc,
output_labels=fnc.__name__ # Strictly force single-output
)
nodes[node_dict["id"]] = n
wf.add_child(n)
elif node_dict["type"] == "input":
input_values[node_dict["id"]] = node_dict["value"]

for edge_dict in content[EDGES_LABEL]:
target_id = edge_dict["target"]
target_port = edge_dict["targetPort"]
source_id = edge_dict["source"]
source_port = edge_dict["sourcePort"]

if source_port is None:
if source_id in input_values.keys(): # Parent input value
upstream = input_values[source_id]
else: # Single-output sibling
upstream = nodes[source_id]
else: # Dictionary-output sibling
injected_attribute_access = nodes[source_id].__getitem__(source_port)
upstream = injected_attribute_access
downstream = nodes[target_id]
setattr(
downstream.inputs, target_port, upstream
) # Exploit input panel magic
# Warning: edge setup routine is bespoke for an environment where all nodes return
# a single value or a dictionary

return wf