Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/dirac_cwl_proto/core/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ def get_lfns(input_data: dict[str, Any]) -> dict[str, list[Path]]:
val.append(Path(item.location))
files[input_name] = val
elif isinstance(input_value, File):
if not input_value.location:
if not input_value.location and not input_value.path:
raise NotImplementedError("File location is not defined.")
if not input_value.location:
continue
if input_value.location.startswith("lfn:"):
val.append(Path(input_value.location))
files[input_name] = val
Expand Down
2 changes: 1 addition & 1 deletion src/dirac_cwl_proto/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def submit_job_client(
# Validate the workflow
console.print("[blue]:information_source:[/blue] [bold]CLI:[/bold] Validating the job(s)...")
try:
task = load_document(pack(task_path))
task = load_document(pack(task_path), baseuri=".")
except FileNotFoundError as ex:
console.print(f"[red]:heavy_multiplication_x:[/red] [bold]CLI:[/bold] Failed to load the task:\n{ex}")
return typer.Exit(code=1)
Expand Down
20 changes: 15 additions & 5 deletions src/dirac_cwl_proto/job/job_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from DIRACCommon.Core.Utilities.ReturnValues import ( # type: ignore[import-untyped]
returnValueOrRaise,
)
from pydantic import PrivateAttr
from rich.text import Text
from ruamel.yaml import YAML

Expand All @@ -47,14 +46,16 @@
class JobWrapper:
"""Job Wrapper for the execution hook."""

_sandbox_store_client: SandboxStoreClient = PrivateAttr(default_factory=SandboxStoreClient)
_sandbox_store_client: SandboxStoreClient

def __init__(self) -> None:
"""Initialize the job wrapper."""
self.execution_hooks_plugin: ExecutionHooksBasePlugin | None = None
self.job_path: Path = Path()
if os.getenv("DIRAC_PROTO_LOCAL") == "1":
self._sandbox_store_client = MockSandboxStoreClient()
else:
self._sandbox_store_client = SandboxStoreClient()

def __download_input_sandbox(self, arguments: JobInputModel, job_path: Path) -> None:
"""Download the files from the sandbox store.
Expand All @@ -66,7 +67,9 @@ def __download_input_sandbox(self, arguments: JobInputModel, job_path: Path) ->
if not self.execution_hooks_plugin:
raise RuntimeError("Could not download sandboxes")
for sandbox in arguments.sandbox:
self._sandbox_store_client.downloadSandbox(sandbox, job_path)
ret = self._sandbox_store_client.downloadSandbox(sandbox, job_path)
if not ret["OK"]:
raise RuntimeError(f"Could not download sandbox {sandbox}: {ret['Message']}")

def __upload_output_sandbox(
self,
Expand Down Expand Up @@ -129,6 +132,11 @@ def __update_inputs(self, inputs: JobInputModel, updates: dict[str, Path | list[
using `download_lfns` to ensure that the CWL job inputs reference
the correct local files.
"""
for _, value in inputs.cwl.items():
files = value if isinstance(value, list) else [value]
for file in files:
if isinstance(file, File) and file.path:
file.path = Path(file.path).name
for input_name, path in updates.items():
if isinstance(path, Path):
inputs.cwl[input_name] = File(path=str(path))
Expand Down Expand Up @@ -224,12 +232,14 @@ def _post_process(

outputs = self.__parse_output_filepaths(stdout)

success = True

if self.execution_hooks_plugin:
return self.execution_hooks_plugin.post_process(self.job_path, outputs=outputs)
success = self.execution_hooks_plugin.post_process(self.job_path, outputs=outputs)

self.__upload_output_sandbox(outputs=outputs)

return True
return success

def run_job(self, job: JobModel) -> bool:
"""Execute a given CWL workflow using cwltool.
Expand Down
8 changes: 7 additions & 1 deletion src/dirac_cwl_proto/job/job_wrapper_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@

import json
import logging
import os
import sys
import tempfile

from cwl_utils.parser import load_document_by_uri
from cwl_utils.parser.cwl_v1_2_utils import load_inputfile
from DIRAC.Core.Base.Script import Script # type: ignore[import-untyped]
from ruamel.yaml import YAML

if os.getenv("DIRAC_PROTO_LOCAL") != "1":
Script.parseCommandLine()


from dirac_cwl_proto.job.job_wrapper import JobWrapper
from dirac_cwl_proto.submission_models import JobModel


def main():
"""Execute the job wrapper for a given job model."""
if len(sys.argv) != 2:
if len(Script.getPositionalArgs()) != 1:
logging.error("1 argument is required")
sys.exit(1)

Expand Down
5 changes: 1 addition & 4 deletions src/dirac_cwl_proto/job/submission_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,9 @@ async def upload_sandbox(
Upload parameter files to the sandbox store.

:param isb_file_paths: List of input sandbox file paths
:param parameter_path: Path to the parameter file
:return: Sandbox ID or None
"""
# Modify the location of the files to point to the future location on the worker node
modified_paths = [Path(p.name) for p in isb_file_paths]
return await create_sandbox(modified_paths)
return await create_sandbox(isb_file_paths)

async def submit_job(self, job_submission: JobSubmissionModel) -> bool:
"""
Expand Down
Loading