Skip to content

Commit

Permalink
validation: add compute backend validation
Browse files Browse the repository at this point in the history
  • Loading branch information
audrium committed Jan 28, 2022
1 parent 8303d60 commit f244071
Show file tree
Hide file tree
Showing 10 changed files with 447 additions and 41 deletions.
1 change: 1 addition & 0 deletions reana_client/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ def workflow_create(ctx, file, name, skip_validation, access_token): # noqa: D3
click.format_filename(file),
access_token=access_token,
skip_validation=skip_validation,
server_capabilities=True,
)
logging.info("Connecting to {0}".format(get_api_url()))
response = create_workflow(reana_specification, name, access_token)
Expand Down
2 changes: 1 addition & 1 deletion reana_client/errors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2018, 2021 CERN.
# Copyright (C) 2018, 2021, 2022 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down
55 changes: 21 additions & 34 deletions reana_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import subprocess
import sys
import traceback
from typing import NoReturn
from typing import Dict
from uuid import UUID

import click
Expand All @@ -27,7 +27,6 @@
from reana_commons.snakemake import snakemake_load
from reana_commons.yadage import yadage_load
from reana_commons.utils import get_workflow_status_change_verb
from reana_commons.validation import validate_workspace

from reana_client.config import (
reana_yaml_schema_file_path,
Expand All @@ -36,6 +35,8 @@
from reana_client.printer import display_message
from reana_client.validation.environments import validate_environment
from reana_client.validation.parameters import validate_parameters
from reana_client.validation.compute_backends import validate_compute_backends
from reana_client.validation.workspace import _validate_workspace


def workflow_uuid_or_name(ctx, param, value):
Expand Down Expand Up @@ -151,20 +152,16 @@ def _prepare_kwargs(reana_yaml):
if not skip_validation:
validate_parameters(workflow_type, reana_yaml)

if server_capabilities:
_validate_server_capabilities(reana_yaml, access_token)

if not skip_validate_environments:
display_message(
"Verifying environments in REANA specification file...",
msg_type="info",
)
validate_environment(reana_yaml, pull=pull_environment_image)

if server_capabilities:
root_path = reana_yaml.get("workspace", {}).get("root_path")
display_message(
"Verifying workspace in REANA specification file...", msg_type="info",
)
_validate_workspace(root_path, access_token)

if workflow_type == "yadage":
# We don't send the loaded Yadage workflow spec to the cluster as
# it may result in inconsistencies between what's displayed in the
Expand All @@ -187,7 +184,7 @@ def _prepare_kwargs(reana_yaml):
def _validate_reana_yaml(reana_yaml):
"""Validate REANA specification file according to jsonschema.
:param reana_yaml: Dictionary which represents REANA specifications file.
:param reana_yaml: Dictionary which represents REANA specification file.
:raises ValidationError: Given REANA spec file does not validate against
REANA specification schema.
"""
Expand Down Expand Up @@ -416,32 +413,22 @@ def run_command(cmd, display=True, return_output=False, stderr_output=False):
sys.exit(err.returncode)


def _validate_workspace(root_path: str, access_token: str) -> NoReturn:
"""Validate workspace in REANA specification file.
def _validate_server_capabilities(reana_yaml: Dict, access_token: str) -> None:
"""Validate server capabilities in REANA specification file.
:param root_path: workspace root path to be validated.
:param reana_yaml: dictionary which represents REANA specification file.
:param access_token: access token of the current user.
:raises ValidationError: Given workspace in REANA spec file does not validate against
allowed workspaces.
"""
from reana_client.api.client import info

if not root_path:
display_message(
"Workspace not found in REANA specification file. Validation skipped.",
msg_type="warning",
indented=True,
)
else:
available_workspaces = (
info(access_token).get("workspaces_available", {}).get("value")
)
try:
validate_workspace(root_path, available_workspaces)
display_message(
"Workflow workspace appears valid.", msg_type="success", indented=True,
)
except REANAValidationError as e:
display_message(e.message, msg_type="error")
sys.exit(1)
info_response = info(access_token)

display_message(
"Verifying compute backends in REANA specification file...", msg_type="info",
)
supported_backends = info_response.get("compute_backends", {}).get("value")
validate_compute_backends(reana_yaml, supported_backends)

root_path = reana_yaml.get("workspace", {}).get("root_path")
available_workspaces = info_response.get("workspaces_available", {}).get("value")
_validate_workspace(root_path, available_workspaces)
161 changes: 161 additions & 0 deletions reana_client/validation/compute_backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2022 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""REANA client compute backend validation."""

import sys
from typing import Dict, List, Optional

from reana_client.printer import display_message


def validate_compute_backends(
reana_yaml: Dict, supported_backends: Optional[List[str]]
) -> None:
"""Validate compute backends in REANA specification file according to workflow type.
:param reana_yaml: dictionary which represents REANA specification file.
:param supported_backends: a list of the supported compute backends.
"""

def build_validator(workflow: Dict) -> None:
workflow_type = workflow["type"]
if workflow_type == "serial":
workflow_steps = workflow["specification"]["steps"]
return SerialComputeBackendValidator(
workflow_steps=workflow_steps, supported_backends=supported_backends
)
if workflow_type == "yadage":
workflow_steps = workflow["specification"]["stages"]
return YadageComputeBackendValidator(
workflow_steps=workflow_steps, supported_backends=supported_backends
)
if workflow_type == "cwl":
workflow_steps = workflow.get("specification", {}).get("$graph", workflow)
return CWLComputeBackendValidator(
workflow_steps=workflow_steps, supported_backends=supported_backends
)
if workflow_type == "snakemake":
workflow_steps = workflow["specification"]["steps"]
return SnakemakeComputeBackendValidator(
workflow_steps=workflow_steps, supported_backends=supported_backends
)

workflow = reana_yaml["workflow"]
validator = build_validator(workflow)
validator.validate()
display_message(
"Workflow compute backends appear to be valid.",
msg_type="success",
indented=True,
)


class ComputeBackendValidatorBase:
"""REANA workflow compute backend validation base class."""

def __init__(
self,
workflow_steps: Optional[List[Dict]] = None,
supported_backends: Optional[List[str]] = [],
):
"""Validate compute backends in REANA workflow steps.
:param workflow_steps: list of dictionaries which represents different steps involved in workflow.
:param supported_backends: a list of the supported compute backends.
"""
self.workflow_steps = workflow_steps
self.supported_backends = supported_backends
self.messages = []

def validate(self) -> None:
"""Validate compute backends in REANA workflow."""
raise NotImplementedError

def display_error_message(self, compute_backend: str, step_name: str) -> None:
"""Display validation error message and exit."""
message = (
f'Compute backend "{compute_backend}" found in step "{step_name}" is not supported. '
f'List of supported compute backends: "{", ".join(self.supported_backends)}"'
)
display_message(message, msg_type="error", indented=True)
sys.exit(1)


class SerialComputeBackendValidator(ComputeBackendValidatorBase):
"""REANA serial workflow compute backend validation."""

def validate(self) -> None:
"""Validate compute backends in REANA serial workflow."""
for step in self.workflow_steps:
backend = step.get("compute_backend")
if backend and backend not in self.supported_backends:
self.display_error_message(backend, step.get("name"))


class YadageComputeBackendValidator(ComputeBackendValidatorBase):
"""REANA Yadage workflow compute backend validation."""

def validate(self) -> None:
"""Validate compute backends in REANA Yadage workflow."""

def parse_stages(stages: Optional[List[Dict]]) -> None:
"""Extract compute backends in Yadage workflow steps."""
for stage in stages:
if "workflow" in stage["scheduler"]:
nested_stages = stage["scheduler"]["workflow"].get("stages", {})
parse_stages(nested_stages)
else:
environment = stage["scheduler"]["step"]["environment"]
backend = next(
(
resource["compute_backend"]
for resource in environment.get("resources", [])
if "compute_backend" in resource
),
None,
)
if backend and backend not in self.supported_backends:
self.display_error_message(backend, stage["name"])

return parse_stages(self.workflow_steps)


class CWLComputeBackendValidator(ComputeBackendValidatorBase):
"""REANA CWL workflow compute backend validation."""

def validate(self) -> None:
"""Validate compute backends in REANA CWL workflow."""

def _validate_compute_backends(workflow: Dict) -> None:
"""Validate compute backends in REANA CWL workflow steps."""
steps = workflow.get("steps", [])
for step in steps:
hints = step.get("hints", [{}]).pop()
backend = hints.get("compute_backend")
if backend and backend not in self.supported_backends:
self.display_error_message(backend, step.get("id"))

workflow = self.workflow_steps
if isinstance(workflow, dict):
_validate_compute_backends(workflow)
elif isinstance(workflow, list):
for wf in workflow:
_validate_compute_backends(wf)


class SnakemakeComputeBackendValidator(ComputeBackendValidatorBase):
"""REANA Snakemake workflow compute backend validation."""

def validate(self) -> None:
"""Validate compute backends in REANA Snakemake workflow."""
for idx, step in enumerate(self.workflow_steps):
backend = step.get("compute_backend")
if backend and backend not in self.supported_backends:
step_name = step.get("name", str(idx))
self.display_error_message(backend, step_name)
4 changes: 2 additions & 2 deletions reana_client/validation/environments.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2021 CERN.
# Copyright (C) 2021, 2022 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -33,7 +33,7 @@
def validate_environment(reana_yaml, pull=False):
"""Validate environments in REANA specification file according to workflow type.
:param reana_yaml: Dictionary which represents REANA specifications file.
:param reana_yaml: Dictionary which represents REANA specification file.
:param pull: If true, attempt to pull remote environment image to perform GID/UID validation.
"""

Expand Down
42 changes: 42 additions & 0 deletions reana_client/validation/workspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2022 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""REANA client workspace validation."""

import sys
from typing import List, Optional

from reana_commons.errors import REANAValidationError
from reana_commons.validation import validate_workspace

from reana_client.printer import display_message


def _validate_workspace(
root_path: str, available_workspaces: Optional[List[str]]
) -> None:
"""Validate workspace in REANA specification file.
:param root_path: workspace root path to be validated.
:param available_workspaces: a list of the available workspaces.
:raises ValidationError: Given workspace in REANA spec file does not validate against
allowed workspaces.
"""
if root_path:
display_message(
"Verifying workspace in REANA specification file...", msg_type="info",
)
try:
validate_workspace(root_path, available_workspaces)
display_message(
"Workflow workspace appears valid.", msg_type="success", indented=True,
)
except REANAValidationError as e:
display_message(e.message, msg_type="error")
sys.exit(1)
Loading

0 comments on commit f244071

Please sign in to comment.