Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions src/CSET/cset_workflow/app/parbake_recipes/bin/parbake.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,29 @@

import json
import os
import subprocess
from base64 import b64decode
from pathlib import Path

from CSET.recipes import load_recipes


def parbake_all(variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool):
def parbake_all(
    variables: dict, rose_datac: Path, share_dir: Path, aggregation: bool
) -> int:
    """Generate and parbake recipes from configuration.

    Loads every recipe enabled in *variables*, then parbakes those whose
    aggregation flag matches *aggregation* into the cycle's data directory.

    Raises
    ------
    ValueError
        If no recipes at all are enabled.

    Returns
    -------
    int
        The number of recipes parbaked; may be zero after filtering on
        the aggregation flag.
    """
    all_recipes = list(load_recipes(variables))
    if not all_recipes:
        raise ValueError("At least one recipe should be enabled.")
    # Keep only the recipes matching the requested aggregation mode.
    matching = [r for r in all_recipes if r.aggregation == aggregation]
    for recipe in matching:
        print(f"Parbaking {recipe}", flush=True)
        recipe.parbake(rose_datac, share_dir)
    return len(matching)


def main():
Expand All @@ -44,7 +50,23 @@ def main():
share_dir = Path(os.environ["CYLC_WORKFLOW_SHARE_DIR"])
aggregation = bool(os.getenv("DO_CASE_AGGREGATION"))
# Parbake recipes for cycle.
parbake_all(variables, rose_datac, share_dir, aggregation)
recipe_count = parbake_all(variables, rose_datac, share_dir, aggregation)

# If running under cylc, notify cylc of task completion.
cylc_workflow_id = os.getenv("CYLC_WORKFLOW_ID")
cylc_task_job = os.getenv("CYLC_TASK_JOB")
if cylc_workflow_id and cylc_task_job:
message_command = [
"cylc",
"message",
"--",
cylc_workflow_id,
cylc_task_job,
]
if recipe_count:
subprocess.run(message_command + ["start baking"], check=True)
else:
subprocess.run(message_command + ["skip baking"], check=True)


if __name__ == "__main__": # pragma: no cover
Expand Down
33 changes: 23 additions & 10 deletions src/CSET/cset_workflow/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,30 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
# Runs for every forecast initiation time to process the data in parallel.
{% for date in CSET_CASE_DATES %}
R1/{{date}} = """
setup_complete[^] => parbake_recipes
setup_complete[^] => FETCH_DATA:succeed-all => fetch_complete
fetch_complete & parbake_recipes => bake_recipes => cycle_complete
fetch_complete & parbake_recipes:start_baking? => bake_recipes
parbake_recipes:skip_baking? => ! bake_recipes
parbake_recipes:skip_baking? | bake_recipes => cycle_complete
"""
{% endfor %}
{% elif CSET_CYCLING_MODE == "trial" %}
# Analysis from each forecast.
{{CSET_TRIAL_CYCLE_PERIOD}} = """
setup_complete[^] => parbake_recipes
setup_complete[^] => FETCH_DATA:succeed-all => fetch_complete
fetch_complete & parbake_recipes => bake_recipes => cycle_complete
fetch_complete & parbake_recipes:start_baking? => bake_recipes
parbake_recipes:skip_baking? => ! bake_recipes
parbake_recipes:skip_baking? | bake_recipes => cycle_complete
"""
{% endif %}

# Only runs on the final cycle.
R1/$ = """
# Run aggregation recipes.
fetch_complete & parbake_aggregation_recipes => bake_aggregation_recipes => cycle_complete
setup_complete[^] => parbake_aggregation_recipes
fetch_complete & parbake_aggregation_recipes:start_baking? => bake_aggregation_recipes
parbake_aggregation_recipes:skip_baking? | bake_aggregation_recipes => cycle_complete
# Finalise website and cleanup.
cycle_complete => finish_website => send_email
cycle_complete => housekeeping
Expand Down Expand Up @@ -97,6 +105,16 @@ final cycle point = {{CSET_TRIAL_END_DATE}}
[[[environment]]]
ANALYSIS_LENGTH = {{ANALYSIS_LENGTH}}

[[PARBAKE]]
script = rose task-run -v --app-key=parbake_recipes
execution time limit = PT5M
completion = succeeded and (start_baking or skip_baking)
[[[environment]]]
ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
[[[outputs]]]
start_baking='start baking'
skip_baking='skip baking'

[[METPLUS]]
[[[environment]]]
{% if METPLUS_GRID_STAT|default(False) %}
Expand Down Expand Up @@ -152,17 +170,12 @@ final cycle point = {{CSET_TRIAL_END_DATE}}

[[parbake_recipes]]
# Parbake all the recipes for this cycle.
script = rose task-run -v --app-key=parbake_recipes
execution time limit = PT5M
[[[environment]]]
ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
inherit=PARBAKE

[[parbake_aggregation_recipes]]
# Parbake all the aggregation recipes.
script = rose task-run -v --app-key=parbake_recipes
execution time limit = PT5M
inherit=PARBAKE
[[[environment]]]
ENCODED_ROSE_SUITE_VARIABLES = {{b64json(ROSE_SUITE_VARIABLES)}}
DO_CASE_AGGREGATION = True

[[bake_recipes]]
Expand Down
34 changes: 34 additions & 0 deletions tests/workflow_utils/test_parbake_recipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""Tests for parbake_recipe workflow utility."""

import subprocess
from pathlib import Path

import pytest
Expand All @@ -25,14 +26,28 @@
def test_main(monkeypatch):
"""Check parbake.main() invokes parbake_all correctly."""
function_ran = False
recipes_parbaked = 0
cylc_message_ran = False
cylc_message = ""

def mock_parbake_all(variables, rose_datac, share_dir, aggregation):
    # Stand-in for parbake.parbake_all: records that it was called and
    # asserts it received the values set up via the test environment.
    nonlocal function_ran
    nonlocal recipes_parbaked
    function_ran = True
    assert variables == {"variable": "value"}
    assert rose_datac == Path("/share/cycle/20000101T0000Z")
    assert share_dir == Path("/share")
    assert isinstance(aggregation, bool)
    # Echo back the test-controlled count so main() takes the intended
    # start/skip baking branch for each scenario.
    return recipes_parbaked

def mock_run(cmd, **kwargs):
    # Stand-in for subprocess.run: verifies the exact cylc message command
    # line instead of executing it.
    nonlocal cylc_message
    nonlocal cylc_message_ran
    cylc_message_ran = True
    # Command must target the workflow/job taken from the environment
    # variables and end with the message expected for this scenario.
    assert cmd[0:3] == ["cylc", "message", "--"]
    assert cmd[3] == "test-workflow"
    assert cmd[4] == "test-job"
    assert cmd[5] == cylc_message

monkeypatch.setattr(parbake, "parbake_all", mock_parbake_all)

Expand All @@ -51,6 +66,25 @@ def mock_parbake_all(variables, rose_datac, share_dir, aggregation):
parbake.main()
assert function_ran, "Function did not run!"

# Retry with cylc environment variables set.
monkeypatch.setattr(subprocess, "run", mock_run)
monkeypatch.setenv("CYLC_WORKFLOW_ID", "test-workflow")
monkeypatch.setenv("CYLC_TASK_JOB", "test-job")

# No recipes parbaked.
function_ran = False
recipes_parbaked = 0
cylc_message = "skip baking"
parbake.main()
assert cylc_message_ran, "Cylc message function did not run!"

# Some recipes parbaked.
function_ran = False
recipes_parbaked = 3
cylc_message = "start baking"
parbake.main()
assert cylc_message_ran, "Cylc message function did not run!"


def test_parbake_all_none_enabled(tmp_working_dir, monkeypatch):
"""Error when no recipes are enabled."""
Expand Down