Skip to content

Commit

Permalink
feat(dask): detect and create rucio resources for Dask cluster (#634)
Browse files Browse the repository at this point in the history
We detect if rucio is necessary for Dask cluster by checking top
level rucio statement under resources field in reana.yaml.

Closes reanahub/reana#873
  • Loading branch information
Alputer committed Feb 11, 2025
1 parent 56f702e commit f63f45a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
7 changes: 4 additions & 3 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
user_id,
num_of_workers,
single_worker_memory,
rucio=False,
):
"""Instantiate Dask resource manager.
Expand Down Expand Up @@ -93,6 +94,8 @@ def __init__(
)
self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID

self.rucio = rucio

if DASK_AUTOSCALER_ENABLED:
self.autoscaler_name = get_dask_component_name(workflow_id, "autoscaler")
self.autoscaler_body = self._load_dask_autoscaler_template()
Expand Down Expand Up @@ -212,16 +215,14 @@ def _prepare_cluster(self):
self.secrets_store.get_file_secrets_volume_as_k8s_specs()
)

# FIXME: Decide how to detect if krb5, rucio and voms_proxy are needed
kerberos = False
rucio = False
voms_proxy = False

if kerberos:
self._add_krb5_containers()
if voms_proxy:
self._add_voms_proxy_init_container()
if rucio:
if self.rucio:
self._add_rucio_init_container()

def _prepare_autoscaler(self):
Expand Down
11 changes: 10 additions & 1 deletion reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This file is part of REANA.
# Copyright (C) 2019, 2020, 2021, 2022, 2023, 2024 CERN.
# Copyright (C) 2019, 2020, 2021, 2022, 2023, 2024, 2025 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand Down Expand Up @@ -353,6 +353,14 @@ def requires_kerberos(self) -> bool:
.get("kerberos", False)
)

def requires_rucio(self) -> bool:
"""Check whether Rucio is necessary to run the workflow engine."""
return (

Check warning on line 358 in reana_workflow_controller/workflow_run_manager.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/workflow_run_manager.py#L358

Added line #L358 was not covered by tests
self.workflow.reana_specification["workflow"]
.get("resources", {})
.get("rucio", False)
)


class KubernetesWorkflowRunManager(WorkflowRunManager):
"""Implementation of WorkflowRunManager for Kubernetes."""
Expand Down Expand Up @@ -399,6 +407,7 @@ def start_batch_workflow_run(
"single_worker_memory",
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY,
),
rucio=self.requires_rucio(),
).create_dask_resources()

current_k8s_batchv1_api_client.create_namespaced_job(
Expand Down

0 comments on commit f63f45a

Please sign in to comment.