Merge pull request #2220 from fractal-analytics-platform/2218-change-concurrency-model-for-job-execution

2218 change concurrency model for job execution
tcompa authored Jan 28, 2025
2 parents 6a2db3b + 0dc999f commit 9588fa8
Showing 12 changed files with 92 additions and 218 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,8 @@
# 2.11.0a9

* Runner:
* Make job-execution background-task function sync in v2, to make it transparent that it runs on a thread (\#2220).
* API:
* Add new (experimental) `/project/{project_id}/workflow/{workflow_id}/type-filters-flow/` endpoint (\#2208).
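
The CHANGELOG entry above is the crux of this PR: when a background-task callable handed to FastAPI/Starlette is a plain synchronous function, the framework runs it in a worker thread, whereas an `async def` callable runs on the event loop itself. A minimal sketch of that dispatch behaviour (illustrative only, with made-up names; not fractal-server code):

```python
import asyncio

from starlette.concurrency import run_in_threadpool


def run_job(job_id: int) -> None:
    # A plain sync function: dispatched via run_in_threadpool, so it is
    # explicit that the whole job executes on a worker thread.
    print(f"running job {job_id} on a thread")


async def dispatch_background_task(func, *args) -> None:
    # Roughly what a Starlette-style background task does when it is invoked.
    if asyncio.iscoroutinefunction(func):
        await func(*args)
    else:
        await run_in_threadpool(func, *args)


asyncio.run(dispatch_background_task(run_job, 42))
```

Making the v2 job-execution entry point synchronous leans on this framework behaviour instead of hiding thread hops inside a coroutine, which is what the entry means by making the threading "transparent".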

@@ -4,7 +4,7 @@
from typing import Callable


def async_wrap(func: Callable) -> Callable:
def async_wrap_v1(func: Callable) -> Callable:
"""
Wrap a synchronous callable in an async task
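
The hunk above shows only the rename and the start of the docstring; the wrapper's body is truncated. For reference, a conventional implementation of such a wrapper (a sketch under that assumption, not necessarily the exact body of `async_wrap_v1`) hands the blocking callable to the event loop's executor:

```python
import asyncio
from functools import partial, wraps
from typing import Callable


def async_wrap_v1(func: Callable) -> Callable:
    """Wrap a synchronous callable in an async task."""

    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_running_loop()
        # Bind the arguments and run the blocking call on the executor
        # (a thread pool by default), yielding control to the event loop.
        return await loop.run_in_executor(executor, partial(func, *args, **kwargs))

    return run
```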
4 changes: 2 additions & 2 deletions fractal_server/app/runner/v1/_local/__init__.py
@@ -24,7 +24,7 @@
from typing import Optional

from ....models.v1 import Workflow
from ...async_wrap import async_wrap
from ...async_wrap_v1 import async_wrap_v1
from ...set_start_and_last_task_index import set_start_and_last_task_index
from .._common import execute_tasks
from ..common import TaskParameters
@@ -172,7 +172,7 @@ async def process_workflow(
last_task_index=last_task_index,
)

output_dataset_metadata_history = await async_wrap(_process_workflow)(
output_dataset_metadata_history = await async_wrap_v1(_process_workflow)(
workflow=workflow,
input_paths=input_paths,
output_path=output_path,
4 changes: 2 additions & 2 deletions fractal_server/app/runner/v1/_slurm/__init__.py
@@ -21,7 +21,7 @@
from typing import Optional
from typing import Union

from ...async_wrap import async_wrap
from ...async_wrap_v1 import async_wrap_v1
from ...executors.slurm.sudo.executor import FractalSlurmExecutor
from ...set_start_and_last_task_index import set_start_and_last_task_index
from .._common import execute_tasks
@@ -145,7 +145,7 @@ async def process_workflow(
last_task_index=last_task_index,
)

output_dataset_metadata_history = await async_wrap(_process_workflow)(
output_dataset_metadata_history = await async_wrap_v1(_process_workflow)(
workflow=workflow,
input_paths=input_paths,
output_path=output_path,
4 changes: 2 additions & 2 deletions fractal_server/app/runner/v2/__init__.py
@@ -69,7 +69,7 @@ def fail_job(
return


async def submit_workflow(
def submit_workflow(
*,
workflow_id: int,
dataset_id: int,
@@ -318,7 +318,7 @@ async def submit_workflow(
db_sync = next(DB.get_sync_db())
db_sync.close()

await process_workflow(
process_workflow(
workflow=workflow,
dataset=dataset,
workflow_dir_local=WORKFLOW_DIR_LOCAL,
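
Because `submit_workflow` is now a plain function, the API layer can hand it to FastAPI's `BackgroundTasks` and the framework executes it on a thread after the response is sent. A hedged sketch of such a call site (the route path and request shape are placeholders, not the actual fractal-server endpoint; only the `submit_workflow` keyword arguments are taken from this diff):

```python
from fastapi import BackgroundTasks, FastAPI

from fractal_server.app.runner.v2 import submit_workflow

app = FastAPI()


@app.post("/placeholder/job/submit/")
async def apply_workflow(
    workflow_id: int,
    dataset_id: int,
    job_id: int,
    background_tasks: BackgroundTasks,
):
    # submit_workflow is synchronous, so add_task runs it on the thread
    # pool once the HTTP response has been returned.
    background_tasks.add_task(
        submit_workflow,
        workflow_id=workflow_id,
        dataset_id=dataset_id,
        job_id=job_id,
    )
    return {"status": "submitted"}
```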
53 changes: 14 additions & 39 deletions fractal_server/app/runner/v2/_local/__init__.py
@@ -24,43 +24,14 @@

from ....models.v2 import DatasetV2
from ....models.v2 import WorkflowV2
from ...async_wrap import async_wrap
from ...set_start_and_last_task_index import set_start_and_last_task_index
from ..runner import execute_tasks_v2
from ._submit_setup import _local_submit_setup
from .executor import FractalThreadPoolExecutor
from fractal_server.images.models import AttributeFiltersType


def _process_workflow(
*,
workflow: WorkflowV2,
dataset: DatasetV2,
logger_name: str,
workflow_dir_local: Path,
first_task_index: int,
last_task_index: int,
job_attribute_filters: AttributeFiltersType,
) -> None:
"""
Run the workflow using a `FractalThreadPoolExecutor`.
"""
with FractalThreadPoolExecutor() as executor:
execute_tasks_v2(
wf_task_list=workflow.task_list[
first_task_index : (last_task_index + 1)
],
dataset=dataset,
executor=executor,
workflow_dir_local=workflow_dir_local,
workflow_dir_remote=workflow_dir_local,
logger_name=logger_name,
submit_setup_call=_local_submit_setup,
job_attribute_filters=job_attribute_filters,
)


async def process_workflow(
def process_workflow(
*,
workflow: WorkflowV2,
dataset: DatasetV2,
@@ -143,12 +114,16 @@ async def process_workflow(
last_task_index=last_task_index,
)

await async_wrap(_process_workflow)(
workflow=workflow,
dataset=dataset,
logger_name=logger_name,
workflow_dir_local=workflow_dir_local,
first_task_index=first_task_index,
last_task_index=last_task_index,
job_attribute_filters=job_attribute_filters,
)
with FractalThreadPoolExecutor() as executor:
execute_tasks_v2(
wf_task_list=workflow.task_list[
first_task_index : (last_task_index + 1)
],
dataset=dataset,
executor=executor,
workflow_dir_local=workflow_dir_local,
workflow_dir_remote=workflow_dir_local,
logger_name=logger_name,
submit_setup_call=_local_submit_setup,
job_attribute_filters=job_attribute_filters,
)
73 changes: 24 additions & 49 deletions fractal_server/app/runner/v2/_local_experimental/__init__.py
@@ -4,7 +4,6 @@

from ....models.v2 import DatasetV2
from ....models.v2 import WorkflowV2
from ...async_wrap import async_wrap
from ...exceptions import JobExecutionError
from ...filenames import SHUTDOWN_FILENAME
from ...set_start_and_last_task_index import set_start_and_last_task_index
@@ -14,45 +13,7 @@
from fractal_server.images.models import AttributeFiltersType


def _process_workflow(
*,
workflow: WorkflowV2,
dataset: DatasetV2,
logger_name: str,
workflow_dir_local: Path,
first_task_index: int,
last_task_index: int,
job_attribute_filters: AttributeFiltersType,
) -> None:
"""
Run the workflow using a `FractalProcessPoolExecutor`.
"""
with FractalProcessPoolExecutor(
shutdown_file=workflow_dir_local / SHUTDOWN_FILENAME
) as executor:
try:
execute_tasks_v2(
wf_task_list=workflow.task_list[
first_task_index : (last_task_index + 1)
],
dataset=dataset,
executor=executor,
workflow_dir_local=workflow_dir_local,
workflow_dir_remote=workflow_dir_local,
logger_name=logger_name,
submit_setup_call=_local_submit_setup,
job_attribute_filters=job_attribute_filters,
)
except BrokenProcessPool as e:
raise JobExecutionError(
info=(
"Job failed with BrokenProcessPool error, likely due to "
f"an executor shutdown.\nOriginal error:\n{e.args[0]}"
)
)


async def process_workflow(
def process_workflow(
*,
workflow: WorkflowV2,
dataset: DatasetV2,
@@ -135,12 +96,26 @@ async def process_workflow(
last_task_index=last_task_index,
)

await async_wrap(_process_workflow)(
workflow=workflow,
dataset=dataset,
logger_name=logger_name,
workflow_dir_local=workflow_dir_local,
first_task_index=first_task_index,
last_task_index=last_task_index,
job_attribute_filters=job_attribute_filters,
)
with FractalProcessPoolExecutor(
shutdown_file=workflow_dir_local / SHUTDOWN_FILENAME
) as executor:
try:
execute_tasks_v2(
wf_task_list=workflow.task_list[
first_task_index : (last_task_index + 1)
],
dataset=dataset,
executor=executor,
workflow_dir_local=workflow_dir_local,
workflow_dir_remote=workflow_dir_local,
logger_name=logger_name,
submit_setup_call=_local_submit_setup,
job_attribute_filters=job_attribute_filters,
)
except BrokenProcessPool as e:
raise JobExecutionError(
info=(
"Job failed with BrokenProcessPool error, likely due to "
f"an executor shutdown.\nOriginal error:\n{e.args[0]}"
)
)
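
For context on the `BrokenProcessPool` branch kept above: when a process-pool worker dies abruptly (for example after a hard shutdown), pending futures fail with `BrokenProcessPool`, which the runner re-raises as a `JobExecutionError`. A self-contained illustration with the standard-library executor (not the `FractalProcessPoolExecutor` internals):

```python
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def crash() -> None:
    # Kill the worker process abruptly, simulating a hard executor shutdown.
    os._exit(1)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(crash)
        try:
            future.result()
        except BrokenProcessPool as e:
            print(f"Job failed with BrokenProcessPool error: {e.args[0]}")
```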
77 changes: 20 additions & 57 deletions fractal_server/app/runner/v2/_slurm_ssh/__init__.py
@@ -18,12 +18,10 @@
"""
from pathlib import Path
from typing import Optional
from typing import Union

from .....ssh._fabric import FractalSSH
from ....models.v2 import DatasetV2
from ....models.v2 import WorkflowV2
from ...async_wrap import async_wrap
from ...exceptions import JobExecutionError
from ...executors.slurm.ssh.executor import FractalSlurmSSHExecutor
from ...set_start_and_last_task_index import set_start_and_last_task_index
@@ -35,27 +33,35 @@
logger = set_logger(__name__)


def _process_workflow(
def process_workflow(
*,
workflow: WorkflowV2,
dataset: DatasetV2,
logger_name: str,
workflow_dir_local: Path,
workflow_dir_remote: Path,
first_task_index: int,
last_task_index: int,
fractal_ssh: FractalSSH,
worker_init: Optional[Union[str, list[str]]] = None,
workflow_dir_remote: Optional[Path] = None,
first_task_index: Optional[int] = None,
last_task_index: Optional[int] = None,
logger_name: str,
job_attribute_filters: AttributeFiltersType,
fractal_ssh: FractalSSH,
worker_init: Optional[str] = None,
# Not used
user_cache_dir: Optional[str] = None,
slurm_user: Optional[str] = None,
slurm_account: Optional[str] = None,
) -> None:
"""
Run the workflow using a `FractalSlurmSSHExecutor`.
This function initialises the a FractalSlurmExecutor, setting logging,
workflow working dir and user to impersonate. It then schedules the
workflow tasks and returns the new dataset attributes
Process workflow (SLURM backend public interface)
"""

# Set values of first_task_index and last_task_index
num_tasks = len(workflow.task_list)
first_task_index, last_task_index = set_start_and_last_task_index(
num_tasks,
first_task_index=first_task_index,
last_task_index=last_task_index,
)

if isinstance(worker_init, str):
worker_init = worker_init.split("\n")

@@ -89,46 +95,3 @@ def _process_workflow(
submit_setup_call=_slurm_submit_setup,
job_attribute_filters=job_attribute_filters,
)


async def process_workflow(
*,
workflow: WorkflowV2,
dataset: DatasetV2,
workflow_dir_local: Path,
workflow_dir_remote: Optional[Path] = None,
first_task_index: Optional[int] = None,
last_task_index: Optional[int] = None,
logger_name: str,
job_attribute_filters: AttributeFiltersType,
fractal_ssh: FractalSSH,
worker_init: Optional[str] = None,
# Not used
user_cache_dir: Optional[str] = None,
slurm_user: Optional[str] = None,
slurm_account: Optional[str] = None,
) -> None:
"""
Process workflow (SLURM backend public interface)
"""

# Set values of first_task_index and last_task_index
num_tasks = len(workflow.task_list)
first_task_index, last_task_index = set_start_and_last_task_index(
num_tasks,
first_task_index=first_task_index,
last_task_index=last_task_index,
)

await async_wrap(_process_workflow)(
workflow=workflow,
dataset=dataset,
logger_name=logger_name,
workflow_dir_local=workflow_dir_local,
workflow_dir_remote=workflow_dir_remote,
first_task_index=first_task_index,
last_task_index=last_task_index,
worker_init=worker_init,
fractal_ssh=fractal_ssh,
job_attribute_filters=job_attribute_filters,
)
76 changes: 18 additions & 58 deletions fractal_server/app/runner/v2/_slurm_sudo/__init__.py
@@ -18,41 +18,44 @@
"""
from pathlib import Path
from typing import Optional
from typing import Union

from ....models.v2 import DatasetV2
from ....models.v2 import WorkflowV2
from ...async_wrap import async_wrap
from ...executors.slurm.sudo.executor import FractalSlurmExecutor
from ...set_start_and_last_task_index import set_start_and_last_task_index
from ..runner import execute_tasks_v2
from ._submit_setup import _slurm_submit_setup
from fractal_server.images.models import AttributeFiltersType


def _process_workflow(
def process_workflow(
*,
workflow: WorkflowV2,
dataset: DatasetV2,
logger_name: str,
workflow_dir_local: Path,
workflow_dir_remote: Path,
first_task_index: int,
last_task_index: int,
workflow_dir_remote: Optional[Path] = None,
first_task_index: Optional[int] = None,
last_task_index: Optional[int] = None,
logger_name: str,
job_attribute_filters: AttributeFiltersType,
# Slurm-specific
user_cache_dir: Optional[str] = None,
slurm_user: Optional[str] = None,
slurm_account: Optional[str] = None,
user_cache_dir: str,
worker_init: Optional[Union[str, list[str]]] = None,
job_attribute_filters: AttributeFiltersType,
worker_init: Optional[str] = None,
) -> None:
"""
Run the workflow using a `FractalSlurmExecutor`.
This function initialises the a FractalSlurmExecutor, setting logging,
workflow working dir and user to impersonate. It then schedules the
workflow tasks and returns the new dataset attributes
Process workflow (SLURM backend public interface).
"""

# Set values of first_task_index and last_task_index
num_tasks = len(workflow.task_list)
first_task_index, last_task_index = set_start_and_last_task_index(
num_tasks,
first_task_index=first_task_index,
last_task_index=last_task_index,
)

if not slurm_user:
raise RuntimeError(
"slurm_user argument is required, for slurm backend"
@@ -83,46 +86,3 @@ def _process_workflow(
submit_setup_call=_slurm_submit_setup,
job_attribute_filters=job_attribute_filters,
)


async def process_workflow(
*,
workflow: WorkflowV2,
dataset: DatasetV2,
workflow_dir_local: Path,
workflow_dir_remote: Optional[Path] = None,
first_task_index: Optional[int] = None,
last_task_index: Optional[int] = None,
logger_name: str,
job_attribute_filters: AttributeFiltersType,
# Slurm-specific
user_cache_dir: Optional[str] = None,
slurm_user: Optional[str] = None,
slurm_account: Optional[str] = None,
worker_init: Optional[str] = None,
) -> None:
"""
Process workflow (SLURM backend public interface).
"""

# Set values of first_task_index and last_task_index
num_tasks = len(workflow.task_list)
first_task_index, last_task_index = set_start_and_last_task_index(
num_tasks,
first_task_index=first_task_index,
last_task_index=last_task_index,
)
await async_wrap(_process_workflow)(
workflow=workflow,
dataset=dataset,
logger_name=logger_name,
workflow_dir_local=workflow_dir_local,
workflow_dir_remote=workflow_dir_remote,
first_task_index=first_task_index,
last_task_index=last_task_index,
user_cache_dir=user_cache_dir,
slurm_user=slurm_user,
slurm_account=slurm_account,
worker_init=worker_init,
job_attribute_filters=job_attribute_filters,
)
4 changes: 2 additions & 2 deletions tests/v2/04_runner/test_background_task.py
@@ -45,7 +45,7 @@ async def test_submit_workflow_failure(
)
db.expunge_all()

await submit_workflow(
submit_workflow(
workflow_id=workflow.id,
dataset_id=dataset.id,
job_id=job.id,
@@ -87,7 +87,7 @@ async def test_mkdir_error(
status="submitted",
)

await submit_workflow(
submit_workflow(
workflow_id=workflow.id,
dataset_id=dataset.id,
job_id=job.id,
6 changes: 3 additions & 3 deletions tests/v2/04_runner/test_unit_submit_workflow.py
@@ -26,7 +26,7 @@ async def test_fail_submit_workflows_wrong_IDs(
)
dataset = await dataset_factory_v2(project_id=project.id)

await submit_workflow(
submit_workflow(
workflow_id=workflow.id,
dataset_id=dataset.id,
job_id=9999999,
@@ -40,7 +40,7 @@ async def test_fail_submit_workflows_wrong_IDs(
working_dir=tmp_path.as_posix(),
)
assert job.status == JobStatusTypeV2.SUBMITTED
await submit_workflow(
submit_workflow(
workflow_id=9999999,
dataset_id=9999999,
job_id=job.id,
@@ -79,7 +79,7 @@ async def test_fail_submit_workflows_wrong_backend(
working_dir=tmp_path.as_posix(),
)

await submit_workflow(
submit_workflow(
workflow_id=workflow.id,
dataset_id=dataset.id,
job_id=job.id,
5 changes: 2 additions & 3 deletions tests/v2/09_backends/test_local_experimental.py
@@ -19,7 +19,6 @@
)
from fractal_server.app.runner.exceptions import JobExecutionError
from fractal_server.app.runner.filenames import SHUTDOWN_FILENAME
from fractal_server.app.runner.v2._local_experimental import _process_workflow
from fractal_server.app.runner.v2._local_experimental import process_workflow
from fractal_server.app.runner.v2._local_experimental._local_config import (
get_local_backend_config,
@@ -53,7 +52,7 @@ def two_args(a, b):

async def test_unit_process_workflow():
with pytest.raises(NotImplementedError):
await process_workflow(
process_workflow(
workflow=None,
dataset=None,
logger_name=None,
@@ -212,7 +211,7 @@ async def test_indirect_shutdown_during_process_workflow(
stdout=tmp_stdout,
stderr=tmp_stderr,
)
_process_workflow(
process_workflow(
workflow=workflow,
dataset=dataset,
logger_name="logger",
