Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,30 @@ jobs:
papermill pyiron_workflow.ipynb pyiron_workflow_out.ipynb -k "python3"
papermill universal_workflow.ipynb universal_workflow_out.ipynb -k "python3"

# CI job for the nested-workflow example: installs the package into a
# conda environment, sets up a throwaway AiiDA profile, and executes the
# example notebook with papermill.
nested:
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v4
    # Restrict conda to the conda-forge channel.
    - name: Conda config
      run: echo -e "channels:\n - conda-forge\n" > .condarc
    - uses: conda-incubator/setup-miniconda@v3
      with:
        python-version: "3.12"
        miniforge-version: latest
        condarc-file: .condarc
        environment-file: binder/environment.yml
    - name: Installation and setup
      shell: bash -l {0}
      run: |
        pip install --no-deps --no-build-isolation -e .
        conda install -c conda-forge jupyter papermill
        # Create a lightweight AiiDA profile for the test run.
        verdi presto --profile-name pwd
    - name: Tests
      shell: bash -l {0}
      run: |
        cd example_workflows/nested
        papermill aiida.ipynb aiida_out.ipynb -k "python3"

documentation:
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,4 @@ aiida_to_jobflow_qe.json
pyiron_base_to_aiida_simple.json
pyiron_base_to_jobflow_qe.json
**/*.h5
**/html/
707 changes: 707 additions & 0 deletions example_workflows/nested/aiida.ipynb

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions example_workflows/nested/load_aiida.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from python_workflow_definition.aiida import load_workflow_json

from aiida import load_profile

# An AiiDA profile must be loaded before any WorkGraph can be built or run.
load_profile()

# Workflow definition in Python Workflow Definition (PWD) JSON format; it
# references the nested sub-workflow stored in prod_div.json.
workflow_json_filename = "main.pwd.json"

wg = load_workflow_json(workflow_json_filename)

# Render the graph to HTML for inspection, then execute it.
wg.to_html()
wg.run()
21 changes: 21 additions & 0 deletions example_workflows/nested/main.pwd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"version": "0.1.1",
"nodes": [
{ "id": 0, "value": 3, "type": "input", "name": "a" },
{ "id": 1, "value": 2, "type": "input", "name": "b" },
{ "id": 2, "value": 4, "type": "input", "name": "c" },
{ "id": 3, "type": "function", "value": "workflow.get_prod_and_div" },
{ "id": 4, "type": "workflow", "value": "prod_div.json" },
{ "id": 5, "type": "function", "value": "workflow.get_sum" },
{ "id": 6, "type": "output", "name": "final_result" }
],
"edges": [
{ "target": 3, "targetPort": "x", "source": 0, "sourcePort": null },
{ "target": 3, "targetPort": "y", "source": 2, "sourcePort": null },
{ "target": 4, "targetPort": "x", "source": 3, "sourcePort": "prod" },
{ "target": 4, "targetPort": "y", "source": 3, "sourcePort": "div" },
{ "target": 5, "targetPort": "x", "source": 4, "sourcePort": "result" },
{ "target": 5, "targetPort": "y", "source": 1, "sourcePort": null },
{ "target": 6, "targetPort": null, "source": 5, "sourcePort": null }
]
}
19 changes: 19 additions & 0 deletions example_workflows/nested/prod_div.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "0.1.1",
"nodes": [
{ "id": 0, "type": "function", "value": "workflow.get_prod_and_div" },
{ "id": 1, "type": "function", "value": "workflow.get_sum" },
{ "id": 2, "type": "function", "value": "workflow.get_square" },
{ "id": 3, "type": "input", "value": 1, "name": "x" },
{ "id": 4, "type": "input", "value": 2, "name": "y" },
{ "id": 5, "type": "output", "name": "result" }
],
"edges": [
{ "target": 0, "targetPort": "x", "source": 3, "sourcePort": null },
{ "target": 0, "targetPort": "y", "source": 4, "sourcePort": null },
{ "target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod" },
{ "target": 1, "targetPort": "y", "source": 0, "sourcePort": "div" },
{ "target": 2, "targetPort": "x", "source": 1, "sourcePort": null },
{ "target": 5, "targetPort": null, "source": 2, "sourcePort": null }
]
}
149 changes: 149 additions & 0 deletions example_workflows/nested/round_trip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
Round-trip test for nested workflows.

This script demonstrates that:
1. Loading a nested workflow JSON preserves all structure and values
2. Exporting a loaded workflow produces identical JSON
3. Multiple round-trips are stable (load -> export -> load -> export produces identical results)
"""

import json
import sys
from pathlib import Path

from python_workflow_definition.aiida import load_workflow_json, write_workflow_json
from aiida import load_profile

# Load AiiDA profile (required before any WorkGraph load/run operations)
load_profile()


def compare_json_files(file1: str, file2: str) -> bool:
    """Return True if the two JSON files contain structurally equal data.

    Each file is parsed and re-serialized with sorted keys, so dictionary
    key ordering in the files does not affect the comparison.
    """
    normalized = []
    for path in (file1, file2):
        with open(path) as handle:
            normalized.append(json.dumps(json.load(handle), sort_keys=True))
    return normalized[0] == normalized[1]


def print_workflow_info(wg, name: str):
    """Print a human-readable summary of a loaded WorkGraph.

    Reports the task count (excluding WorkGraph-internal bookkeeping
    tasks), the workflow inputs with their default values, the output
    socket names, and any nested sub-workflows with their defaults.

    Args:
        wg: Workflow object as returned by ``load_workflow_json``.
        name: Heading label printed above the summary.
    """
    print(f"\n{name}:")

    # Bookkeeping tasks injected by aiida-workgraph are not part of the
    # user-defined workflow; exclude them everywhere below.
    internal_tasks = ("graph_inputs", "graph_outputs", "graph_ctx")
    task_count = len([t for t in wg.tasks if t.name not in internal_tasks])
    print(f" Tasks: {task_count}")

    # Show inputs. NOTE(review): relies on the private `_sockets` mapping
    # of aiida-workgraph sockets -- confirm against the installed version.
    # The loop variable is `socket_name` (not `name`) so the function
    # parameter is not shadowed.
    if hasattr(wg.inputs, '_sockets'):
        print(" Inputs:")
        for socket_name, socket in wg.inputs._sockets.items():
            if not socket_name.startswith('_') and socket_name != 'metadata':
                if hasattr(socket, 'value') and socket.value is not None:
                    # AiiDA Data nodes wrap the Python value in `.value`.
                    value = socket.value.value if hasattr(socket.value, 'value') else socket.value
                    print(f" {socket_name} = {value}")

    # Show outputs (socket names only; outputs have no stored value here).
    if hasattr(wg.outputs, '_sockets'):
        output_names = [n for n in wg.outputs._sockets.keys()
                        if not n.startswith('_') and n != 'metadata']
        if output_names:
            print(f" Outputs: {', '.join(output_names)}")

    # Check for nested workflows: any task that itself carries tasks.
    for task in wg.tasks:
        if not hasattr(task, 'tasks'):
            continue
        nested_tasks = [t for t in task.tasks if t.name not in internal_tasks]
        if not nested_tasks:
            continue
        print(f" Nested workflow '{task.name}' with {len(nested_tasks)} tasks")
        # The nested workflow's default inputs live on the outputs of its
        # internal `graph_inputs` task.
        for subtask in task.tasks:
            if subtask.name == 'graph_inputs' and hasattr(subtask, 'outputs'):
                print(" Default inputs:")
                for out in subtask.outputs:
                    if hasattr(out, '_name') and not out._name.startswith('_'):
                        value = out.value.value if hasattr(out.value, 'value') else out.value
                        print(f" {out._name} = {value}")


def main():
    """Run the nested-workflow round-trip test and return an exit code.

    Steps: load ``main.pwd.json``, export it, reload the export, export
    again, then compare the exported JSON files. Identical output across
    exports demonstrates the round-trip is stable and idempotent.

    Returns:
        0 if all comparisons pass, 1 otherwise (suitable for sys.exit).
    """
    print("=" * 70)
    print("NESTED WORKFLOW ROUND-TRIP TEST")
    print("=" * 70)

    # Define file paths
    original_file = "main.pwd.json"
    roundtrip1_file = "roundtrip1.pwd.json"
    roundtrip2_file = "roundtrip2.pwd.json"
    nested_original = "prod_div.json"
    nested_export = "nested_1.json"

    # try/finally ensures the generated files are removed even if one of
    # the steps raises, so a failed run does not pollute the next one.
    try:
        # Test 1: Load original workflow
        print("\n[1] Loading original workflow...")
        wg_original = load_workflow_json(original_file)
        print_workflow_info(wg_original, "Original workflow")

        # Test 2: Export to roundtrip1
        print("\n[2] Exporting to roundtrip1.pwd.json...")
        write_workflow_json(wg_original, roundtrip1_file)
        print(f" Exported main workflow to {roundtrip1_file}")
        if Path(nested_export).exists():
            print(f" Exported nested workflow to {nested_export}")

        # Test 3: Load roundtrip1
        print("\n[3] Loading roundtrip1.pwd.json...")
        wg_roundtrip1 = load_workflow_json(roundtrip1_file)
        print_workflow_info(wg_roundtrip1, "Roundtrip 1 workflow")

        # Test 4: Export to roundtrip2
        print("\n[4] Exporting to roundtrip2.pwd.json...")
        write_workflow_json(wg_roundtrip1, roundtrip2_file)
        print(f" Exported to {roundtrip2_file}")

        # Test 5: Compare files
        print("\n[5] Comparing JSON files...")
        print("-" * 70)

        # Compare the two exported main workflows for stability.
        main_match = compare_json_files(roundtrip1_file, roundtrip2_file)
        print(f" roundtrip1 == roundtrip2: {'PASS' if main_match else 'FAIL'}")

        # Compare the exported nested workflow against its source file.
        if Path(nested_original).exists() and Path(nested_export).exists():
            nested_match = compare_json_files(nested_original, nested_export)
            print(f" {nested_original} == {nested_export}: {'PASS' if nested_match else 'FAIL'}")
        else:
            nested_match = True  # If files don't exist, consider it a pass

        # Test 6: Load roundtrip2 and verify
        print("\n[6] Loading roundtrip2.pwd.json for verification...")
        wg_roundtrip2 = load_workflow_json(roundtrip2_file)
        print_workflow_info(wg_roundtrip2, "Roundtrip 2 workflow")

        # Final verdict
        print("\n" + "=" * 70)
        if main_match and nested_match:
            print("RESULT: ALL TESTS PASSED")
            print(" - Workflow structure preserved")
            print(" - Input/output values preserved")
            print(" - Nested workflow defaults preserved")
            print(" - Round-trip is stable and idempotent")
            result = 0
        else:
            print("RESULT: SOME TESTS FAILED")
            result = 1
        print("=" * 70)
    finally:
        # Cleanup runs on both success and failure paths.
        print("\nCleaning up temporary files...")
        for temp_file in [roundtrip1_file, roundtrip2_file, nested_export]:
            if Path(temp_file).exists():
                Path(temp_file).unlink()
                print(f" Removed {temp_file}")

    return result


if __name__ == "__main__":
    # Use sys.exit instead of the builtin exit(): the builtin is provided
    # by the site module for interactive use and may be absent when run
    # with `python -S`; sys.exit is the script-safe form.
    sys.exit(main())
10 changes: 10 additions & 0 deletions example_workflows/nested/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
def get_prod_and_div(x, y):
    """Return the product and quotient of x and y as a dict."""
    product = x * y
    quotient = x / y
    return {"prod": product, "div": quotient}


def get_sum(x, y):
    """Return the sum of x and y."""
    total = x + y
    return total


def get_square(x):
    """Return the square of x."""
    squared = x ** 2
    return squared
83 changes: 83 additions & 0 deletions example_workflows/nested/write_aiida.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Construct the nested example workflow with aiida-workgraph and export it.

Builds a sub-workflow (mirroring prod_div.json) and a main workflow
(mirroring main.pwd.json) programmatically, then writes both to PWD JSON
files via write_workflow_json.
"""
from aiida_workgraph import task, WorkGraph, namespace
from aiida import load_profile, orm
from python_workflow_definition.aiida import write_workflow_json
from workflow import get_prod_and_div as _get_prod_and_div, get_sum as _get_sum, get_square as _get_square

# An AiiDA profile must be loaded before building WorkGraphs.
load_profile()


# Wrap the functions with @task decorator.
# get_prod_and_div returns a dict, so its output sockets are declared
# explicitly; the others use the default single-output socket.
get_prod_and_div = task(outputs=["prod", "div"])(_get_prod_and_div)
get_sum = task(_get_sum)
get_square = task(_get_square)


# Create nested workflow manually (corresponds to prod_div.json)
nested_wg = WorkGraph(
    name="nested_workflow",
    inputs=namespace(x=namespace, y=namespace),
    outputs=namespace(result=namespace),
)

# Add tasks to nested workflow
t1 = nested_wg.add_task(get_prod_and_div)
t2 = nested_wg.add_task(get_sum)
t3 = nested_wg.add_task(get_square)

# Connect nested workflow inputs to first task
nested_wg.add_link(nested_wg.inputs.x, t1.inputs.x)
nested_wg.add_link(nested_wg.inputs.y, t1.inputs.y)

# Connect tasks within nested workflow:
# (x, y) -> prod/div -> sum -> square
nested_wg.add_link(t1.outputs.prod, t2.inputs.x)
nested_wg.add_link(t1.outputs.div, t2.inputs.y)
nested_wg.add_link(t2.outputs.result, t3.inputs.x)

# Connect nested workflow output
nested_wg.outputs.result = t3.outputs.result

# Set default values for nested workflow inputs
# (matches the input nodes of prod_div.json: x=1, y=2)
nested_wg.inputs.x.value = orm.Float(1)
nested_wg.inputs.y.value = orm.Float(2)


# Create main workflow (corresponds to main.pwd.json)
main_wg = WorkGraph(
    name="main_workflow",
    inputs=namespace(a=namespace, b=namespace, c=namespace),
    outputs=namespace(final_result=namespace),
)

# Add tasks to main workflow
preprocessing = main_wg.add_task(get_prod_and_div)
nested_task = main_wg.add_task(nested_wg)  # Add the nested workflow as a task
postprocessing = main_wg.add_task(get_sum)

# Connect main workflow inputs to preprocessing
main_wg.add_link(main_wg.inputs.a, preprocessing.inputs.x)
main_wg.add_link(main_wg.inputs.c, preprocessing.inputs.y)

# Connect preprocessing to nested workflow
main_wg.add_link(preprocessing.outputs.prod, nested_task.inputs.x)
main_wg.add_link(preprocessing.outputs.div, nested_task.inputs.y)

# Connect nested workflow to postprocessing
main_wg.add_link(nested_task.outputs.result, postprocessing.inputs.x)
main_wg.add_link(main_wg.inputs.b, postprocessing.inputs.y)

# Connect main workflow output
main_wg.outputs.final_result = postprocessing.outputs.result

# Set default values for main workflow inputs
# (matches the input nodes of main.pwd.json: a=3, b=2, c=4)
main_wg.inputs.a.value = orm.Float(3)
main_wg.inputs.b.value = orm.Float(2)
main_wg.inputs.c.value = orm.Float(4)


# Export to JSON (will create main_generated.pwd.json and nested_1.json)
print("Exporting workflow to JSON files...")
write_workflow_json(wg=main_wg, file_name="main_generated.pwd.json")
print("✓ Exported to main_generated.pwd.json and nested_1.json")

# Optionally run the workflow
# main_wg.run()
Loading
Loading